X7ROOT File Manager
Current Path:
/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations
opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sentry_sdk
/
integrations
/
??
..
??
__init__.py
(12.51 KB)
??
__pycache__
??
_asgi_common.py
(4 KB)
??
_wsgi_common.py
(7.28 KB)
??
aiohttp.py
(19.28 KB)
??
aiomysql.py
(9.09 KB)
??
anthropic.py
(39 KB)
??
argv.py
(876 B)
??
ariadne.py
(5.7 KB)
??
arq.py
(9.23 KB)
??
asgi.py
(20.06 KB)
??
asyncio.py
(9.28 KB)
??
asyncpg.py
(9.68 KB)
??
atexit.py
(1.51 KB)
??
aws_lambda.py
(17.41 KB)
??
beam.py
(4.91 KB)
??
boto3.py
(6.2 KB)
??
bottle.py
(7.21 KB)
??
celery
??
chalice.py
(4.51 KB)
??
clickhouse_driver.py
(5.85 KB)
??
cloud_resource_context.py
(7.49 KB)
??
cohere.py
(10.44 KB)
??
dedupe.py
(1.86 KB)
??
django
??
dramatiq.py
(8.02 KB)
??
excepthook.py
(2.25 KB)
??
executing.py
(1.93 KB)
??
falcon.py
(9.04 KB)
??
fastapi.py
(5.28 KB)
??
flask.py
(8.27 KB)
??
gcp.py
(10.57 KB)
??
gnu_backtrace.py
(2.72 KB)
??
google_genai
??
gql.py
(4.93 KB)
??
graphene.py
(5.71 KB)
??
grpc
??
httpx.py
(9.79 KB)
??
httpx2.py
(9.8 KB)
??
huey.py
(8.19 KB)
??
huggingface_hub.py
(15.28 KB)
??
langchain.py
(48.31 KB)
??
langgraph.py
(18.13 KB)
??
launchdarkly.py
(1.87 KB)
??
litellm.py
(13.03 KB)
??
litestar.py
(11.46 KB)
??
logging.py
(15.69 KB)
??
loguru.py
(6.35 KB)
??
mcp.py
(23.12 KB)
??
modules.py
(787 B)
??
openai.py
(53.38 KB)
??
openai_agents
??
openfeature.py
(1.08 KB)
??
opentelemetry
??
otlp.py
(7.99 KB)
??
pure_eval.py
(4.41 KB)
??
pydantic_ai
??
pymongo.py
(8.21 KB)
??
pyramid.py
(7.42 KB)
??
pyreqwest.py
(6.82 KB)
??
quart.py
(7.32 KB)
??
ray.py
(5.75 KB)
??
redis
??
rq.py
(7.81 KB)
??
rust_tracing.py
(9.44 KB)
??
sanic.py
(15.25 KB)
??
serverless.py
(1.58 KB)
??
socket.py
(5.02 KB)
??
spark
??
sqlalchemy.py
(5.24 KB)
??
starlette.py
(27.93 KB)
??
starlite.py
(11.04 KB)
??
statsig.py
(1.19 KB)
??
stdlib.py
(14.01 KB)
??
strawberry.py
(17.39 KB)
??
sys_exit.py
(2.35 KB)
??
threading.py
(6.88 KB)
??
tornado.py
(10.79 KB)
??
trytond.py
(1.67 KB)
??
typer.py
(1.72 KB)
??
unleash.py
(1.02 KB)
??
unraisablehook.py
(1.65 KB)
??
wsgi.py
(15.03 KB)
Editing: dramatiq.py
import json
from typing import TypeVar

import sentry_sdk
from sentry_sdk.api import continue_trace, get_baggage, get_traceparent
from sentry_sdk.consts import OP, SPANSTATUS
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.integrations._wsgi_common import request_body_within_bounds
from sentry_sdk.traces import SegmentSource
from sentry_sdk.tracing import (
    BAGGAGE_HEADER_NAME,
    SENTRY_TRACE_HEADER_NAME,
    TransactionSource,
)
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import (
    AnnotatedValue,
    capture_internal_exceptions,
    event_from_exception,
)

R = TypeVar("R")

try:
    from dramatiq.broker import Broker
    from dramatiq.errors import Retry
    from dramatiq.message import Message
    from dramatiq.middleware import Middleware, default_middleware
except ImportError:
    raise DidNotEnable("Dramatiq is not installed")

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any, Callable, Dict, Optional, Union

    from sentry_sdk._types import Event, Hint


class DramatiqIntegration(Integration):
    """
    Dramatiq integration for Sentry

    Please make sure that you call `sentry_sdk.init` *before* initializing
    your broker, as it monkey patches `Broker.__init__`.

    This integration was originally developed and maintained
    by https://github.com/jacobsvante and later donated to the Sentry
    project.
    """

    identifier = "dramatiq"
    origin = f"auto.queue.{identifier}"

    @staticmethod
    def setup_once() -> None:
        """Install the `Broker.__init__` patch that injects `SentryMiddleware`."""
        _patch_dramatiq_broker()


def _patch_dramatiq_broker() -> None:
    """
    Replace `Broker.__init__` with a wrapper that puts a `SentryMiddleware`
    at the front of the broker's middleware list whenever the integration is
    enabled on the current client.
    """
    original_broker__init__ = Broker.__init__

    def sentry_patched_broker__init__(
        self: "Broker", *args: "Any", **kw: "Any"
    ) -> None:
        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)

        try:
            middleware = kw.pop("middleware")
        except KeyError:
            # Unfortunately Broker and StubBroker allows middleware to be
            # passed in as positional arguments, whilst RabbitmqBroker and
            # RedisBroker does not.
            if len(args) == 1:
                middleware = args[0]
                args = ()
            else:
                middleware = None

        if middleware is None:
            # No middleware supplied: instantiate the default middleware classes.
            middleware = [m() for m in default_middleware]
        else:
            # Copy so the caller's sequence is never mutated below.
            middleware = list(middleware)

        if integration is not None:
            # Drop any SentryMiddleware already present so exactly one
            # instance ends up first in the list.
            middleware = [m for m in middleware if not isinstance(m, SentryMiddleware)]
            middleware.insert(0, SentryMiddleware())

        kw["middleware"] = middleware
        original_broker__init__(self, *args, **kw)

    Broker.__init__ = sentry_patched_broker__init__


class SentryMiddleware(Middleware):  # type: ignore[misc]
    """
    A Dramatiq middleware that automatically captures and sends
    exceptions to Sentry.

    This is automatically added to every instantiated broker via the
    DramatiqIntegration.
    """

    # Key in `message.options` under which the trace headers are stored.
    SENTRY_HEADERS_NAME = "_sentry_headers"

    def before_enqueue(
        self, broker: "Broker", message: "Message[R]", delay: int
    ) -> None:
        """
        Store the current baggage and traceparent values in the message
        options so the processing side can read them back.
        """
        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
        if integration is None:
            return

        message.options[self.SENTRY_HEADERS_NAME] = {
            BAGGAGE_HEADER_NAME: get_baggage(),
            SENTRY_TRACE_HEADER_NAME: get_traceparent(),
        }

    def before_process_message(self, broker: "Broker", message: "Message[R]") -> None:
        """
        Enter an isolation scope for the message and start a span or
        transaction for it.

        The scope manager is kept on `message._scope_manager` and the
        span/transaction on `message._sentry_span_ctx`; both are closed again
        in `after_process_message`.
        """
        client = sentry_sdk.get_client()
        integration = client.get_integration(DramatiqIntegration)
        if integration is None:
            return

        message._scope_manager = sentry_sdk.isolation_scope()
        scope = message._scope_manager.__enter__()
        scope.clear_breadcrumbs()
        scope.set_extra("dramatiq_message_id", message.message_id)
        scope.add_event_processor(_make_message_event_processor(message, integration))

        sentry_headers = message.options.get(self.SENTRY_HEADERS_NAME) or {}
        if "retries" in message.options:
            # start new trace in case of retrying
            sentry_headers = {}

        if has_span_streaming_enabled(client.options):
            sentry_sdk.traces.continue_trace(sentry_headers)
            span = sentry_sdk.traces.start_span(
                name=message.actor_name,
                attributes={
                    "sentry.op": OP.QUEUE_TASK_DRAMATIQ,
                    "sentry.origin": DramatiqIntegration.origin,
                    "sentry.span.source": SegmentSource.TASK.value,
                },
                parent_span=None,
            )
            # NOTE(review): unlike the transaction branch, `__enter__` is not
            # called on this span although `__exit__` is called later --
            # presumably `start_span` returns an already-started span; confirm.
            message._sentry_span_ctx = span
        else:
            transaction = continue_trace(
                sentry_headers,
                name=message.actor_name,
                op=OP.QUEUE_TASK_DRAMATIQ,
                source=TransactionSource.TASK,
                origin=DramatiqIntegration.origin,
            )
            transaction.set_status(SPANSTATUS.OK)
            sentry_sdk.start_transaction(
                transaction,
                name=message.actor_name,
                op=OP.QUEUE_TASK_DRAMATIQ,
                source=TransactionSource.TASK,
            )
            transaction.__enter__()
            message._sentry_span_ctx = transaction

    def after_process_message(
        self,
        broker: "Broker",
        message: "Message[R]",
        *,
        result: "Optional[Any]" = None,
        exception: "Optional[Exception]" = None,
    ) -> None:
        """
        Close the span/transaction and isolation scope opened in
        `before_process_message`, reporting `exception` to Sentry first when
        it is neither a `Retry` nor an instance of the `throws` option set on
        the message or its actor.

        Does nothing when the integration is not enabled. If
        `before_process_message` never stored a span on the message, only a
        scope that was actually entered is closed.
        """
        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
        if integration is None:
            return

        actor = broker.get_actor(message.actor_name)
        throws = message.options.get("throws") or actor.options.get("throws")

        # Both attributes are only set by before_process_message; read them
        # defensively so a message it never saw does not raise AttributeError.
        scope_manager = getattr(message, "_scope_manager", None)
        span_ctx = getattr(message, "_sentry_span_ctx", None)
        if span_ctx is None:
            if scope_manager is not None:
                # The scope was entered but no span was stored; do not leave
                # the isolation scope open.
                scope_manager.__exit__(None, None, None)
            return None

        is_event_capture_required = (
            exception is not None
            and not (throws and isinstance(exception, throws))
            and not isinstance(exception, Retry)
        )

        if is_event_capture_required:
            # transaction error
            exit_args = (type(exception), exception, None)
        else:
            # normal transaction finish
            exit_args = (None, None, None)

        try:
            if is_event_capture_required:
                event, hint = event_from_exception(
                    exception,  # type: ignore[arg-type]
                    client_options=sentry_sdk.get_client().options,
                    mechanism={
                        "type": DramatiqIntegration.identifier,
                        "handled": False,
                    },
                )
                sentry_sdk.capture_event(event, hint=hint)

            span_ctx.__exit__(*exit_args)
        finally:
            # Always leave the isolation scope, even if capturing the event
            # or closing the span raised.
            if scope_manager is not None:
                scope_manager.__exit__(*exit_args)

    # Skipped messages are finalized the same way as processed ones.
    after_skip_message = after_process_message


def _make_message_event_processor(
    message: "Message[R]", integration: "DramatiqIntegration"
) -> "Callable[[Event, Hint], Optional[Event]]":
    """
    Build an event processor that adds the data of `message` to every event
    passing through it. `integration` is accepted but not used here.
    """

    def inner(event: "Event", hint: "Hint") -> "Optional[Event]":
        # Failures while extracting message data are handled by
        # capture_internal_exceptions; the event is returned either way.
        with capture_internal_exceptions():
            DramatiqMessageExtractor(message).extract_into_event(event)

        return event

    return inner


class DramatiqMessageExtractor:
    """Copies a Dramatiq message's data into the `dramatiq` context of an event."""

    def __init__(self, message: "Message[R]") -> None:
        # Snapshot of the message as a plain dict.
        self.message_data = dict(message.asdict())

    def content_length(self) -> int:
        """Return the length of the JSON serialization of the message data."""
        return len(json.dumps(self.message_data))

    def extract_into_event(self, event: "Event") -> None:
        """
        Write the message data into `event["contexts"]["dramatiq"]`, or a
        size-limit placeholder when `request_body_within_bounds` rejects its
        length. Does nothing when the client is not active.
        """
        client = sentry_sdk.get_client()
        if not client.is_active():
            return

        contexts = event.setdefault("contexts", {})
        request_info = contexts.setdefault("dramatiq", {})
        request_info["type"] = "dramatiq"

        data: "Optional[Union[AnnotatedValue, Dict[str, Any]]]" = None
        if not request_body_within_bounds(client, self.content_length()):
            data = AnnotatedValue.removed_because_over_size_limit()
        else:
            data = self.message_data

        request_info["data"] = data
Upload File
Create Folder