X7ROOT File Manager
Current Path:
/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations
opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sentry_sdk
/
integrations
/
??
..
??
__init__.py
(12.51 KB)
??
__pycache__
??
_asgi_common.py
(4 KB)
??
_wsgi_common.py
(7.28 KB)
??
aiohttp.py
(19.28 KB)
??
aiomysql.py
(9.09 KB)
??
anthropic.py
(39 KB)
??
argv.py
(876 B)
??
ariadne.py
(5.7 KB)
??
arq.py
(9.23 KB)
??
asgi.py
(20.06 KB)
??
asyncio.py
(9.28 KB)
??
asyncpg.py
(9.68 KB)
??
atexit.py
(1.51 KB)
??
aws_lambda.py
(17.41 KB)
??
beam.py
(4.91 KB)
??
boto3.py
(6.2 KB)
??
bottle.py
(7.21 KB)
??
celery
??
chalice.py
(4.51 KB)
??
clickhouse_driver.py
(5.85 KB)
??
cloud_resource_context.py
(7.49 KB)
??
cohere.py
(10.44 KB)
??
dedupe.py
(1.86 KB)
??
django
??
dramatiq.py
(8.02 KB)
??
excepthook.py
(2.25 KB)
??
executing.py
(1.93 KB)
??
falcon.py
(9.04 KB)
??
fastapi.py
(5.28 KB)
??
flask.py
(8.27 KB)
??
gcp.py
(10.57 KB)
??
gnu_backtrace.py
(2.72 KB)
??
google_genai
??
gql.py
(4.93 KB)
??
graphene.py
(5.71 KB)
??
grpc
??
httpx.py
(9.79 KB)
??
httpx2.py
(9.8 KB)
??
huey.py
(8.19 KB)
??
huggingface_hub.py
(15.28 KB)
??
langchain.py
(48.31 KB)
??
langgraph.py
(18.13 KB)
??
launchdarkly.py
(1.87 KB)
??
litellm.py
(13.03 KB)
??
litestar.py
(11.46 KB)
??
logging.py
(15.69 KB)
??
loguru.py
(6.35 KB)
??
mcp.py
(23.12 KB)
??
modules.py
(787 B)
??
openai.py
(53.38 KB)
??
openai_agents
??
openfeature.py
(1.08 KB)
??
opentelemetry
??
otlp.py
(7.99 KB)
??
pure_eval.py
(4.41 KB)
??
pydantic_ai
??
pymongo.py
(8.21 KB)
??
pyramid.py
(7.42 KB)
??
pyreqwest.py
(6.82 KB)
??
quart.py
(7.32 KB)
??
ray.py
(5.75 KB)
??
redis
??
rq.py
(7.81 KB)
??
rust_tracing.py
(9.44 KB)
??
sanic.py
(15.25 KB)
??
serverless.py
(1.58 KB)
??
socket.py
(5.02 KB)
??
spark
??
sqlalchemy.py
(5.24 KB)
??
starlette.py
(27.93 KB)
??
starlite.py
(11.04 KB)
??
statsig.py
(1.19 KB)
??
stdlib.py
(14.01 KB)
??
strawberry.py
(17.39 KB)
??
sys_exit.py
(2.35 KB)
??
threading.py
(6.88 KB)
??
tornado.py
(10.79 KB)
??
trytond.py
(1.67 KB)
??
typer.py
(1.72 KB)
??
unleash.py
(1.02 KB)
??
unraisablehook.py
(1.65 KB)
??
wsgi.py
(15.03 KB)
Editing: langgraph.py
from functools import wraps from typing import Any, Callable, List, Optional import sentry_sdk from sentry_sdk.ai.utils import ( get_start_span_function, normalize_message_roles, set_data_normalized, truncate_and_annotate_messages, ) from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.scope import should_send_default_pii from sentry_sdk.traces import StreamedSpan from sentry_sdk.tracing_utils import ( has_span_streaming_enabled, should_truncate_gen_ai_input, ) from sentry_sdk.utils import safe_serialize try: from langgraph.graph import StateGraph from langgraph.pregel import Pregel except ImportError: raise DidNotEnable("langgraph not installed") class LanggraphIntegration(Integration): identifier = "langgraph" origin = f"auto.ai.{identifier}" def __init__(self: "LanggraphIntegration", include_prompts: bool = True) -> None: self.include_prompts = include_prompts @staticmethod def setup_once() -> None: # LangGraph lets users create agents using a StateGraph or the Functional API. # StateGraphs are then compiled to a CompiledStateGraph. Both CompiledStateGraph and # the functional API execute on a Pregel instance. Pregel is the runtime for the graph # and the invocation happens on Pregel, so patching the invoke methods takes care of both. # The streaming methods are not patched, because due to some internal reasons, LangGraph # will automatically patch the streaming methods to run through invoke, and by doing this # we prevent duplicate spans for invocations. StateGraph.compile = _wrap_state_graph_compile(StateGraph.compile) if hasattr(Pregel, "invoke"): Pregel.invoke = _wrap_pregel_invoke(Pregel.invoke) if hasattr(Pregel, "ainvoke"): Pregel.ainvoke = _wrap_pregel_ainvoke(Pregel.ainvoke) def _get_graph_name(graph_obj: "Any") -> "Optional[str]": for attr in ["name", "graph_name", "__name__", "_name"]: if hasattr(graph_obj, attr): name = getattr(graph_obj, attr) if name and isinstance(name, str): return name return None def _normalize_langgraph_message(message: "Any") -> "Any": if not hasattr(message, "content"): return None parsed = {"role": getattr(message, "type", None), "content": message.content} for attr in [ "name", "tool_calls", "function_call", "tool_call_id", "response_metadata", ]: if hasattr(message, attr): value = getattr(message, attr) if value is not None: parsed[attr] = value return parsed def _parse_langgraph_messages(state: "Any") -> "Optional[List[Any]]": if not state: return None messages = None if isinstance(state, dict): messages = state.get("messages") elif hasattr(state, "messages"): messages = state.messages elif hasattr(state, "get") and callable(state.get): try: messages = state.get("messages") except Exception: pass if not messages or not isinstance(messages, (list, tuple)): return None normalized_messages = [] for message in messages: try: normalized = _normalize_langgraph_message(message) if normalized: normalized_messages.append(normalized) except Exception: continue return normalized_messages if normalized_messages else None def _wrap_state_graph_compile(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_compile(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": client = sentry_sdk.get_client() integration = client.get_integration(LanggraphIntegration) if integration is None or has_span_streaming_enabled(client.options): return f(self, *args, **kwargs) with sentry_sdk.start_span( op=OP.GEN_AI_CREATE_AGENT, origin=LanggraphIntegration.origin, ) as span: compiled_graph = f(self, *args, **kwargs) compiled_graph_name = getattr(compiled_graph, "name", None) span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "create_agent") span.set_data(SPANDATA.GEN_AI_AGENT_NAME, compiled_graph_name) if compiled_graph_name: span.description = f"create_agent {compiled_graph_name}" else: span.description = "create_agent" if kwargs.get("model", None) is not None: span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, kwargs.get("model")) tools = None get_graph = getattr(compiled_graph, "get_graph", None) if get_graph and callable(get_graph): graph_obj = compiled_graph.get_graph() nodes = getattr(graph_obj, "nodes", None) if nodes and isinstance(nodes, dict): tools_node = nodes.get("tools") if tools_node: data = getattr(tools_node, "data", None) if data and hasattr(data, "tools_by_name"): tools = list(data.tools_by_name.keys()) if tools is not None: span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, tools) return compiled_graph return new_compile def _wrap_pregel_invoke(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": client = sentry_sdk.get_client() integration = client.get_integration(LanggraphIntegration) if integration is None: return f(self, *args, **kwargs) graph_name = _get_graph_name(self) span_name = ( f"invoke_agent {graph_name}".strip() if graph_name else "invoke_agent" ) if has_span_streaming_enabled(client.options): with sentry_sdk.traces.start_span( name=span_name, attributes={ "sentry.op": OP.GEN_AI_INVOKE_AGENT, "sentry.origin": LanggraphIntegration.origin, SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent", }, ) as span: if graph_name: span.set_attribute(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) span.set_attribute(SPANDATA.GEN_AI_AGENT_NAME, graph_name) # Store input messages to later compare with output input_messages = None if ( len(args) > 0 and should_send_default_pii() and integration.include_prompts ): input_messages = _parse_langgraph_messages(args[0]) if input_messages: normalized_input_messages = normalize_message_roles( input_messages ) client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( truncate_and_annotate_messages( normalized_input_messages, span, scope ) if should_truncate_gen_ai_input(client.options) else normalized_input_messages ) if messages_data is not None: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False, ) result = f(self, *args, **kwargs) _set_response_attributes(span, input_messages, result, integration) return result else: with get_start_span_function()( op=OP.GEN_AI_INVOKE_AGENT, name=span_name, origin=LanggraphIntegration.origin, ) as span: if graph_name: span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) span.set_data(SPANDATA.GEN_AI_AGENT_NAME, graph_name) span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") # Store input messages to later compare with output input_messages = None if ( len(args) > 0 and should_send_default_pii() and integration.include_prompts ): input_messages = _parse_langgraph_messages(args[0]) if input_messages: normalized_input_messages = normalize_message_roles( input_messages ) client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( truncate_and_annotate_messages( normalized_input_messages, span, scope ) if should_truncate_gen_ai_input(client.options) else normalized_input_messages ) if messages_data is not None: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False, ) result = f(self, *args, **kwargs) _set_response_attributes(span, input_messages, result, integration) return result return new_invoke def _wrap_pregel_ainvoke(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) async def new_ainvoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": client = sentry_sdk.get_client() integration = client.get_integration(LanggraphIntegration) if integration is None: return await f(self, *args, **kwargs) graph_name = _get_graph_name(self) span_name = ( f"invoke_agent {graph_name}".strip() if graph_name else "invoke_agent" ) if has_span_streaming_enabled(client.options): with sentry_sdk.traces.start_span( name=span_name, attributes={ "sentry.op": OP.GEN_AI_INVOKE_AGENT, "sentry.origin": LanggraphIntegration.origin, SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent", }, ) as span: if graph_name: span.set_attribute(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) span.set_attribute(SPANDATA.GEN_AI_AGENT_NAME, graph_name) input_messages = None if ( len(args) > 0 and should_send_default_pii() and integration.include_prompts ): input_messages = _parse_langgraph_messages(args[0]) if input_messages: normalized_input_messages = normalize_message_roles( input_messages ) client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( truncate_and_annotate_messages( normalized_input_messages, span, scope ) if should_truncate_gen_ai_input(client.options) else normalized_input_messages ) if messages_data is not None: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False, ) result = await f(self, *args, **kwargs) _set_response_attributes(span, input_messages, result, integration) return result with get_start_span_function()( op=OP.GEN_AI_INVOKE_AGENT, name=span_name, origin=LanggraphIntegration.origin, ) as span: if graph_name: span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name) span.set_data(SPANDATA.GEN_AI_AGENT_NAME, graph_name) span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") input_messages = None if ( len(args) > 0 and should_send_default_pii() and integration.include_prompts ): input_messages = _parse_langgraph_messages(args[0]) if input_messages: normalized_input_messages = normalize_message_roles(input_messages) client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( truncate_and_annotate_messages( normalized_input_messages, span, scope ) if should_truncate_gen_ai_input(client.options) else normalized_input_messages ) if messages_data is not None: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False, ) result = await f(self, *args, **kwargs) _set_response_attributes(span, input_messages, result, integration) return result return new_ainvoke def _get_new_messages( input_messages: "Optional[List[Any]]", output_messages: "Optional[List[Any]]" ) -> "Optional[List[Any]]": """Extract only the new messages added during this invocation.""" if not output_messages: return None if not input_messages: return output_messages # only return the new messages, aka the output messages that are not in the input messages input_count = len(input_messages) new_messages = ( output_messages[input_count:] if len(output_messages) > input_count else [] ) return new_messages if new_messages else None def _extract_llm_response_text(messages: "Optional[List[Any]]") -> "Optional[str]": if not messages: return None for message in reversed(messages): if isinstance(message, dict): role = message.get("role") if role in ["assistant", "ai"]: content = message.get("content") if content and isinstance(content, str): return content return None def _extract_tool_calls(messages: "Optional[List[Any]]") -> "Optional[List[Any]]": if not messages: return None tool_calls = [] for message in messages: if isinstance(message, dict): msg_tool_calls = message.get("tool_calls") if msg_tool_calls and isinstance(msg_tool_calls, list): tool_calls.extend(msg_tool_calls) return tool_calls if tool_calls else None def _set_usage_data(span: "sentry_sdk.tracing.Span", messages: "Any") -> None: input_tokens = 0 output_tokens = 0 total_tokens = 0 for message in messages: response_metadata = message.get("response_metadata") if response_metadata is None: continue token_usage = response_metadata.get("token_usage") if not token_usage: continue input_tokens += int(token_usage.get("prompt_tokens", 0)) output_tokens += int(token_usage.get("completion_tokens", 0)) total_tokens += int(token_usage.get("total_tokens", 0)) set_on_span = ( span.set_attribute if isinstance(span, StreamedSpan) else span.set_data ) if input_tokens > 0: set_on_span(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens) if output_tokens > 0: set_on_span(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens) if total_tokens > 0: set_on_span( SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens, ) def _set_response_model_name(span: "sentry_sdk.tracing.Span", messages: "Any") -> None: if len(messages) == 0: return last_message = messages[-1] response_metadata = last_message.get("response_metadata") if response_metadata is None: return model_name = response_metadata.get("model_name") if model_name is None: return set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_MODEL, model_name) def _set_response_attributes( span: "Any", input_messages: "Optional[List[Any]]", result: "Any", integration: "LanggraphIntegration", ) -> None: parsed_response_messages = _parse_langgraph_messages(result) new_messages = _get_new_messages(input_messages, parsed_response_messages) if new_messages is None: return _set_usage_data(span, new_messages) _set_response_model_name(span, new_messages) if not (should_send_default_pii() and integration.include_prompts): return llm_response_text = _extract_llm_response_text(new_messages) if llm_response_text: set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, llm_response_text) elif new_messages: set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, new_messages) else: set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, result) tool_calls = _extract_tool_calls(new_messages) if tool_calls: set_data_normalized( span, SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS, safe_serialize(tool_calls), unpack=False, )
Upload File
Create Folder