Skip to content

Commit bb158a0

Browse files
committed
Change pid to process_token
The "pid" value that is collected by the client plugin and passed to the aggregator in order to disambiguate separate processes is now augmented by a six-character random hex string, so that a single host on which the same pid may be repeated, such as when process namespaces or containers are used, sends correct statistics for the same program name configuration.

Fixes: #9
Change-Id: I02081ef0e27703e27dc0bceb63566acaf608f777
1 parent fab45c8 commit bb158a0

5 files changed

Lines changed: 37 additions & 12 deletions

File tree

sqlalchemy_collectd/client/sender.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@ def __init__(
3838
[protocol_type for protocol_type, sender in senders],
3939
)
4040

41-
def send(self, collection_target, timestamp, interval, pid):
41+
def send(self, collection_target, timestamp, interval, process_token):
4242
values = protocol.Values(
4343
host=self.hostname,
4444
plugin=self.plugin,
4545
plugin_instance=self.stats_name,
46-
type_instance=str(pid),
46+
type_instance=str(process_token),
4747
interval=interval,
4848
time=timestamp,
4949
)

sqlalchemy_collectd/client/tests/test_worker.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,11 @@ def start_loop():
7575

7676
self.assertEqual(
7777
[
78-
mock.call.info("Starting process thread in pid %s", mock.ANY),
78+
mock.call.info(
79+
"Starting process thread in pid %s, process token %s",
80+
mock.ANY,
81+
mock.ANY,
82+
),
7983
mock.call.error("error sending stats", exc_info=True),
8084
mock.call.info(
8185
"message sender thread caught SystemExit "

sqlalchemy_collectd/client/worker.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import os
33
import threading
44
import time
5+
import uuid
56

67
log = logging.getLogger(__name__)
78

@@ -25,7 +26,12 @@ def _check_threads_started():
2526

2627
def _process(interval):
2728
pid = os.getpid()
28-
log.info("Starting process thread in pid %s", pid)
29+
process_token = "%s:%s" % (pid, str(uuid.uuid4())[0:6])
30+
log.info(
31+
"Starting process thread in pid %s, process token %s",
32+
pid,
33+
process_token,
34+
)
2935

3036
try:
3137
while True:
@@ -37,7 +43,9 @@ def _process(interval):
3743
if now - last_called[0] > interval:
3844
last_called[0] = now
3945
try:
40-
sender.send(collection_target, now, interval, pid)
46+
sender.send(
47+
collection_target, now, interval, process_token
48+
)
4149
except Exception:
4250
log.error("error sending stats", exc_info=True)
4351

sqlalchemy_collectd/server/receiver.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,23 +57,26 @@ def _set_stats(self, values):
5757
timestamp = values.time
5858
hostname = values.host
5959
progname = values.plugin_instance
60-
pid = values.type_instance
60+
process_token = values.type_instance
6161
interval = values.interval
6262

6363
bucket = self.buckets[bucket_name]
6464
records = bucket.get_data(timestamp, interval=interval * 2)
6565

66-
records[(hostname, progname, pid)] = values
66+
records[(hostname, progname, process_token)] = values
6767

68-
if pid:
68+
if process_token:
6969
# manufacture a record for that is a single process count for this
70-
# pid. we also use a larger interval
70+
# process_token (which is roughly the pid plus a unique key
71+
# generated by the client plugin). we also use a larger interval
7172
# for this value so that the process count changes more slowly
7273
process_bucket = self.buckets[internal_types.process_internal.name]
7374
process_records = process_bucket.get_data(
7475
timestamp, interval=interval * 5
7576
)
76-
process_records[(hostname, progname, pid)] = values.build(
77+
process_records[
78+
(hostname, progname, process_token)
79+
] = values.build(
7780
type=internal_types.process_internal.name, values=[1]
7881
)
7982

@@ -86,8 +89,8 @@ def get_stats_by_progname(self, bucket_name, timestamp, agg_func=sum):
8689
):
8790
recs = [records[key] for key in keys]
8891
interval = recs[0].interval
89-
# summation here is across pids.
90-
# if records are pid-less, then there would be one record
92+
# summation here is across process_tokens.
93+
# if records are process_token-less, then there would be one record
9194
# per host/program name.
9295
values_obj = agg_func(recs).build(
9396
time=timestamp, interval=interval

unreleased_changes/9.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
.. change::
2+
:tags: bug
3+
:tickets: 9
4+
5+
The "pid" value that is collected by the client plugin and passed to the
6+
aggregator in order to disambiguate separate processes is now augmented
7+
by a six-character random hex string, so that a single host that may have
8+
the same pid repeated, such as when process namespaces or containers are
9+
used, sends correct statistics for the same program name configuration.
10+

0 commit comments

Comments
 (0)