1+ import collectd
2+
13from .. import protocol
24from .. import types
5+ from . import aggregator
36
7+ import logging
8+ log = logging .getLogger (__name__ )
49
5- receivers = []
10+ receivers = {}
11+ summarizers = []
612
713
814def receives (protocol_type ):
915 def decorate (fn ):
10- receivers .append ((protocol_type , fn ))
16+ receivers [protocol_type .name ] = fn
17+ return fn
18+
19+ return decorate
20+
21+
22+ def summarizes (protocol_type ):
23+ def decorate (fn ):
24+ summarizers .append (fn )
1125 return fn
1226
1327 return decorate
@@ -24,11 +38,67 @@ def __init__(self):
2438 types .transactions
2539 )
2640
27- def receive (self , connection , aggregator ):
41+ def receive (self , connection , aggregator_ ):
2842 data , host = connection .receive ()
2943 message = self .message_receiver .receive (data )
30- if message is not None :
31- message ['host' ] = host
32- # TODO: look up type-specific handler
33- # feed to aggregator per-type
34- aggregator .put (message )
44+ type_ = message [protocol .TYPE_TYPE ]
45+ timestamp = message [protocol .TYPE_TIME ]
46+ host = message [protocol .TYPE_HOST ]
47+ progname = message [protocol .TYPE_PLUGIN_INSTANCE ]
48+ values = message [protocol .TYPE_VALUES ]
49+ pid = message [protocol .TYPE_TYPE_INSTANCE ]
50+ try :
51+ receiver = receivers [type_ ]
52+ except KeyError :
53+ log .warn ("Don't understand message type %s, skipping" % type_ )
54+ else :
55+ receiver (
56+ message , timestamp , host , progname , pid , values , aggregator_ )
57+
58+ def summarize (self , aggregator_ , timestamp ):
59+ for summarizer in summarizers :
60+ summarizer (aggregator_ , timestamp )
61+
62+
63+ @receives (types .pool )
64+ def _receive_pool (
65+ message , timestamp , host , progname , pid , values , aggregator_ ):
66+ aggregator_ .set_pool_stats (host , progname , pid , timestamp , * values )
67+
68+
69+ @summarizes (types .pool )
70+ def _summarize_pool (aggregator_ , timestamp ):
71+ values = collectd .Values (
72+ type = types .pool .name ,
73+ plugin = "sqlalchemy" ,
74+ time = timestamp ,
75+ interval = aggregator_ .interval
76+ )
77+ for hostname , progname , stats in \
78+ aggregator_ .get_pool_stats_by_progname (timestamp , sum ):
79+ values .dispatch (
80+ type_instance = "sum" , host = hostname , plugin_instance = progname ,
81+ values = stats
82+ )
83+
84+ for hostname , stats in aggregator_ .get_pool_stats_by_hostname (
85+ timestamp , sum ):
86+ values .dispatch (
87+ type_instance = "sum" , host = hostname , plugin_instance = "all" ,
88+ values = stats
89+ )
90+
91+ for hostname , progname , stats in aggregator_ .get_pool_stats_by_progname (
92+ timestamp , aggregator .avg ):
93+ values .dispatch (
94+ type_instance = "avg" , host = hostname , plugin_instance = progname ,
95+ values = stats
96+ )
97+
98+ for hostname , stats in aggregator_ .get_pool_stats_by_hostname (
99+ timestamp , aggregator .avg ):
100+ values .dispatch (
101+ type_instance = "avg" , host = hostname , plugin_instance = "all" ,
102+ values = stats
103+ )
104+
0 commit comments