X7ROOT File Manager
Current Path:
/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations
opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sentry_sdk
/
integrations
/
??
..
??
__init__.py
(12.51 KB)
??
__pycache__
??
_asgi_common.py
(4 KB)
??
_wsgi_common.py
(7.28 KB)
??
aiohttp.py
(19.28 KB)
??
aiomysql.py
(9.09 KB)
??
anthropic.py
(39 KB)
??
argv.py
(876 B)
??
ariadne.py
(5.7 KB)
??
arq.py
(9.23 KB)
??
asgi.py
(20.06 KB)
??
asyncio.py
(9.28 KB)
??
asyncpg.py
(9.68 KB)
??
atexit.py
(1.51 KB)
??
aws_lambda.py
(17.41 KB)
??
beam.py
(4.91 KB)
??
boto3.py
(6.2 KB)
??
bottle.py
(7.21 KB)
??
celery
??
chalice.py
(4.51 KB)
??
clickhouse_driver.py
(5.85 KB)
??
cloud_resource_context.py
(7.49 KB)
??
cohere.py
(10.44 KB)
??
dedupe.py
(1.86 KB)
??
django
??
dramatiq.py
(8.02 KB)
??
excepthook.py
(2.25 KB)
??
executing.py
(1.93 KB)
??
falcon.py
(9.04 KB)
??
fastapi.py
(5.28 KB)
??
flask.py
(8.27 KB)
??
gcp.py
(10.57 KB)
??
gnu_backtrace.py
(2.72 KB)
??
google_genai
??
gql.py
(4.93 KB)
??
graphene.py
(5.71 KB)
??
grpc
??
httpx.py
(9.79 KB)
??
httpx2.py
(9.8 KB)
??
huey.py
(8.19 KB)
??
huggingface_hub.py
(15.28 KB)
??
langchain.py
(48.31 KB)
??
langgraph.py
(18.13 KB)
??
launchdarkly.py
(1.87 KB)
??
litellm.py
(13.03 KB)
??
litestar.py
(11.46 KB)
??
logging.py
(15.69 KB)
??
loguru.py
(6.35 KB)
??
mcp.py
(23.12 KB)
??
modules.py
(787 B)
??
openai.py
(53.38 KB)
??
openai_agents
??
openfeature.py
(1.08 KB)
??
opentelemetry
??
otlp.py
(7.99 KB)
??
pure_eval.py
(4.41 KB)
??
pydantic_ai
??
pymongo.py
(8.21 KB)
??
pyramid.py
(7.42 KB)
??
pyreqwest.py
(6.82 KB)
??
quart.py
(7.32 KB)
??
ray.py
(5.75 KB)
??
redis
??
rq.py
(7.81 KB)
??
rust_tracing.py
(9.44 KB)
??
sanic.py
(15.25 KB)
??
serverless.py
(1.58 KB)
??
socket.py
(5.02 KB)
??
spark
??
sqlalchemy.py
(5.24 KB)
??
starlette.py
(27.93 KB)
??
starlite.py
(11.04 KB)
??
statsig.py
(1.19 KB)
??
stdlib.py
(14.01 KB)
??
strawberry.py
(17.39 KB)
??
sys_exit.py
(2.35 KB)
??
threading.py
(6.88 KB)
??
tornado.py
(10.79 KB)
??
trytond.py
(1.67 KB)
??
typer.py
(1.72 KB)
??
unleash.py
(1.02 KB)
??
unraisablehook.py
(1.65 KB)
??
wsgi.py
(15.03 KB)
Editing: aiomysql.py
# -*- coding: utf-8 -*- from __future__ import annotations from typing import Any, Awaitable, Callable, TypeVar import sentry_sdk from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version from sentry_sdk.traces import StreamedSpan from sentry_sdk.tracing_utils import ( add_query_source, has_span_streaming_enabled, record_sql_queries, ) from sentry_sdk.utils import ( capture_internal_exceptions, parse_version, ) try: import aiomysql # type: ignore[import-not-found] from aiomysql.connection import Connection # type: ignore[import-not-found] from aiomysql.cursors import Cursor # type: ignore[import-not-found] except ImportError: raise DidNotEnable("aiomysql not installed.") class AioMySQLIntegration(Integration): identifier = "aiomysql" origin = f"auto.db.{identifier}" _record_params = False def __init__(self, *, record_params: bool = False): AioMySQLIntegration._record_params = record_params @staticmethod def setup_once() -> None: aiomysql_version = parse_version(aiomysql.__version__) _check_minimum_version(AioMySQLIntegration, aiomysql_version) Cursor.execute = _wrap_execute(Cursor.execute) Cursor.executemany = _wrap_executemany(Cursor.executemany) # Patch Connection._connect — this catches ALL connections: # - aiomysql.connect() # - aiomysql.create_pool() (pool.py does `from .connection import connect` # which ultimately calls Connection._connect) # - Reconnects Connection._connect = _wrap_connect(Connection._connect) T = TypeVar("T") def _normalize_query(query: str | bytes | bytearray) -> str: if isinstance(query, (bytes, bytearray)): query = query.decode("utf-8", errors="replace") return " ".join(query.split()) def _wrap_execute(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]: """Wrap Cursor.execute to capture SQL queries.""" async def _inner(*args: Any, **kwargs: Any) -> T: if sentry_sdk.get_client().get_integration(AioMySQLIntegration) is None: return await f(*args, **kwargs) cursor = args[0] # Skip if flagged by executemany (avoids double-recording). # Do NOT reset the flag here — it must stay True for the entire # duration of executemany, which may call execute multiple times # in a loop (non-INSERT fallback). Only _wrap_executemany's # finally block should clear it. if getattr(cursor, "_sentry_skip_next_execute", False): return await f(*args, **kwargs) query = args[1] if len(args) > 1 else kwargs.get("query", "") query_str = _normalize_query(query) params = args[2] if len(args) > 2 else kwargs.get("args") conn = _get_connection(cursor) integration = sentry_sdk.get_client().get_integration(AioMySQLIntegration) params_list = params if integration and integration._record_params else None param_style = "pyformat" if params_list else None with record_sql_queries( cursor=None, query=query_str, params_list=params_list, paramstyle=param_style, executemany=False, span_origin=AioMySQLIntegration.origin, ) as span: if conn: _set_db_data(span, conn) res = await f(*args, **kwargs) if isinstance(span, StreamedSpan): with capture_internal_exceptions(): add_query_source(span) if not isinstance(span, StreamedSpan): with capture_internal_exceptions(): add_query_source(span) return res return _inner def _wrap_executemany(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]: """Wrap Cursor.executemany to capture SQL queries.""" async def _inner(*args: Any, **kwargs: Any) -> T: if sentry_sdk.get_client().get_integration(AioMySQLIntegration) is None: return await f(*args, **kwargs) cursor = args[0] query = args[1] if len(args) > 1 else kwargs.get("query", "") query_str = _normalize_query(query) seq_of_params = args[2] if len(args) > 2 else kwargs.get("args") conn = _get_connection(cursor) integration = sentry_sdk.get_client().get_integration(AioMySQLIntegration) params_list = ( seq_of_params if integration and integration._record_params else None ) param_style = "pyformat" if params_list else None # Prevent double-recording: _do_execute_many calls self.execute internally cursor._sentry_skip_next_execute = True try: with record_sql_queries( cursor=None, query=query_str, params_list=params_list, paramstyle=param_style, executemany=True, span_origin=AioMySQLIntegration.origin, ) as span: if conn: _set_db_data(span, conn) res = await f(*args, **kwargs) if isinstance(span, StreamedSpan): with capture_internal_exceptions(): add_query_source(span) if not isinstance(span, StreamedSpan): with capture_internal_exceptions(): add_query_source(span) return res finally: cursor._sentry_skip_next_execute = False return _inner def _get_connection(cursor: Any) -> Any: """Get the underlying connection from a cursor.""" return getattr(cursor, "connection", None) def _wrap_connect(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]: """Wrap Connection._connect to capture connection spans.""" async def _inner(self: "Connection") -> T: client = sentry_sdk.get_client() if client.get_integration(AioMySQLIntegration) is None: return await f(self) if has_span_streaming_enabled(client.options): breadcrumb_data = _get_connect_data(self, use_streaming_keys=True) span_attributes: dict[str, Any] = { "sentry.op": OP.DB, "sentry.origin": AioMySQLIntegration.origin, } | breadcrumb_data with sentry_sdk.traces.start_span( name="connect", attributes=span_attributes ) as span: with capture_internal_exceptions(): sentry_sdk.add_breadcrumb( message="connect", category="query", data=breadcrumb_data ) res = await f(self) else: connect_data = _get_connect_data(self) with sentry_sdk.start_span( op=OP.DB, name="connect", origin=AioMySQLIntegration.origin, ) as span: _set_db_data(span, self) with capture_internal_exceptions(): sentry_sdk.add_breadcrumb( message="connect", category="query", data=connect_data, ) res = await f(self) return res return _inner def _get_connect_data(conn: Any, *, use_streaming_keys: bool = False) -> dict[str, Any]: if use_streaming_keys: db_system = SPANDATA.DB_SYSTEM_NAME db_name = SPANDATA.DB_NAMESPACE else: db_system = SPANDATA.DB_SYSTEM db_name = SPANDATA.DB_NAME data: dict[str, Any] = { db_system: "mysql", SPANDATA.DB_DRIVER_NAME: "aiomysql", } host = getattr(conn, "host", None) if host is not None: data[SPANDATA.SERVER_ADDRESS] = host port = getattr(conn, "port", None) if port is not None: data[SPANDATA.SERVER_PORT] = port database = getattr(conn, "db", None) if database is not None: data[db_name] = database user = getattr(conn, "user", None) if user is not None: data[SPANDATA.DB_USER] = user return data def _set_db_data(span: Any, conn: Any) -> None: """Set database-related span data from connection object.""" if isinstance(span, StreamedSpan): set_value = span.set_attribute db_system = SPANDATA.DB_SYSTEM_NAME db_name = SPANDATA.DB_NAMESPACE else: # Remove this else block once we've completely migrated to streamed spans # The use of deprecated attributes here is to ensure backwards compatibility set_value = span.set_data db_system = SPANDATA.DB_SYSTEM db_name = SPANDATA.DB_NAME set_value(db_system, "mysql") set_value(SPANDATA.DB_DRIVER_NAME, "aiomysql") host = getattr(conn, "host", None) if host is not None: set_value(SPANDATA.SERVER_ADDRESS, host) port = getattr(conn, "port", None) if port is not None: set_value(SPANDATA.SERVER_PORT, port) database = getattr(conn, "db", None) if database is not None: set_value(db_name, database) user = getattr(conn, "user", None) if user is not None: set_value(SPANDATA.DB_USER, user)
Upload File
Create Folder