Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit eface0a

Browse files
MarceloAquino7liyanhui1228
authored andcommitted
Stats core changes (#256)
1 parent 5331d74 commit eface0a

17 files changed

Lines changed: 863 additions & 5 deletions

examples/stats/helloworld/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#!/usr/bin/env python
2+
13
# Copyright 2018, OpenCensus Authors
24
#
35
# Licensed under the Apache License, Version 2.0 (the "License");

opencensus/common/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2018, OpenCensus Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2018, OpenCensus Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
# Copyright 2018, OpenCensus Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import atexit
16+
import threading
17+
import time
18+
19+
from six.moves import queue
20+
from six.moves import range
21+
22+
from opencensus.common.transports import base
23+
24+
_DEFAULT_GRACE_PERIOD = 5.0 # Seconds
25+
_DEFAULT_MAX_BATCH_SIZE = 10
26+
_WAIT_PERIOD = 1.0 # Seconds
27+
_WORKER_THREAD_NAME = 'opencensus.common.Worker'
28+
_WORKER_TERMINATOR = object()
29+
30+
31+
class _Worker(object):
32+
"""A background thread that exports batches of data.
33+
34+
:type exporter: :class:`~opencensus.trace.exporters.base.Exporter` or
35+
:class:`~opencensus.stats.exporters.base.StatsExporter`
36+
:param exporter: Instances of Exporter objects. Defaults to
37+
:class:`.PrintExporter`. The rest options are
38+
:class:`.ZipkinExporter`, :class:`.StackdriverExporter`,
39+
:class:`.LoggingExporter`, :class:`.FileExporter`.
40+
41+
:type grace_period: float
42+
:param grace_period: The amount of time to wait for pending data to
43+
be submitted when the process is shutting down.
44+
45+
:type max_batch_size: int
46+
:param max_batch_size: The maximum number of items to send at a time
47+
in the background thread.
48+
"""
49+
def __init__(self, exporter, grace_period=_DEFAULT_GRACE_PERIOD,
50+
max_batch_size=_DEFAULT_MAX_BATCH_SIZE):
51+
self.exporter = exporter
52+
self._grace_period = grace_period
53+
self._max_batch_size = max_batch_size
54+
self._queue = queue.Queue(0)
55+
self._lock = threading.Lock()
56+
self._thread = None
57+
58+
@property
59+
def is_alive(self):
60+
"""Returns True is the background thread is running."""
61+
return self._thread is not None and self._thread.is_alive()
62+
63+
def _get_items(self):
64+
"""Get multiple items from a Queue.
65+
66+
Gets at least one (blocking) and at most ``max_batch_size`` items
67+
(non-blocking) from a given Queue. Does not mark the items as done.
68+
69+
:rtype: Sequence
70+
:returns: A sequence of items retrieved from the queue.
71+
"""
72+
items = [self._queue.get()]
73+
74+
while len(items) < self._max_batch_size:
75+
try:
76+
items.append(self._queue.get_nowait())
77+
except queue.Empty:
78+
break
79+
80+
return items
81+
82+
def _thread_main(self):
83+
"""The entry point for the worker thread.
84+
85+
Pulls pending data off the queue and writes them in
86+
batches to the specified tracing backend using the exporter.
87+
"""
88+
quit_ = False
89+
90+
while True:
91+
items = self._get_items()
92+
data = []
93+
94+
for item in items:
95+
if item is _WORKER_TERMINATOR:
96+
quit_ = True
97+
# Continue processing items, don't break, try to process
98+
# all items we got back before quitting.
99+
else:
100+
data.extend(item)
101+
102+
if data:
103+
self.exporter.emit(data)
104+
105+
for _ in range(len(items)):
106+
self._queue.task_done()
107+
108+
# Wait for a while before next export
109+
time.sleep(_WAIT_PERIOD)
110+
111+
if quit_:
112+
break
113+
114+
def start(self):
115+
"""Starts the background thread.
116+
117+
Additionally, this registers a handler for process exit to attempt
118+
to send any pending data before shutdown.
119+
"""
120+
with self._lock:
121+
if self.is_alive:
122+
return
123+
124+
self._thread = threading.Thread(
125+
target=self._thread_main, name=_WORKER_THREAD_NAME)
126+
self._thread.daemon = True
127+
self._thread.start()
128+
atexit.register(self._export_pending_data)
129+
130+
def stop(self):
131+
"""Signals the background thread to stop.
132+
133+
This does not terminate the background thread. It simply queues the
134+
stop signal. If the main process exits before the background thread
135+
processes the stop signal, it will be terminated without finishing
136+
work. The ``grace_period`` parameter will give the background
137+
thread some time to finish processing before this function returns.
138+
139+
:rtype: bool
140+
:returns: True if the thread terminated. False if the thread is still
141+
running.
142+
"""
143+
if not self.is_alive:
144+
return True
145+
146+
with self._lock:
147+
self._queue.put_nowait(_WORKER_TERMINATOR)
148+
self._thread.join(timeout=self._grace_period)
149+
150+
success = not self.is_alive
151+
self._thread = None
152+
153+
return success
154+
155+
def _export_pending_data(self):
156+
"""Callback that attempts to send pending data before termination."""
157+
if not self.is_alive:
158+
return
159+
self.stop()
160+
161+
def enqueue(self, data):
162+
"""Queues data to be written by the background thread."""
163+
self._queue.put_nowait(data)
164+
165+
def flush(self):
166+
"""Submit any pending data."""
167+
self._queue.join()
168+
169+
170+
class AsyncTransport(base.Transport):
171+
"""Asynchronous transport that uses a background thread.
172+
173+
:type exporter: :class:`~opencensus.trace.exporters.base.Exporter` or
174+
:class:`~opencensus.stats.exporters.base.StatsExporter`
175+
:param exporter: Instances of Exporter objects. Defaults to
176+
:class:`.PrintExporter`. The rest options are
177+
:class:`.ZipkinExporter`, :class:`.StackdriverExporter`,
178+
:class:`.LoggingExporter`, :class:`.FileExporter`.
179+
180+
:type grace_period: float
181+
:param grace_period: The amount of time to wait for pending data to
182+
be submitted when the process is shutting down.
183+
184+
:type max_batch_size: int
185+
:param max_batch_size: The maximum number of items to send at a time
186+
in the background thread.
187+
"""
188+
189+
def __init__(self, exporter, grace_period=_DEFAULT_GRACE_PERIOD,
190+
max_batch_size=_DEFAULT_MAX_BATCH_SIZE):
191+
self.exporter = exporter
192+
self.worker = _Worker(exporter, grace_period, max_batch_size)
193+
self.worker.start()
194+
195+
def export(self, data):
196+
"""Put the trace/stats to be exported into queue."""
197+
self.worker.enqueue(data)
198+
199+
def flush(self):
200+
"""Submit any pending traces/stats."""
201+
self.worker.flush()
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright 2018, OpenCensus Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Module containing base class for transport."""
16+
17+
18+
class Transport(object):
19+
"""Base class for transport.
20+
21+
Subclasses of :class:`Transport` must override :meth:`export`.
22+
"""
23+
def export(self, datas):
24+
"""Export the data."""
25+
raise NotImplementedError
26+
27+
def flush(self):
28+
"""Submit any pending data.
29+
30+
For blocking/sync transports, this is a no-op.
31+
"""
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright 2018, OpenCensus Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from opencensus.common.transports import base
16+
17+
18+
class SyncTransport(base.Transport):
19+
def __init__(self, exporter):
20+
self.exporter = exporter
21+
22+
def export(self, datas):
23+
self.exporter.emit(datas)

opencensus/stats/aggregation.py

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,20 @@
1717

1818

1919
class Type(object):
20-
""" The type of aggreation function used on a View.
20+
""" The type of aggregation function used on a View.
2121
2222
Attributes:
23-
NONE (int): The aggreation type of the view is 'unknown'.
24-
SUM (int): The aggreation type of the view is 'sum'.
25-
COUNT (int): The aggreation type of the view is 'count'.
26-
DISTRIBUTION (int): The aggreation type of the view is 'distribution'.
23+
NONE (int): The aggregation type of the view is 'unknown'.
24+
SUM (int): The aggregation type of the view is 'sum'.
25+
COUNT (int): The aggregation type of the view is 'count'.
26+
DISTRIBUTION (int): The aggregation type of the view is 'distribution'.
27+
LASTVALUE (int): The aggregation type of the view is 'lastvalue'.
2728
"""
2829
NONE = 0
2930
SUM = 1
3031
COUNT = 2
3132
DISTRIBUTION = 3
33+
LASTVALUE = 4
3234

3335

3436
class BaseAggregation(object):
@@ -142,3 +144,28 @@ def boundaries(self):
142144
def distribution(self):
143145
"""The distribution of the current aggregation"""
144146
return self._distribution
147+
148+
149+
class LastValueAggregation(BaseAggregation):
150+
"""Describes that the data collected with this method will
151+
overwrite the last recorded value
152+
153+
:type value: long
154+
:param value: represents the value of this aggregation
155+
156+
:type aggregation_type: :class:`~opencensus.stats.aggregation.Type`
157+
:param aggregation_type: represents the type of this aggregation
158+
159+
"""
160+
def __init__(self, value=0, aggregation_type=Type.LASTVALUE):
161+
super(LastValueAggregation, self).__init__(
162+
aggregation_type=aggregation_type)
163+
self.aggregation_data = aggregation_data.LastValueAggregationData(
164+
value=value)
165+
self._value = value
166+
167+
@property
168+
def value(self):
169+
"""The current recorded value
170+
"""
171+
return self._value

opencensus/stats/aggregation_data.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,3 +208,28 @@ def increment_bucket_count(self, value):
208208
i += 1
209209

210210
self._counts_per_bucket[(len(self._bounds))-1] += 1
211+
212+
213+
class LastValueAggregationData(BaseAggregationData):
214+
"""LastValue Aggregation Data is the value of aggregated data
215+
216+
:type value: long
217+
:param value: represents the current value
218+
219+
"""
220+
def __init__(self, value):
221+
super(LastValueAggregationData, self).__init__(value)
222+
self._value = value
223+
224+
def add_sample(self, value):
225+
"""Adds a sample to the current
226+
LastValue Aggregation Data and overwrite
227+
the current recorded value
228+
"""
229+
self._value = value
230+
231+
@property
232+
def value(self):
233+
"""The current value recorded
234+
"""
235+
return self._value

0 commit comments

Comments
 (0)