Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit 992b223

Browse files
authored
Introduce persistent storage to Azure exporter (#632)
Refactored and moved PeriodicTask to opencensus.common. Added local file storage to Azure exporter. Supported local persistence in Azure exporter.
1 parent b06d905 commit 992b223

10 files changed

Lines changed: 803 additions & 71 deletions

File tree

contrib/opencensus-ext-azure/opencensus/ext/azure/common/__init__.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,26 @@
1313
# limitations under the License.
1414

1515
import os
16+
import sys
1617

1718
from opencensus.ext.azure.common.protocol import Object
1819

1920

2021
class Options(Object):
2122
prototype = Object(
2223
endpoint='https://dc.services.visualstudio.com/v2/track',
24+
export_interval=15.0,
2325
instrumentation_key=os.getenv('APPINSIGHTS_INSTRUMENTATIONKEY', None),
26+
minimum_retry_interval=60, # minimum retry interval in seconds
2427
proxy=None,
25-
timeout=5.0, # timeout in seconds
28+
storage_maintenance_period=60,
29+
storage_max_size=100*1024*1024,
30+
storage_path=os.path.join(
31+
os.path.expanduser('~'),
32+
'.opencensus',
33+
'.azure',
34+
os.path.basename(sys.argv[0]) or '.console',
35+
),
36+
storage_retention_period=7*24*60*60,
37+
timeout=10.0, # networking timeout in seconds
2638
)
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import datetime
2+
import json
3+
import random
4+
import os
5+
6+
from opencensus.common.schedule import PeriodicTask
7+
8+
9+
def _fmt(timestamp):
10+
return timestamp.strftime('%Y-%m-%dT%H%M%S.%f')
11+
12+
13+
def _now():
14+
return datetime.datetime.utcnow()
15+
16+
17+
def _seconds(seconds):
18+
return datetime.timedelta(seconds=seconds)
19+
20+
21+
class LocalFileBlob(object):
22+
def __init__(self, fullpath):
23+
self.fullpath = fullpath
24+
25+
def delete(self, silent=False):
26+
try:
27+
os.remove(self.fullpath)
28+
except Exception:
29+
if not silent:
30+
raise
31+
32+
def get(self, silent=False):
33+
try:
34+
with open(self.fullpath, 'r') as file:
35+
return tuple(
36+
json.loads(line.strip())
37+
for line in file.readlines()
38+
)
39+
except Exception:
40+
if not silent:
41+
raise
42+
43+
def put(self, data, lease_period=0, silent=False):
44+
try:
45+
fullpath = self.fullpath + '.tmp'
46+
with open(fullpath, 'w') as file:
47+
for item in data:
48+
file.write(json.dumps(item))
49+
# The official Python doc: Do not use os.linesep as a line
50+
# terminator when writing files opened in text mode (the
51+
# default); use a single '\n' instead, on all platforms.
52+
file.write('\n')
53+
if lease_period:
54+
timestamp = _now() + _seconds(lease_period)
55+
self.fullpath += '@{}.lock'.format(_fmt(timestamp))
56+
os.rename(fullpath, self.fullpath)
57+
return self
58+
except Exception:
59+
if not silent:
60+
raise
61+
62+
def lease(self, period):
63+
timestamp = _now() + _seconds(period)
64+
fullpath = self.fullpath
65+
if fullpath.endswith('.lock'):
66+
fullpath = fullpath[: fullpath.rindex('@')]
67+
fullpath += '@{}.lock'.format(_fmt(timestamp))
68+
try:
69+
os.rename(self.fullpath, fullpath)
70+
except Exception:
71+
return None
72+
self.fullpath = fullpath
73+
return self
74+
75+
76+
class LocalFileStorage(object):
77+
def __init__(
78+
self,
79+
path,
80+
max_size=100*1024*1024, # 100MB
81+
maintenance_period=60, # 1 minute
82+
retention_period=7*24*60*60, # 7 days
83+
write_timeout=60, # 1 minute
84+
):
85+
self.path = os.path.abspath(path)
86+
self.max_size = max_size
87+
self.maintenance_period = maintenance_period
88+
self.retention_period = retention_period
89+
self.write_timeout = write_timeout
90+
self._maintenance_routine(silent=False)
91+
self._maintenance_task = PeriodicTask(
92+
interval=self.maintenance_period,
93+
function=self._maintenance_routine,
94+
kwargs={'silent': True},
95+
)
96+
self._maintenance_task.daemon = True
97+
self._maintenance_task.start()
98+
99+
def close(self):
100+
self._maintenance_task.cancel()
101+
self._maintenance_task.join()
102+
103+
def __enter__(self):
104+
return self
105+
106+
def __exit__(self, type, value, traceback):
107+
self.close()
108+
109+
def _maintenance_routine(self, silent=False):
110+
try:
111+
if not os.path.isdir(self.path):
112+
os.makedirs(self.path)
113+
except Exception:
114+
if not silent:
115+
raise
116+
try:
117+
for blob in self.gets():
118+
pass
119+
except Exception:
120+
if not silent:
121+
raise
122+
123+
def gets(self):
124+
now = _now()
125+
lease_deadline = _fmt(now)
126+
retention_deadline = _fmt(now - _seconds(self.retention_period))
127+
timeout_deadline = _fmt(now - _seconds(self.write_timeout))
128+
for name in sorted(os.listdir(self.path)):
129+
path = os.path.join(self.path, name)
130+
if not os.path.isfile(path):
131+
continue # skip if not a file
132+
if path.endswith('.tmp'):
133+
if name < timeout_deadline:
134+
try:
135+
os.remove(path) # TODO: log data loss
136+
except Exception:
137+
pass # keep silent
138+
if path.endswith('.lock'):
139+
if path[path.rindex('@') + 1: -5] > lease_deadline:
140+
continue # under lease
141+
new_path = path[: path.rindex('@')]
142+
try:
143+
os.rename(path, new_path)
144+
except Exception:
145+
continue # keep silent
146+
path = new_path
147+
if path.endswith('.blob'):
148+
if name < retention_deadline:
149+
try:
150+
os.remove(path) # TODO: log data loss
151+
except Exception:
152+
pass # keep silent
153+
else:
154+
yield LocalFileBlob(path)
155+
156+
def get(self):
157+
cursor = self.gets()
158+
try:
159+
return next(cursor)
160+
except StopIteration:
161+
pass
162+
return None
163+
164+
def put(self, data, lease_period=0, silent=False):
165+
blob = LocalFileBlob(os.path.join(
166+
self.path,
167+
'{}-{}.blob'.format(
168+
_fmt(_now()),
169+
'{:08x}'.format(random.getrandbits(32)), # thread-safe random
170+
),
171+
))
172+
return blob.put(data, lease_period=lease_period, silent=silent)

0 commit comments

Comments
 (0)