|
7 | 7 | import logging |
8 | 8 | log = logging.getLogger(__name__) |
9 | 9 |
|
10 | | -receivers = {} |
11 | | -summarizers = [] |
12 | | - |
13 | | - |
14 | | -def receives(protocol_type): |
15 | | - def decorate(fn): |
16 | | - receivers[protocol_type.name] = fn |
17 | | - return fn |
18 | | - |
19 | | - return decorate |
20 | | - |
21 | | - |
22 | | -def summarizes(protocol_type): |
23 | | - def decorate(fn): |
24 | | - summarizers.append(fn) |
25 | | - return fn |
26 | | - |
27 | | - return decorate |
28 | | - |
29 | 10 |
|
30 | 11 | class Receiver(object): |
31 | | - def __init__(self): |
32 | | - self.message_receiver = protocol.MessageReceiver( |
| 12 | + def __init__(self, plugin="sqlalchemy"): |
| 13 | + self.plugin = plugin |
| 14 | + self.types = types_ = [ |
33 | 15 | types.pool, |
34 | 16 | types.checkouts, |
35 | 17 | types.commits, |
36 | 18 | types.rollbacks, |
37 | 19 | types.invalidated, |
38 | 20 | types.transactions |
| 21 | + ] |
| 22 | + self.message_receiver = protocol.MessageReceiver(*types_) |
| 23 | + |
| 24 | + self.aggregator = aggregator.Aggregator( |
| 25 | + [type_.name for type_ in types_] |
39 | 26 | ) |
40 | 27 |
|
41 | | - def receive(self, connection, aggregator_): |
| 28 | + def receive(self, connection): |
42 | 29 | data, host = connection.receive() |
43 | 30 | message = self.message_receiver.receive(data) |
44 | | - type_ = message[protocol.TYPE_TYPE] |
| 31 | + type_name = message[protocol.TYPE_TYPE] |
45 | 32 | timestamp = message[protocol.TYPE_TIME] |
46 | 33 | host = message[protocol.TYPE_HOST] |
47 | 34 | progname = message[protocol.TYPE_PLUGIN_INSTANCE] |
48 | 35 | values = message[protocol.TYPE_VALUES] |
49 | 36 | pid = message[protocol.TYPE_TYPE_INSTANCE] |
50 | | - try: |
51 | | - receiver = receivers[type_] |
52 | | - except KeyError: |
53 | | - log.warn("Don't understand message type %s, skipping" % type_) |
54 | | - else: |
55 | | - receiver( |
56 | | - message, timestamp, host, progname, pid, values, aggregator_) |
57 | | - |
58 | | - def summarize(self, aggregator_, timestamp): |
59 | | - for summarizer in summarizers: |
60 | | - summarizer(aggregator_, timestamp) |
61 | | - |
62 | | - |
63 | | -@receives(types.pool) |
64 | | -def _receive_pool( |
65 | | - message, timestamp, host, progname, pid, values, aggregator_): |
66 | | - aggregator_.set_pool_stats(host, progname, pid, timestamp, *values) |
67 | | - |
68 | | - |
69 | | -@summarizes(types.pool) |
70 | | -def _summarize_pool(aggregator_, timestamp): |
71 | | - values = collectd.Values( |
72 | | - type=types.pool.name, |
73 | | - plugin="sqlalchemy", |
74 | | - time=timestamp, |
75 | | - interval=aggregator_.interval |
76 | | - ) |
77 | | - for hostname, progname, stats in \ |
78 | | - aggregator_.get_pool_stats_by_progname(timestamp, sum): |
79 | | - values.dispatch( |
80 | | - type_instance="sum", host=hostname, plugin_instance=progname, |
81 | | - values=stats |
82 | | - ) |
83 | | - |
84 | | - for hostname, stats in aggregator_.get_pool_stats_by_hostname( |
85 | | - timestamp, sum): |
86 | | - values.dispatch( |
87 | | - type_instance="sum", host=hostname, plugin_instance="all", |
88 | | - values=stats |
| 37 | + self.aggregator.set_stats( |
| 38 | + type_name, host, progname, pid, timestamp, *values |
89 | 39 | ) |
90 | 40 |
|
91 | | - for hostname, progname, stats in aggregator_.get_pool_stats_by_progname( |
92 | | - timestamp, aggregator.avg): |
93 | | - values.dispatch( |
94 | | - type_instance="avg", host=hostname, plugin_instance=progname, |
95 | | - values=stats |
96 | | - ) |
| 41 | + def summarize(self, timestamp): |
| 42 | + for type_ in self.types: |
| 43 | + self._summarize_for_type(type_, timestamp) |
97 | 44 |
|
98 | | - for hostname, stats in aggregator_.get_pool_stats_by_hostname( |
99 | | - timestamp, aggregator.avg): |
100 | | - values.dispatch( |
101 | | - type_instance="avg", host=hostname, plugin_instance="all", |
102 | | - values=stats |
| 45 | + def _summarize_for_type(self, type_, timestamp): |
| 46 | + values = collectd.Values( |
| 47 | + type=type_.name, |
| 48 | + plugin=self.plugin, |
| 49 | + time=timestamp, |
| 50 | + interval=self.aggregator.interval |
103 | 51 | ) |
| 52 | + for hostname, progname, stats in \ |
| 53 | + self.aggregator.get_stats_by_progname( |
| 54 | + type_.name, timestamp, sum): |
| 55 | + values.dispatch( |
| 56 | + type_instance="sum", host=hostname, plugin_instance=progname, |
| 57 | + values=stats |
| 58 | + ) |
| 59 | + |
| 60 | + for hostname, stats in self.aggregator.get_stats_by_hostname( |
| 61 | + type_.name, timestamp, sum): |
| 62 | + values.dispatch( |
| 63 | + type_instance="sum", host=hostname, plugin_instance="all", |
| 64 | + values=stats |
| 65 | + ) |
| 66 | + |
| 67 | + for hostname, progname, stats in self.aggregator.get_stats_by_progname( |
| 68 | + type_.name, timestamp, aggregator.avg): |
| 69 | + values.dispatch( |
| 70 | + type_instance="avg", host=hostname, plugin_instance=progname, |
| 71 | + values=stats |
| 72 | + ) |
| 73 | + |
| 74 | + for hostname, stats in self.aggregator.get_stats_by_hostname( |
| 75 | + type_.name, timestamp, aggregator.avg): |
| 76 | + values.dispatch( |
| 77 | + type_instance="avg", host=hostname, plugin_instance="all", |
| 78 | + values=stats |
| 79 | + ) |
104 | 80 |
|
0 commit comments