Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit 53f7aaa

Browse files
wkiserliyanhui1228
authored andcommitted
Add streaming support and message events to gRPC interceptors (#169)
1 parent ab1953f commit 53f7aaa

5 files changed

Lines changed: 302 additions & 198 deletions

File tree

opencensus/trace/ext/grpc/client_interceptor.py

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,15 @@
1414

1515
import collections
1616
import logging
17+
1718
import grpc
1819
import six
1920

20-
from opencensus.trace import execution_context
2121
from opencensus.trace import attributes_helper
22+
from opencensus.trace import execution_context
23+
from opencensus.trace import time_event
2224
from opencensus.trace.ext import grpc as oc_grpc
25+
from opencensus.trace.ext.grpc import utils as grpc_utils
2326
from opencensus.trace.propagation import binary_format
2427

2528
log = logging.getLogger(__name__)
@@ -91,7 +94,8 @@ def _end_span_between_context(self, current_span):
9194
self.tracer.end_span()
9295

9396
def _intercept_call(
94-
self, client_call_details, request_iterator, grpc_type):
97+
self, client_call_details, request_iterator, grpc_type
98+
):
9599
metadata = ()
96100
if client_call_details.metadata is not None:
97101
metadata = client_call_details.metadata
@@ -118,10 +122,21 @@ def _intercept_call(
118122
metadata,
119123
client_call_details.credentials)
120124

125+
request_iterator = grpc_utils.wrap_iter_with_message_events(
126+
request_or_response_iter=request_iterator,
127+
span=current_span,
128+
message_event_type=time_event.Type.SENT
129+
)
130+
121131
return client_call_details, request_iterator, current_span
122132

123133
def _callback(self, current_span):
124134
def callback(future_response):
135+
grpc_utils.add_message_event(
136+
proto_message=future_response.result(),
137+
span=current_span,
138+
message_event_type=time_event.Type.RECEIVED,
139+
)
125140
execution_context.set_current_span(current_span)
126141
self._trace_future_exception(future_response)
127142
self.tracer.end_span()
@@ -141,7 +156,8 @@ def _trace_future_exception(self, response):
141156
attribute_value=exception)
142157

143158
def intercept_unary_unary(
144-
self, continuation, client_call_details, request):
159+
self, continuation, client_call_details, request
160+
):
145161
if CLOUD_TRACE in client_call_details.method:
146162
response = continuation(client_call_details, request)
147163
return response
@@ -159,8 +175,9 @@ def intercept_unary_unary(
159175

160176
return response
161177

162-
def intercept_unary_stream(self, continuation, client_call_details,
163-
request):
178+
def intercept_unary_stream(
179+
self, continuation, client_call_details, request
180+
):
164181
if CLOUD_TRACE in client_call_details.method:
165182
response = continuation(client_call_details, request)
166183
return response
@@ -173,12 +190,18 @@ def intercept_unary_stream(self, continuation, client_call_details,
173190
response_it = continuation(
174191
new_details,
175192
next(new_request_iterator))
176-
self._end_span_between_context(current_span)
193+
response_it = grpc_utils.wrap_iter_with_message_events(
194+
request_or_response_iter=response_it,
195+
span=current_span,
196+
message_event_type=time_event.Type.RECEIVED
197+
)
198+
response_it = grpc_utils.wrap_iter_with_end_span(response_it)
177199

178200
return response_it
179201

180-
def intercept_stream_unary(self, continuation, client_call_details,
181-
request_iterator):
202+
def intercept_stream_unary(
203+
self, continuation, client_call_details, request_iterator
204+
):
182205
if CLOUD_TRACE in client_call_details.method:
183206
response = continuation(client_call_details, request_iterator)
184207
return response
@@ -196,8 +219,9 @@ def intercept_stream_unary(self, continuation, client_call_details,
196219

197220
return response
198221

199-
def intercept_stream_stream(self, continuation, client_call_details,
200-
request_iterator):
222+
def intercept_stream_stream(
223+
self, continuation, client_call_details, request_iterator
224+
):
201225
if CLOUD_TRACE in client_call_details.method:
202226
response = continuation(client_call_details, request_iterator)
203227
return response
@@ -210,7 +234,12 @@ def intercept_stream_stream(self, continuation, client_call_details,
210234
response_it = continuation(
211235
new_details,
212236
new_request_iterator)
213-
self._end_span_between_context(current_span)
237+
response_it = grpc_utils.wrap_iter_with_message_events(
238+
request_or_response_iter=response_it,
239+
span=current_span,
240+
message_event_type=time_event.Type.RECEIVED
241+
)
242+
response_it = grpc_utils.wrap_iter_with_end_span(response_it)
214243

215244
return response_it
216245

opencensus/trace/ext/grpc/server_interceptor.py

Lines changed: 105 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -11,96 +11,87 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import collections
1514
import logging
1615
import sys
1716

18-
from google.rpc import code_pb2
1917
import grpc
18+
from google.rpc import code_pb2
2019

2120
from opencensus.trace import attributes_helper
2221
from opencensus.trace import execution_context
2322
from opencensus.trace import stack_trace as stack_trace
2423
from opencensus.trace import status
24+
from opencensus.trace import time_event
2525
from opencensus.trace import tracer as tracer_module
2626
from opencensus.trace.ext import grpc as oc_grpc
27+
from opencensus.trace.ext.grpc import utils as grpc_utils
2728
from opencensus.trace.propagation import binary_format
2829

2930
ATTRIBUTE_COMPONENT = 'COMPONENT'
3031
ATTRIBUTE_ERROR_NAME = 'ERROR_NAME'
3132
ATTRIBUTE_ERROR_MESSAGE = 'ERROR_MESSAGE'
3233
RECV_PREFIX = 'Recv'
3334

34-
RpcRequestInfo = collections.namedtuple(
35-
'RPCRequestInfo', ('request', 'context')
36-
)
37-
# exc_info is the three tuple defined at:
38-
# https://docs.python.org/3/library/sys.html#sys.exc_info
39-
RpcResponseInfo = collections.namedtuple(
40-
'RPCCallbackInfo', ('request', 'context', 'response', 'exc_info')
41-
)
42-
43-
44-
class RpcMethodHandlerWrapper(object):
45-
"""Wraps a grpc RPCMethodHandler and records the variables about the
46-
execution context and response
47-
"""
48-
49-
def __init__(
50-
self, handler, pre_handler_callbacks=None, post_handler_callbacks=None
51-
):
52-
"""
53-
:param handler: instance of RpcMethodHandler
54-
55-
:param pre_handler_callbacks: iterable of callbacks that accept an
56-
instance of RpcRequestInfo that are called before the server handler
57-
58-
:param post_handler_callbacks: iterable of callbacks that accept an
59-
instance of RpcResponseInfo that are called after the server
60-
handler finishes execution
61-
"""
62-
self.handler = handler
63-
self._pre_handler_callbacks = pre_handler_callbacks or []
64-
self._post_handler_callbacks = post_handler_callbacks or []
65-
66-
def proxy(self, prop_name):
67-
def _wrapper(request, context, *args, **kwargs):
68-
for callback in self._pre_handler_callbacks:
69-
callback(RpcRequestInfo(request, context))
70-
exc_info = (None, None, None)
71-
response = None
72-
try:
73-
response = getattr(
74-
self.handler, prop_name
75-
)(request, context, *args, **kwargs)
76-
except Exception as e:
77-
logging.exception(e)
78-
exc_info = sys.exc_info()
79-
raise
80-
finally:
81-
for callback in self._post_handler_callbacks:
82-
callback(RpcResponseInfo(
83-
request, context, response, exc_info)
84-
)
85-
return response
86-
87-
return _wrapper
88-
89-
def __getattr__(self, item):
90-
if item in (
91-
'unary_unary', 'unary_stream', 'stream_unary', 'stream_stream'
92-
):
93-
return self.proxy(item)
94-
return getattr(self.handler, item)
95-
9635

9736
class OpenCensusServerInterceptor(grpc.ServerInterceptor):
9837
def __init__(self, sampler=None, exporter=None):
9938
self.sampler = sampler
10039
self.exporter = exporter
10140

102-
def _start_server_span(self, rpc_request_info):
103-
metadata = rpc_request_info.context.invocation_metadata()
41+
def intercept_service(self, continuation, handler_call_details):
42+
def trace_wrapper(behavior, request_streaming, response_streaming):
43+
def new_behavior(request_or_iterator, servicer_context):
44+
span = self._start_server_span(servicer_context)
45+
try:
46+
if request_streaming:
47+
request_or_iterator = grpc_utils.wrap_iter_with_message_events( # noqa: E501
48+
request_or_response_iter=request_or_iterator,
49+
span=span,
50+
message_event_type=time_event.Type.RECEIVED
51+
)
52+
else:
53+
grpc_utils.add_message_event(
54+
proto_message=request_or_iterator,
55+
span=span,
56+
message_event_type=time_event.Type.RECEIVED,
57+
)
58+
# invoke the original rpc behavior
59+
response_or_iterator = behavior(request_or_iterator,
60+
servicer_context)
61+
if response_streaming:
62+
response_or_iterator = grpc_utils.wrap_iter_with_message_events( # noqa: E501
63+
request_or_response_iter=response_or_iterator,
64+
span=span,
65+
message_event_type=time_event.Type.SENT
66+
)
67+
response_or_iterator = grpc_utils.wrap_iter_with_end_span( # noqa: E501
68+
response_or_iterator)
69+
else:
70+
grpc_utils.add_message_event(
71+
proto_message=response_or_iterator,
72+
span=span,
73+
message_event_type=time_event.Type.SENT,
74+
)
75+
except Exception as exc:
76+
logging.exception(exc)
77+
_add_exc_info(span)
78+
raise
79+
finally:
80+
# if the response is unary, end the span here. Otherwise
81+
# it will be closed when the response iter completes
82+
if not response_streaming:
83+
execution_context.get_opencensus_tracer().end_span()
84+
return response_or_iterator
85+
86+
return new_behavior
87+
88+
return _wrap_rpc_behavior(
89+
continuation(handler_call_details),
90+
trace_wrapper
91+
)
92+
93+
def _start_server_span(self, servicer_context):
94+
metadata = servicer_context.invocation_metadata()
10495
span_context = None
10596

10697
if metadata is not None:
@@ -114,49 +105,62 @@ def _start_server_span(self, rpc_request_info):
114105
sampler=self.sampler,
115106
exporter=self.exporter)
116107

117-
span = tracer.start_span(name=_get_span_name(rpc_request_info))
108+
span = tracer.start_span(
109+
name=_get_span_name(servicer_context)
110+
)
118111
tracer.add_attribute_to_current_span(
119112
attribute_key=attributes_helper.COMMON_ATTRIBUTES.get(
120113
ATTRIBUTE_COMPONENT),
121114
attribute_value='grpc')
122115

123116
execution_context.set_opencensus_tracer(tracer)
124117
execution_context.set_current_span(span)
125-
126-
def _end_server_span(self, rpc_response_info):
127-
tracer = execution_context.get_opencensus_tracer()
128-
exc_type, exc_value, tb = rpc_response_info.exc_info
129-
if exc_type is not None:
130-
current_span = tracer.current_span()
131-
current_span.add_attribute(
132-
attributes_helper.COMMON_ATTRIBUTES.get(
133-
ATTRIBUTE_ERROR_MESSAGE),
134-
str(exc_value)
135-
)
136-
current_span.stack_trace = stack_trace.StackTrace.from_traceback(
137-
tb
138-
)
139-
current_span.status = status.Status(
140-
code=code_pb2.UNKNOWN,
141-
message=str(exc_value)
142-
)
143-
144-
tracer.end_span()
145-
146-
def intercept_handler(self, continuation, handler_call_details):
147-
return RpcMethodHandlerWrapper(
148-
continuation(handler_call_details),
149-
pre_handler_callbacks=[self._start_server_span],
150-
post_handler_callbacks=[self._end_server_span]
151-
)
152-
153-
def intercept_service(self, continuation, handler_call_details):
154-
return self.intercept_handler(continuation, handler_call_details)
155-
156-
157-
def _get_span_name(rpc_request_info):
118+
return span
119+
120+
121+
def _add_exc_info(span):
122+
exc_type, exc_value, tb = sys.exc_info()
123+
span.add_attribute(
124+
attributes_helper.COMMON_ATTRIBUTES.get(
125+
ATTRIBUTE_ERROR_MESSAGE),
126+
str(exc_value)
127+
)
128+
span.stack_trace = stack_trace.StackTrace.from_traceback(tb)
129+
span.status = status.Status(
130+
code=code_pb2.UNKNOWN,
131+
message=str(exc_value)
132+
)
133+
134+
135+
def _wrap_rpc_behavior(handler, fn):
136+
"""Returns a new rpc handler that wraps the given function"""
137+
if handler is None:
138+
return None
139+
140+
if handler.request_streaming and handler.response_streaming:
141+
behavior_fn = handler.stream_stream
142+
handler_factory = grpc.stream_stream_rpc_method_handler
143+
elif handler.request_streaming and not handler.response_streaming:
144+
behavior_fn = handler.stream_unary
145+
handler_factory = grpc.stream_unary_rpc_method_handler
146+
elif not handler.request_streaming and handler.response_streaming:
147+
behavior_fn = handler.unary_stream
148+
handler_factory = grpc.unary_stream_rpc_method_handler
149+
else:
150+
behavior_fn = handler.unary_unary
151+
handler_factory = grpc.unary_unary_rpc_method_handler
152+
153+
return handler_factory(
154+
fn(behavior_fn, handler.request_streaming,
155+
handler.response_streaming),
156+
request_deserializer=handler.request_deserializer,
157+
response_serializer=handler.response_serializer
158+
)
159+
160+
161+
def _get_span_name(servicer_context):
158162
"""Generates a span name based off of the gRPC server rpc_request_info"""
159-
method_name = rpc_request_info.context._rpc_event.call_details.method[1:]
163+
method_name = servicer_context._rpc_event.call_details.method[1:]
160164
if isinstance(method_name, bytes):
161165
method_name = method_name.decode('utf-8')
162166
method_name = method_name.replace('/', '.')

0 commit comments

Comments
 (0)