X7ROOT File Manager
Current Path:
/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations/grpc
opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sentry_sdk
/
integrations
/
grpc
/
??
..
??
__init__.py
(6.04 KB)
??
__pycache__
??
aio
??
client.py
(5.34 KB)
??
consts.py
(31 B)
??
server.py
(3.38 KB)
Editing: client.py
from typing import TYPE_CHECKING import sentry_sdk from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN from sentry_sdk.tracing_utils import has_span_streaming_enabled if TYPE_CHECKING: from typing import Any, Callable, Iterable, Iterator, Union try: import grpc from google.protobuf.message import Message from grpc import Call, ClientCallDetails from grpc._interceptor import _UnaryOutcome from grpc.aio._interceptor import UnaryStreamCall except ImportError: raise DidNotEnable("grpcio is not installed") class ClientInterceptor( grpc.UnaryUnaryClientInterceptor, # type: ignore grpc.UnaryStreamClientInterceptor, # type: ignore ): def intercept_unary_unary( self: "ClientInterceptor", continuation: "Callable[[ClientCallDetails, Message], _UnaryOutcome]", client_call_details: "ClientCallDetails", request: "Message", ) -> "_UnaryOutcome": method = client_call_details.method span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options) if span_streaming: with sentry_sdk.traces.start_span( name="unary unary call to %s" % method, attributes={ "sentry.op": OP.GRPC_CLIENT, "sentry.origin": SPAN_ORIGIN, SPANDATA.RPC_METHOD: method, }, ) as span: client_call_details = ( self._update_client_call_details_metadata_from_scope( client_call_details ) ) response = continuation(client_call_details, request) span.set_attribute( SPANDATA.RPC_RESPONSE_STATUS_CODE, response.code().name ) return response else: with sentry_sdk.start_span( op=OP.GRPC_CLIENT, name="unary unary call to %s" % method, origin=SPAN_ORIGIN, ) as span: span.set_data("type", "unary unary") span.set_data("method", method) client_call_details = ( self._update_client_call_details_metadata_from_scope( client_call_details ) ) response = continuation(client_call_details, request) span.set_data("code", response.code().name) return response def intercept_unary_stream( self: "ClientInterceptor", continuation: "Callable[[ClientCallDetails, Message], Union[Iterable[Any], UnaryStreamCall]]", client_call_details: "ClientCallDetails", request: "Message", ) -> "Union[Iterator[Message], Call]": method = client_call_details.method span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options) response: "UnaryStreamCall" if span_streaming: with sentry_sdk.traces.start_span( name="unary stream call to %s" % method, attributes={ "sentry.op": OP.GRPC_CLIENT, "sentry.origin": SPAN_ORIGIN, SPANDATA.RPC_METHOD: method, }, ) as span: client_call_details = ( self._update_client_call_details_metadata_from_scope( client_call_details ) ) response = continuation(client_call_details, request) # Setting code on unary-stream leads to execution getting stuck # span.set_data("code", response.code().name) return response else: with sentry_sdk.start_span( op=OP.GRPC_CLIENT, name="unary stream call to %s" % method, origin=SPAN_ORIGIN, ) as span: span.set_data("type", "unary stream") span.set_data("method", method) client_call_details = ( self._update_client_call_details_metadata_from_scope( client_call_details ) ) response = continuation(client_call_details, request) # Setting code on unary-stream leads to execution getting stuck # span.set_data("code", response.code().name) return response @staticmethod def _update_client_call_details_metadata_from_scope( client_call_details: "ClientCallDetails", ) -> "ClientCallDetails": metadata = ( list(client_call_details.metadata) if client_call_details.metadata else [] ) for ( key, value, ) in sentry_sdk.get_current_scope().iter_trace_propagation_headers(): metadata.append((key, value)) client_call_details = grpc._interceptor._ClientCallDetails( method=client_call_details.method, timeout=client_call_details.timeout, metadata=metadata, credentials=client_call_details.credentials, wait_for_ready=client_call_details.wait_for_ready, compression=client_call_details.compression, ) return client_call_details
Upload File
Create Folder