Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit f534b6e

Browse files
authored
Add interceptors for grpc tracing in client side and server side (#12)
1 parent cfcfda5 commit f534b6e

9 files changed

Lines changed: 593 additions & 0 deletions

File tree

opencensus/trace/attributes_helper.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,9 @@
3333
'STACKTRACE': '/stacktrace',
3434
'TID': '/tid',
3535
}
36+
37+
38+
GRPC_ATTRIBUTES = {
39+
'GRPC_HOST_PORT': '/grpc/host_port',
40+
'GRPC_METHOD': '/grpc/method',
41+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Copyright 2017, OpenCensus Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
UNARY_UNARY = 'unary_unary'
16+
UNARY_STREAM = 'unary_stream'
17+
STREAM_UNARY = 'stream_unary'
18+
STREAM_STREAM = 'stream_stream'
19+
20+
GRPC_TRACE_KEY = 'grpc-trace-bin'
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
# Copyright 2017, OpenCensus Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import collections
16+
import logging
17+
import grpc
18+
import six
19+
20+
from opencensus.trace import execution_context
21+
from opencensus.trace import attributes_helper
22+
from opencensus.trace.ext import grpc as oc_grpc
23+
from opencensus.trace.propagation import binary_format
24+
25+
log = logging.getLogger(__name__)
26+
27+
ATTRIBUTE_COMPONENT = 'COMPONENT'
28+
ATTRIBUTE_ERROR_NAME = 'ERROR_NAME'
29+
ATTRIBUTE_ERROR_MESSAGE = 'ERROR_MESSAGE'
30+
GRPC_HOST_PORT = 'GRPC_HOST_PORT'
31+
GRPC_METHOD = 'GRPC_METHOD'
32+
33+
TIMEOUT = 3
34+
35+
36+
class _ClientCallDetails(
37+
collections.namedtuple(
38+
'_ClientCallDetails',
39+
('method', 'timeout', 'metadata', 'credentials')),
40+
grpc.ClientCallDetails):
41+
pass
42+
43+
44+
class OpenCensusClientInterceptor(grpc.UnaryUnaryClientInterceptor,
45+
grpc.UnaryStreamClientInterceptor,
46+
grpc.StreamUnaryClientInterceptor,
47+
grpc.StreamStreamClientInterceptor):
48+
49+
def __init__(self, tracer=None, host_port=None):
50+
if tracer is None:
51+
tracer = execution_context.get_opencensus_tracer()
52+
53+
self._tracer = tracer
54+
self.host_port = host_port
55+
self._propagator = binary_format.BinaryFormatPropagator()
56+
57+
def _start_client_span(self, method, grpc_type):
58+
span = self._tracer.start_span(
59+
name='[gRPC_client][{}]{}'.format(grpc_type, str(method)))
60+
61+
# Add the component grpc to span attribute
62+
self._tracer.add_attribute_to_current_span(
63+
attribute_key=attributes_helper.COMMON_ATTRIBUTES.get(
64+
ATTRIBUTE_COMPONENT),
65+
attribute_value='grpc')
66+
67+
# Add the host:port info to span attribute
68+
self._tracer.add_attribute_to_current_span(
69+
attribute_key=attributes_helper.GRPC_ATTRIBUTES.get(
70+
GRPC_HOST_PORT),
71+
attribute_value=self.host_port)
72+
73+
# Add the method to span attribute
74+
self._tracer.add_attribute_to_current_span(
75+
attribute_key=attributes_helper.GRPC_ATTRIBUTES.get(GRPC_METHOD),
76+
attribute_value=str(method))
77+
78+
return span
79+
80+
def _end_span_between_context(self, current_span):
81+
execution_context.set_current_span(current_span)
82+
self._tracer.end_span()
83+
84+
def _intercept_call(
85+
self, client_call_details, request_iterator, grpc_type):
86+
metadata = ()
87+
if client_call_details.metadata is not None:
88+
metadata = client_call_details.metadata
89+
90+
# Start a span
91+
current_span = self._start_client_span(
92+
client_call_details.method,
93+
grpc_type)
94+
95+
span_context = self._tracer.span_context
96+
header = self._propagator.to_header(span_context)
97+
grpc_trace_metadata = {
98+
oc_grpc.GRPC_TRACE_KEY: header,
99+
}
100+
metadata = metadata + tuple(six.iteritems(grpc_trace_metadata))
101+
102+
client_call_details = _ClientCallDetails(
103+
client_call_details.method,
104+
client_call_details.timeout,
105+
metadata,
106+
client_call_details.credentials)
107+
108+
return client_call_details, request_iterator, current_span
109+
110+
def _callback(self, current_span):
111+
def callback(future_response):
112+
execution_context.set_current_span(current_span)
113+
self._trace_future_exception(future_response)
114+
self._tracer.end_span()
115+
116+
return callback
117+
118+
def _trace_future_exception(self, response):
119+
# Trace the exception for a grpc.Future if any
120+
exception = response.exception(timeout=TIMEOUT)
121+
122+
if exception is not None:
123+
exception = str(exception)
124+
125+
self._tracer.add_attribute_to_current_span(
126+
attribute_key=attributes_helper.COMMON_ATTRIBUTES.get(
127+
ATTRIBUTE_ERROR_MESSAGE),
128+
attribute_value=exception)
129+
130+
def intercept_unary_unary(
131+
self, continuation, client_call_details, request):
132+
new_details, new_request, current_span = self._intercept_call(
133+
client_call_details=client_call_details,
134+
request_iterator=iter((request,)),
135+
grpc_type=oc_grpc.UNARY_UNARY)
136+
137+
response = continuation(
138+
new_details,
139+
next(new_request))
140+
141+
response.add_done_callback(self._callback(current_span))
142+
143+
return response
144+
145+
def intercept_unary_stream(self, continuation, client_call_details,
146+
request):
147+
new_details, new_request_iterator, current_span = self._intercept_call(
148+
client_call_details=client_call_details,
149+
request_iterator=iter((request,)),
150+
grpc_type=oc_grpc.UNARY_STREAM)
151+
152+
response_it = continuation(
153+
new_details,
154+
next(new_request_iterator))
155+
self._end_span_between_context(current_span)
156+
157+
return response_it
158+
159+
def intercept_stream_unary(self, continuation, client_call_details,
160+
request_iterator):
161+
new_details, new_request_iterator, current_span = self._intercept_call(
162+
client_call_details=client_call_details,
163+
request_iterator=request_iterator,
164+
grpc_type=oc_grpc.STREAM_UNARY)
165+
166+
response = continuation(
167+
new_details,
168+
new_request_iterator)
169+
170+
response.add_done_callback(self._callback(current_span))
171+
172+
return response
173+
174+
def intercept_stream_stream(self, continuation, client_call_details,
175+
request_iterator):
176+
new_details, new_request_iterator, current_span = self._intercept_call(
177+
client_call_details=client_call_details,
178+
request_iterator=request_iterator,
179+
grpc_type=oc_grpc.STREAM_STREAM)
180+
181+
response_it = continuation(
182+
new_details,
183+
new_request_iterator)
184+
self._end_span_between_context(current_span)
185+
186+
return response_it
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Copyright 2017, OpenCensus Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import grpc
16+
import logging
17+
18+
from opencensus.trace import attributes_helper
19+
from opencensus.trace import tracer as tracer_module
20+
from opencensus.trace.ext import grpc as oc_grpc
21+
from opencensus.trace.propagation import binary_format
22+
23+
ATTRIBUTE_COMPONENT = 'COMPONENT'
24+
ATTRIBUTE_ERROR_NAME = 'ERROR_NAME'
25+
ATTRIBUTE_ERROR_MESSAGE = 'ERROR_MESSAGE'
26+
27+
28+
class OpenCensusServerInterceptor(grpc.ServerInterceptor):
29+
30+
def __init__(self, sampler=None, exporter=None):
31+
self.sampler = sampler
32+
self.exporter = exporter
33+
34+
def _start_server_span(self, tracer):
35+
span = tracer.start_span(name='grpc_server')
36+
tracer.add_attribute_to_current_span(
37+
attribute_key=attributes_helper.COMMON_ATTRIBUTES.get(
38+
ATTRIBUTE_COMPONENT),
39+
attribute_value='grpc')
40+
41+
return span
42+
43+
def intercept_handler(self, continuation, handler_call_details):
44+
metadata = handler_call_details.invocation_metadata
45+
span_context = None
46+
47+
if metadata is not None:
48+
propagator = binary_format.BinaryFormatPropagator()
49+
metadata_dict = dict(metadata)
50+
trace_header = metadata_dict.get(oc_grpc.GRPC_TRACE_KEY)
51+
52+
span_context = propagator.from_header(trace_header)
53+
54+
tracer = tracer_module.Tracer(span_context=span_context,
55+
sampler=self.sampler,
56+
exporter=self.exporter)
57+
58+
with self._start_server_span(tracer):
59+
response = None
60+
61+
try:
62+
response = continuation(handler_call_details)
63+
except Exception as e: # pragma: NO COVER
64+
logging.error(e)
65+
tracer.add_attribute_to_current_span(
66+
attributes_helper.COMMON_ATTRIBUTES.get(
67+
ATTRIBUTE_ERROR_MESSAGE),
68+
str(e))
69+
tracer.end_span()
70+
raise
71+
72+
return response
73+
74+
def intercept_service(self, continuation, handler_call_details):
75+
return self.intercept_handler(continuation, handler_call_details)

opencensus/trace/propagation/binary_format.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ def from_header(self, binary):
104104
:rtype: :class:`~opencensus.trace.span_context.SpanContext`
105105
:returns: SpanContext generated from the trace context header.
106106
"""
107+
# If no binary provided, generate a new SpanContext
108+
if binary is None:
109+
return SpanContext(from_header=False)
110+
107111
# If cannot parse, return a new SpanContext and ignore the context
108112
# from binary.
109113
try:

requirements-test.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
Django==1.11.7
22
Flask==0.12.2
33
google-cloud-trace==0.17.0
4+
grpcio==1.8.0rc3
45
mock==2.0.0
56
mysql-connector==2.1.6
67
psycopg2==2.7.3.1

0 commit comments

Comments
 (0)