1111# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212# See the License for the specific language governing permissions and
1313# limitations under the License.
14+ import collections
15+ import logging
1416
1517import grpc
16- import logging
1718
1819from opencensus .trace import attributes_helper
1920from opencensus .trace import tracer as tracer_module
21+ from opencensus .trace import execution_context
2022from opencensus .trace .ext import grpc as oc_grpc
2123from opencensus .trace .propagation import binary_format
2224
2325ATTRIBUTE_COMPONENT = 'COMPONENT'
2426ATTRIBUTE_ERROR_NAME = 'ERROR_NAME'
2527ATTRIBUTE_ERROR_MESSAGE = 'ERROR_MESSAGE'
2628
29+ RpcRequestInfo = collections .namedtuple (
30+ 'RPCRequestInfo' , ('request' , 'context' )
31+ )
32+ RpcResponseInfo = collections .namedtuple (
33+ 'RPCCallbackInfo' , ('request' , 'context' , 'response' , 'exc' )
34+ )
35+
36+
37+ class RpcMethodHandlerWrapper (object ):
38+ """Wraps a grpc RPCMethodHandler and records the variables about the
39+ execution context and response
40+ """
41+
42+ def __init__ (
43+ self , handler , pre_handler_callbacks = None , post_handler_callbacks = None
44+ ):
45+ """
46+ :param handler: instance of RpcMethodHandler
47+
48+ :param pre_handler_callbacks: iterable of callbacks that accept an
49+ instance of RpcRequestInfo that are called before the server handler
50+
51+ :param post_handler_callbacks: iterable of callbacks that accept an
52+ instance of RpcResponseInfo that are called after the server
53+ handler finishes execution
54+ """
55+ self .handler = handler
56+ self ._pre_handler_callbacks = pre_handler_callbacks or []
57+ self ._post_handler_callbacks = post_handler_callbacks or []
58+
59+ def proxy (self , prop_name ):
60+ def _wrapper (request , context , * args , ** kwargs ):
61+ for callback in self ._pre_handler_callbacks :
62+ callback (RpcRequestInfo (request , context ))
63+ exc = None
64+ response = None
65+ try :
66+ response = getattr (
67+ self .handler , prop_name
68+ )(request , context , * args , ** kwargs )
69+ except Exception as e :
70+ logging .error (e )
71+ exc = e
72+ raise
73+ finally :
74+ for callback in self ._post_handler_callbacks :
75+ callback (RpcResponseInfo (request , context , response , exc ))
76+ return response
77+
78+ return _wrapper
2779
28- class OpenCensusServerInterceptor (grpc .ServerInterceptor ):
80+ def __getattr__ (self , item ):
81+ if item in (
82+ 'unary_unary' , 'unary_stream' , 'stream_unary' , 'stream_stream'
83+ ):
84+ return self .proxy (item )
85+ return getattr (self .handler , item )
2986
87+
88+ class OpenCensusServerInterceptor (grpc .ServerInterceptor ):
3089 def __init__ (self , sampler = None , exporter = None ):
3190 self .sampler = sampler
3291 self .exporter = exporter
3392
34- def _start_server_span (self , tracer ):
35- span = tracer .start_span (name = 'grpc_server' )
36- tracer .add_attribute_to_current_span (
37- attribute_key = attributes_helper .COMMON_ATTRIBUTES .get (
38- ATTRIBUTE_COMPONENT ),
39- attribute_value = 'grpc' )
40-
41- return span
42-
43- def intercept_handler (self , continuation , handler_call_details ):
44- metadata = handler_call_details .invocation_metadata
93+ def _start_server_span (self , rpc_request_info ):
94+ metadata = rpc_request_info .context .invocation_metadata ()
4595 span_context = None
4696
4797 if metadata is not None :
@@ -55,21 +105,30 @@ def intercept_handler(self, continuation, handler_call_details):
55105 sampler = self .sampler ,
56106 exporter = self .exporter )
57107
58- with self ._start_server_span (tracer ):
59- response = None
108+ span = tracer .start_span (name = 'grpc_server' )
109+ tracer .add_attribute_to_current_span (
110+ attribute_key = attributes_helper .COMMON_ATTRIBUTES .get (
111+ ATTRIBUTE_COMPONENT ),
112+ attribute_value = 'grpc' )
60113
61- try :
62- response = continuation (handler_call_details )
63- except Exception as e : # pragma: NO COVER
64- logging .error (e )
65- tracer .add_attribute_to_current_span (
66- attributes_helper .COMMON_ATTRIBUTES .get (
67- ATTRIBUTE_ERROR_MESSAGE ),
68- str (e ))
69- tracer .end_span ()
70- raise
114+ execution_context .set_opencensus_tracer (tracer )
115+ execution_context .set_current_span (span )
116+
117+ def _end_server_span (self , rpc_response_info ):
118+ tracer = execution_context .get_opencensus_tracer ()
119+ if rpc_response_info .exc is not None :
120+ tracer .add_attribute_to_current_span (
121+ attributes_helper .COMMON_ATTRIBUTES .get (
122+ ATTRIBUTE_ERROR_MESSAGE ),
123+ str (rpc_response_info .exc ))
124+ tracer .end_span ()
71125
72- return response
126+ def intercept_handler (self , continuation , handler_call_details ):
127+ return RpcMethodHandlerWrapper (
128+ continuation (handler_call_details ),
129+ pre_handler_callbacks = [self ._start_server_span ],
130+ post_handler_callbacks = [self ._end_server_span ]
131+ )
73132
74133 def intercept_service (self , continuation , handler_call_details ):
75134 return self .intercept_handler (continuation , handler_call_details )
0 commit comments