1- import collections
21import struct
32import socket
43import logging
54import threading
6- import time
75
86log = logging .getLogger (__name__ )
97
108DEFAULT_INTERVAL = 10
119MAX_PACKET_SIZE = 1024
1210
13- VALUE_COUNTER = 0
14- VALUE_GAUGE = 1
15- VALUE_DERIVE = 2
11+ VALUE_COUNTER = 0
12+ VALUE_GAUGE = 1
13+ VALUE_DERIVE = 2
1614VALUE_ABSOLUTE = 3
1715
1816# https://git.octo.it/?p=collectd.git;a=blob;hb=master;f=src/network.h
2523TYPE_VALUES = 0x0006
2624TYPE_INTERVAL = 0x0007
2725
26+ _value_formats = {
27+ VALUE_COUNTER : "!Q" ,
28+ VALUE_GAUGE : "<d" ,
29+ VALUE_DERIVE : "!q" ,
30+ VALUE_ABSOLUTE : "!Q"
31+ }
32+
33+
34+ class Type (object ):
35+ """Represents a collectd type and its data template.
36+
37+ Here, we are encoding what we need to know about a collectd type that we'd
38+ be targeting to send to a collectd server.A type includes a name and a set
39+ of values and types that go along with it. These names and value
40+ definitions are a fixed thing on the collectd server side, which are
41+ listed in a file called types.db. Additional custom types can be added
42+ by specifying additional type files in the collectd configuration.
43+
44+ .. seealso::
45+
46+ collectd's default types.db:
47+ https://github.com/collectd/collectd/blob/master/src/types.db
48+
49+ """
50+
51+ __slots__ = 'name' , '_value_types' , '_value_formats' , '_message_template'
52+
53+ def __init__ (self , name , * db_template ):
54+ """Contruct a new Type.
55+
56+ E.g. to represent the "load" type in collectd's types.db::
57+
58+ load = Type(
59+ "load",
60+ ("shortterm", VALUE_GAUGE),
61+ ("midterm", VALUE_GAUGE),
62+ ("longterm", VALUE_GAUGE)
63+ )
64+ """
65+ self .name = name
66+ self ._value_types = [value_type for dsname , value_type in db_template ]
67+ self ._value_formats = [
68+ _value_formats [value_type ] for value_type in self ._value_types ]
69+ self ._message_template = struct .pack (
70+ "!HHH" , TYPE_VALUES , 6 + (9 * len (db_template )),
71+ len (db_template ))
72+ for value_type in self ._value_types :
73+ self ._message_template += struct .pack ("B" , value_type )
74+
75+ def encode_values (self , * values ):
76+ """Encode a series of values according to the type template."""
77+
78+ msg = self ._message_template
79+ for format_ , dsvalue in zip (self ._value_formats , values ):
80+ msg += struct .pack (format_ , dsvalue )
81+
82+ return msg
83+
2884
2985class MessageSender (object ):
86+ """Represents all the fields necessary to send a message."""
87+
88+ __slots__ = (
89+ 'type' , 'host' , 'plugin' , 'plugin_instance' , 'type_instance' ,
90+ 'interval' , '_host_message_part' , '_remainder_message_parts'
91+ )
92+
3093 def __init__ (
31- self , type , host , plugin = "sqlalchemy" , plugin_instance = None ,
94+ self , type , host , plugin , plugin_instance = None ,
3295 type_instance = None , interval = DEFAULT_INTERVAL ):
3396
34- # TODO: send template just like in types.db and fix to that
35-
3697 self .type = type
3798 self .host = host
3899 self .plugin = plugin
39100 self .plugin_instance = plugin_instance
40101 self .type_instance = type_instance
41102 self .interval = interval
42- self ._queue = collections .deque ()
43-
44- def _header (self , timestamp ):
45- buf = b''
46- buf += struct .pack ("!HH" , TYPE_HOST , 5 + len (self .host )) + self .host + b"\0 "
47- buf += struct .pack ("!HHq" , TYPE_TIME , 12 , timestamp )
48- buf += struct .pack ("!HH" , TYPE_PLUGIN , 5 + len (self .plugin )) + self .plugin + b"\0 "
49- buf += struct .pack ("!HH" , TYPE_PLUGIN_INSTANCE , 5 + len (self .plugin_instance )) + self .plugin_instance + b"\0 "
50- buf += struct .pack ("!HH" , TYPE_TYPE , 5 + len (self .type )) + self .type + b"\0 "
51- buf += struct .pack ("!HHq" , TYPE_INTERVAL , 12 , self .interval )
52- buf += struct .pack ("!HH" , TYPE_TYPE_INSTANCE , 5 + len (self .type_instance )) + self .type_instance + b'\0 '
53-
54- return buf
55-
56- def _gauge (self , dsname , dsvalue ):
57- buf = b''
58- buf += struct .pack ("!HHH" , TYPE_VALUES , 15 , 1 )
59- buf += struct .pack ("<Bd" , VALUE_GAUGE , dsvalue )
60-
61- return buf
62-
63- def _derive (self , dsname , dsvalue ):
64- buf = b''
65- buf += struct .pack ("!HHH" , TYPE_VALUES , 15 , 1 )
66- buf += struct .pack ("!Bq" , VALUE_DERIVE , dsvalue )
67-
68- return buf
69-
70- def queue_stat (self , * values ):
71- # TODO TODO
72- pass
73103
74- def queue_gauge (self , name , value ):
75- self ._queue .append ((VALUE_GAUGE , time .time (), name , value ))
104+ self ._host_message_part = self ._pack_string (TYPE_HOST , self .host )
105+ self ._remainder_message_parts = (
106+ self ._pack_string (TYPE_PLUGIN , self .plugin ) +
107+ self ._pack_string (TYPE_PLUGIN_INSTANCE , self .plugin_instance ) +
108+ self ._pack_string (TYPE_TYPE , self .type .name ) +
109+ struct .pack ("!HHq" , TYPE_INTERVAL , 12 , self .interval ) +
110+ self ._pack_string (TYPE_TYPE_INSTANCE , self .type_instance )
111+ )
76112
77- def queue_derive (self , name , value ):
78- self . _queue . append (( VALUE_DERIVE , time . time (), name , value ))
113+ def _pack_string (self , typecode , value ):
114+ return struct . pack ( "!HH" , typecode , 5 + len ( value )) + value + b" \0 "
79115
80- def flush (self , connection ):
81- now = time .time ()
82- too_old = now - self .interval
83- header = self ._header (now )
116+ def send (self , connection , timestamp , * values ):
117+ """Send a message on a connection."""
84118
85- while self ._queue :
86- type_ , timestamp , name , value = self ._queue .popleft ()
87- if timestamp < too_old :
88- continue
119+ header = self ._host_message_part + \
120+ struct .pack ("!HHq" , TYPE_TIME , 12 , timestamp ) + \
121+ self ._remainder_message_parts
89122
90- if type_ == VALUE_GAUGE :
91- element = self ._gauge (name , value )
92- else :
93- element = self ._derive (name , value )
123+ payload = self .type .encode_values (* values )
94124
95- connection .send (header + element )
125+ connection .send (header + payload )
96126
97127
98128class Connection (object ):
@@ -105,7 +135,7 @@ def __init__(self, host="localhost", port=25826):
105135 def send (self , message ):
106136 self ._mutex .acquire ()
107137 try :
108- log .debug ("sending: %s " , message )
138+ log .debug ("sending: %r " , message )
109139 self .socket .sendto (message , (self .host , self .port ))
110140 except IOError :
111141 log .error ("Error in socket.sendto" , exc_info = True )
0 commit comments