Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions collectd.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

logger = logging.getLogger("collectd")

SEND_INTERVAL = 10 # seconds
MAX_PACKET_SIZE = 1024 # bytes

PLUGIN_TYPE = "gauge"
Expand All @@ -37,6 +36,14 @@
VALUE_GAUGE = 1
VALUE_DERIVE = 2
VALUE_ABSOLUTE = 3

VALUE_LOOKUP = {
'counter': VALUE_COUNTER,
'gauge': VALUE_GAUGE,
'derive': VALUE_DERIVE,
'absolute': VALUE_ABSOLUTE,
}

VALUE_CODES = {
VALUE_COUNTER: "!Q",
VALUE_GAUGE: "<d",
Expand All @@ -52,10 +59,14 @@ def pack_string(type_code, string):
return struct.pack("!HH", type_code, 5 + len(string)) + string + "\0"

def pack_value(name, value):
value_type = VALUE_LOOKUP[PLUGIN_TYPE]
value_fmt = VALUE_CODES[value_type]

return "".join([
pack(TYPE_TYPE_INSTANCE, name),
struct.pack("!HHH", TYPE_VALUES, 15, 1),
struct.pack("<Bd", VALUE_GAUGE, value)
struct.pack("<B", value_type),
struct.pack(value_fmt, value)
])

def pack(id, value):
Expand All @@ -68,19 +79,19 @@ def pack(id, value):
else:
raise AssertionError("invalid type code " + str(id))

def message_start(when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any"):
def message_start(when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any", send_interval=10):
return "".join([
pack(TYPE_HOST, host),
pack(TYPE_TIME, when or time.time()),
pack(TYPE_PLUGIN, plugin_name),
pack(TYPE_PLUGIN_INSTANCE, plugin_inst),
pack(TYPE_TYPE, PLUGIN_TYPE),
pack(TYPE_INTERVAL, SEND_INTERVAL)
pack(TYPE_INTERVAL, send_interval)
])

def messages(counts, when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any"):
def messages(counts, when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any", send_interval=10):
packets = []
start = message_start(when, host, plugin_inst, plugin_name)
start = message_start(when, host, plugin_inst, plugin_name, send_interval)
parts = [pack(name, count) for name,count in counts.items()]
parts = [p for p in parts if len(start) + len(p) <= MAX_PACKET_SIZE]
if parts:
Expand Down Expand Up @@ -158,7 +169,7 @@ class Connection(object):
@synchronized
def __new__(cls, hostname = socket.gethostname(),
collectd_host = "localhost", collectd_port = 25826,
plugin_inst = "", plugin_name = "any"):
plugin_inst = "", plugin_name = "any", send_interval=10):
id = (hostname, collectd_host, collectd_port, plugin_inst, plugin_name)
if id in cls.instances:
return cls.instances[id]
Expand All @@ -169,14 +180,15 @@ def __new__(cls, hostname = socket.gethostname(),

def __init__(self, hostname = socket.gethostname(),
collectd_host = "localhost", collectd_port = 25826,
plugin_inst = "", plugin_name = "any"):
plugin_inst = "", plugin_name = "any", send_interval=10):
if "_counters" not in self.__dict__:
self._lock = RLock()
self._counters = {}
self._plugin_inst = plugin_inst
self._plugin_name = plugin_name
self._hostname = hostname
self._collectd_addr = (collectd_host, collectd_port)
self._send_interval = send_interval

@synchronized
def __getattr__(self, name):
Expand Down Expand Up @@ -208,7 +220,7 @@ def take_snapshots():
def send_stats(raise_on_empty = False):
try:
when, stats, conn = snaps.get(timeout = 0.1)
for message in messages(stats, when, conn._hostname, conn._plugin_inst, conn._plugin_name):
for message in messages(stats, when, conn._hostname, conn._plugin_inst, conn._plugin_name, conn._send_interval):
sock.sendto(message, conn._collectd_addr)
except Empty:
if raise_on_empty:
Expand All @@ -232,7 +244,7 @@ def wrapped():
t.start()

single_start = Semaphore()
def start_threads():
def start_threads(send_interval=10):
assert single_start.acquire(blocking = False)
daemonize(take_snapshots, sleep_for = SEND_INTERVAL)
daemonize(take_snapshots, sleep_for = send_interval)
daemonize(send_stats)