Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit 6901d09

Browse files
authored
Azure log exporter (#668)
Implemented the actual log conversion and exporting logic. Moved the network/storage transmission logic from trace exporter to a shared mixin, applied to both traces and logs. Avoid common names (Object and prototype), use BaseObject and _default instead.
1 parent 48b6318 commit 6901d09

File tree

13 files changed

+470
-158
lines changed

13 files changed

+470
-158
lines changed

contrib/opencensus-ext-azure/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
## Unreleased
44
- Added log exporter
5-
([#657](https://github.com/census-instrumentation/opencensus-python/pull/657))
5+
([#657](https://github.com/census-instrumentation/opencensus-python/pull/657),
6+
[#668](https://github.com/census-instrumentation/opencensus-python/pull/668))
67
- Added persistent storage support
78
([#640](https://github.com/census-instrumentation/opencensus-python/pull/640))
89
- Changed AzureExporter constructor signature to use kwargs

contrib/opencensus-ext-azure/examples/logs/correlated.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
# TODO: you need to specify the instrumentation key in the
2828
# APPINSIGHTS_INSTRUMENTATIONKEY environment variable.
2929
handler = AzureLogHandler()
30-
handler.setFormatter(logging.Formatter('%(traceId)s %(spanId)s %(message)s'))
3130
logger.addHandler(handler)
3231

3332
tracer = Tracer(exporter=AzureExporter(), sampler=ProbabilitySampler(1.0))
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Copyright 2019, OpenCensus Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
17+
from opencensus.ext.azure.log_exporter import AzureLogHandler
18+
19+
logger = logging.getLogger(__name__)
20+
# TODO: you need to specify the instrumentation key in the
21+
# APPINSIGHTS_INSTRUMENTATIONKEY environment variable.
22+
logger.addHandler(AzureLogHandler())
23+
24+
25+
def main():
26+
try:
27+
return 1 / 0 # generate a ZeroDivisionError
28+
except Exception:
29+
logger.exception('Captured an exception.')
30+
31+
32+
if __name__ == '__main__':
33+
main()

contrib/opencensus-ext-azure/opencensus/ext/azure/common/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
import os
1616
import sys
1717

18-
from opencensus.ext.azure.common.protocol import Object
18+
from opencensus.ext.azure.common.protocol import BaseObject
1919

2020

21-
class Options(Object):
22-
prototype = Object(
21+
class Options(BaseObject):
22+
_default = BaseObject(
2323
endpoint='https://dc.services.visualstudio.com/v2/track',
2424
export_interval=15.0,
2525
grace_period=5.0,

contrib/opencensus-ext-azure/opencensus/ext/azure/common/protocol.py

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
# limitations under the License.
1414

1515

16-
class Object(dict):
16+
class BaseObject(dict):
1717
def __init__(self, *args, **kwargs):
18-
super(Object, self).__init__(*args, **kwargs)
18+
super(BaseObject, self).__init__(*args, **kwargs)
1919
for key in kwargs:
2020
self[key] = kwargs[key]
2121

@@ -25,9 +25,9 @@ def __repr__(self):
2525
for item in self.items():
2626
if item[0] not in tmp:
2727
tmp[item[0]] = item[1]
28-
if self.prototype == self:
28+
if self._default == self:
2929
break
30-
self = self.prototype
30+
self = self._default
3131
return repr(tmp)
3232

3333
def __setattr__(self, name, value):
@@ -36,25 +36,25 @@ def __setattr__(self, name, value):
3636
def __getattr__(self, name):
3737
try:
3838
return self[name]
39-
except KeyError as ex:
39+
except KeyError:
4040
raise AttributeError("'{}' object has no attribute {}".format(
4141
type(self).__name__,
42-
ex,
42+
name,
4343
))
4444

4545
def __getitem__(self, key):
46-
if self.prototype is self:
47-
return super(Object, self).__getitem__(key)
46+
if self._default is self:
47+
return super(BaseObject, self).__getitem__(key)
4848
if key in self:
49-
return super(Object, self).__getitem__(key)
50-
return self.prototype[key]
49+
return super(BaseObject, self).__getitem__(key)
50+
return self._default[key]
5151

5252

53-
Object.prototype = Object()
53+
BaseObject._default = BaseObject()
5454

5555

56-
class Data(Object):
57-
prototype = Object(
56+
class Data(BaseObject):
57+
_default = BaseObject(
5858
baseData=None,
5959
baseType=None,
6060
)
@@ -65,8 +65,8 @@ def __init__(self, *args, **kwargs):
6565
self.baseType = self.baseType
6666

6767

68-
class Envelope(Object):
69-
prototype = Object(
68+
class Envelope(BaseObject):
69+
_default = BaseObject(
7070
ver=1,
7171
name='',
7272
time='',
@@ -84,8 +84,8 @@ def __init__(self, *args, **kwargs):
8484
self.time = self.time
8585

8686

87-
class Event(Object):
88-
prototype = Object(
87+
class Event(BaseObject):
88+
_default = BaseObject(
8989
ver=2,
9090
name='',
9191
properties=None,
@@ -98,8 +98,24 @@ def __init__(self, *args, **kwargs):
9898
self.name = self.name
9999

100100

101-
class Message(Object):
102-
prototype = Object(
101+
class ExceptionData(BaseObject):
102+
_default = BaseObject(
103+
ver=2,
104+
exceptions=[],
105+
severityLevel=None,
106+
problemId=None,
107+
properties=None,
108+
measurements=None,
109+
)
110+
111+
def __init__(self, *args, **kwargs):
112+
super(ExceptionData, self).__init__(*args, **kwargs)
113+
self.ver = self.ver
114+
self.exceptions = self.exceptions
115+
116+
117+
class Message(BaseObject):
118+
_default = BaseObject(
103119
ver=2,
104120
message='',
105121
severityLevel=None,
@@ -113,8 +129,8 @@ def __init__(self, *args, **kwargs):
113129
self.message = self.message
114130

115131

116-
class RemoteDependency(Object):
117-
prototype = Object(
132+
class RemoteDependency(BaseObject):
133+
_default = BaseObject(
118134
ver=2,
119135
name='',
120136
id='',
@@ -136,8 +152,8 @@ def __init__(self, *args, **kwargs):
136152
self.duration = self.duration
137153

138154

139-
class Request(Object):
140-
prototype = Object(
155+
class Request(BaseObject):
156+
_default = BaseObject(
141157
ver=2,
142158
id='',
143159
duration='',
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
# Copyright 2019, OpenCensus Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import json
16+
import logging
17+
import requests
18+
19+
from opencensus.trace import execution_context
20+
21+
logger = logging.getLogger(__name__)
22+
23+
24+
class TransportMixin(object):
25+
def _transmit_from_storage(self):
26+
for blob in self.storage.gets():
27+
# give a few more seconds for blob lease operation
28+
# to reduce the chance of race (for perf consideration)
29+
if blob.lease(self.options.timeout + 5):
30+
envelopes = blob.get() # TODO: handle error
31+
result = self._transmit(envelopes)
32+
if result > 0:
33+
blob.lease(result)
34+
else:
35+
blob.delete(silent=True)
36+
37+
def _transmit(self, envelopes):
38+
"""
39+
Transmit the data envelopes to the ingestion service.
40+
Return a negative value for partial success or non-retryable failure.
41+
Return 0 if all envelopes have been successfully ingested.
42+
Return the next retry time in seconds for retryable failure.
43+
This function should never throw exception.
44+
"""
45+
# TODO: prevent requests being tracked
46+
blacklist_hostnames = execution_context.get_opencensus_attr(
47+
'blacklist_hostnames',
48+
)
49+
execution_context.set_opencensus_attr(
50+
'blacklist_hostnames',
51+
['dc.services.visualstudio.com'],
52+
)
53+
try:
54+
response = requests.post(
55+
url=self.options.endpoint,
56+
data=json.dumps(envelopes),
57+
headers={
58+
'Accept': 'application/json',
59+
'Content-Type': 'application/json; charset=utf-8',
60+
},
61+
timeout=self.options.timeout,
62+
)
63+
except Exception as ex: # TODO: consider RequestException
64+
logger.warning('Transient client side error %s.', ex)
65+
# client side error (retryable)
66+
return self.options.minimum_retry_interval
67+
finally:
68+
execution_context.set_opencensus_attr(
69+
'blacklist_hostnames',
70+
blacklist_hostnames,
71+
)
72+
text = 'N/A'
73+
data = None
74+
try:
75+
text = response.text
76+
except Exception as ex:
77+
logger.warning('Error while reading response body %s.', ex)
78+
else:
79+
try:
80+
data = json.loads(text)
81+
except Exception:
82+
pass
83+
if response.status_code == 200:
84+
logger.info('Transmission succeeded: %s.', text)
85+
return 0
86+
if response.status_code == 206: # Partial Content
87+
# TODO: store the unsent data
88+
if data:
89+
try:
90+
resend_envelopes = []
91+
for error in data['errors']:
92+
if error['statusCode'] in (
93+
429, # Too Many Requests
94+
500, # Internal Server Error
95+
503, # Service Unavailable
96+
):
97+
resend_envelopes.append(envelopes[error['index']])
98+
else:
99+
logger.error(
100+
'Data drop %s: %s %s.',
101+
error['statusCode'],
102+
error['message'],
103+
envelopes[error['index']],
104+
)
105+
if resend_envelopes:
106+
self.storage.put(resend_envelopes)
107+
except Exception as ex:
108+
logger.error(
109+
'Error while processing %s: %s %s.',
110+
response.status_code,
111+
text,
112+
ex,
113+
)
114+
return -response.status_code
115+
# cannot parse response body, fallback to retry
116+
if response.status_code in (
117+
206, # Partial Content
118+
429, # Too Many Requests
119+
500, # Internal Server Error
120+
503, # Service Unavailable
121+
):
122+
logger.warning(
123+
'Transient server side error %s: %s.',
124+
response.status_code,
125+
text,
126+
)
127+
# server side error (retryable)
128+
return self.options.minimum_retry_interval
129+
logger.error(
130+
'Non-retryable server side error %s: %s.',
131+
response.status_code,
132+
text,
133+
)
134+
# server side error (non-retryable)
135+
return -response.status_code

contrib/opencensus-ext-azure/opencensus/ext/azure/common/utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import datetime
1516
import locale
1617
import os
1718
import platform
@@ -24,6 +25,7 @@
2425

2526
from opencensus.common.version import __version__ as opencensus_version
2627
from opencensus.common.utils import timestamp_to_microseconds
28+
from opencensus.common.utils import to_iso_str
2729
from opencensus.ext.azure.common.version import __version__ as ext_version
2830

2931
azure_monitor_context = {
@@ -57,5 +59,9 @@ def timestamp_to_duration(start_time, end_time):
5759
return microseconds_to_duration(duration_us)
5860

5961

62+
def timestamp_to_iso_str(timestamp):
63+
return to_iso_str(datetime.datetime.utcfromtimestamp(timestamp))
64+
65+
6066
def url_to_dependency_name(url):
6167
return urlparse(url).netloc

0 commit comments

Comments
 (0)