X7ROOT File Manager
Current Path:
/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations
opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sentry_sdk
/
integrations
/
??
..
??
__init__.py
(12.51 KB)
??
__pycache__
??
_asgi_common.py
(4 KB)
??
_wsgi_common.py
(7.28 KB)
??
aiohttp.py
(19.28 KB)
??
aiomysql.py
(9.09 KB)
??
anthropic.py
(39 KB)
??
argv.py
(876 B)
??
ariadne.py
(5.7 KB)
??
arq.py
(9.23 KB)
??
asgi.py
(20.06 KB)
??
asyncio.py
(9.28 KB)
??
asyncpg.py
(9.68 KB)
??
atexit.py
(1.51 KB)
??
aws_lambda.py
(17.41 KB)
??
beam.py
(4.91 KB)
??
boto3.py
(6.2 KB)
??
bottle.py
(7.21 KB)
??
celery
??
chalice.py
(4.51 KB)
??
clickhouse_driver.py
(5.85 KB)
??
cloud_resource_context.py
(7.49 KB)
??
cohere.py
(10.44 KB)
??
dedupe.py
(1.86 KB)
??
django
??
dramatiq.py
(8.02 KB)
??
excepthook.py
(2.25 KB)
??
executing.py
(1.93 KB)
??
falcon.py
(9.04 KB)
??
fastapi.py
(5.28 KB)
??
flask.py
(8.27 KB)
??
gcp.py
(10.57 KB)
??
gnu_backtrace.py
(2.72 KB)
??
google_genai
??
gql.py
(4.93 KB)
??
graphene.py
(5.71 KB)
??
grpc
??
httpx.py
(9.79 KB)
??
httpx2.py
(9.8 KB)
??
huey.py
(8.19 KB)
??
huggingface_hub.py
(15.28 KB)
??
langchain.py
(48.31 KB)
??
langgraph.py
(18.13 KB)
??
launchdarkly.py
(1.87 KB)
??
litellm.py
(13.03 KB)
??
litestar.py
(11.46 KB)
??
logging.py
(15.69 KB)
??
loguru.py
(6.35 KB)
??
mcp.py
(23.12 KB)
??
modules.py
(787 B)
??
openai.py
(53.38 KB)
??
openai_agents
??
openfeature.py
(1.08 KB)
??
opentelemetry
??
otlp.py
(7.99 KB)
??
pure_eval.py
(4.41 KB)
??
pydantic_ai
??
pymongo.py
(8.21 KB)
??
pyramid.py
(7.42 KB)
??
pyreqwest.py
(6.82 KB)
??
quart.py
(7.32 KB)
??
ray.py
(5.75 KB)
??
redis
??
rq.py
(7.81 KB)
??
rust_tracing.py
(9.44 KB)
??
sanic.py
(15.25 KB)
??
serverless.py
(1.58 KB)
??
socket.py
(5.02 KB)
??
spark
??
sqlalchemy.py
(5.24 KB)
??
starlette.py
(27.93 KB)
??
starlite.py
(11.04 KB)
??
statsig.py
(1.19 KB)
??
stdlib.py
(14.01 KB)
??
strawberry.py
(17.39 KB)
??
sys_exit.py
(2.35 KB)
??
threading.py
(6.88 KB)
??
tornado.py
(10.79 KB)
??
trytond.py
(1.67 KB)
??
typer.py
(1.72 KB)
??
unleash.py
(1.02 KB)
??
unraisablehook.py
(1.65 KB)
??
wsgi.py
(15.03 KB)
Editing: langchain.py
import itertools import json import sys import warnings from collections import OrderedDict from functools import wraps from typing import TYPE_CHECKING import sentry_sdk from sentry_sdk.ai.utils import ( GEN_AI_ALLOWED_MESSAGE_ROLES, get_start_span_function, normalize_message_roles, set_data_normalized, transform_content_part, truncate_and_annotate_messages, ) from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.scope import should_send_default_pii from sentry_sdk.traces import StreamedSpan from sentry_sdk.tracing_utils import ( _get_value, has_span_streaming_enabled, should_truncate_gen_ai_input, ) from sentry_sdk.utils import capture_internal_exceptions, logger if TYPE_CHECKING: from typing import ( Any, AsyncIterator, Callable, Dict, Iterator, List, Optional, Union, ) from uuid import UUID from sentry_sdk._types import TextPart from sentry_sdk.tracing import Span try: from langchain_core.agents import AgentFinish from langchain_core.callbacks import ( BaseCallbackHandler, BaseCallbackManager, Callbacks, manager, ) from langchain_core.messages import BaseMessage from langchain_core.outputs import LLMResult except ImportError: raise DidNotEnable("langchain not installed") try: # >=v1 from langchain_classic.agents import AgentExecutor # type: ignore[import-not-found] except ImportError: try: # <v1 from langchain.agents import AgentExecutor # Catch TypeError due to changes in type hint evaluation order: https://github.com/pydantic/pydantic/issues/13036 except (ImportError, TypeError): AgentExecutor = None # Conditional imports for embeddings providers try: from langchain_openai import OpenAIEmbeddings # type: ignore[import-not-found] except ImportError: OpenAIEmbeddings = None try: from langchain_openai import AzureOpenAIEmbeddings except ImportError: AzureOpenAIEmbeddings = None try: from langchain_google_vertexai import ( # type: ignore[import-not-found] VertexAIEmbeddings, ) except ImportError: VertexAIEmbeddings = None try: from langchain_aws import BedrockEmbeddings # type: ignore[import-not-found] except ImportError: BedrockEmbeddings = None try: from langchain_cohere import CohereEmbeddings # type: ignore[import-not-found] except ImportError: CohereEmbeddings = None try: from langchain_mistralai import ( # type: ignore[import-not-found] MistralAIEmbeddings, ) except ImportError: MistralAIEmbeddings = None try: from langchain_huggingface import ( # type: ignore[import-not-found] HuggingFaceEmbeddings, ) except ImportError: HuggingFaceEmbeddings = None try: from langchain_ollama import OllamaEmbeddings # type: ignore[import-not-found] except ImportError: OllamaEmbeddings = None def _get_ai_system(all_params: "Dict[str, Any]") -> "Optional[str]": ai_type = all_params.get("_type") if not ai_type or not isinstance(ai_type, str): return None return ai_type DATA_FIELDS = { "frequency_penalty": SPANDATA.GEN_AI_REQUEST_FREQUENCY_PENALTY, "function_call": SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS, "max_tokens": SPANDATA.GEN_AI_REQUEST_MAX_TOKENS, "presence_penalty": SPANDATA.GEN_AI_REQUEST_PRESENCE_PENALTY, "temperature": SPANDATA.GEN_AI_REQUEST_TEMPERATURE, "tool_calls": SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS, "top_k": SPANDATA.GEN_AI_REQUEST_TOP_K, "top_p": SPANDATA.GEN_AI_REQUEST_TOP_P, } def _transform_langchain_content_block( content_block: "Dict[str, Any]", ) -> "Dict[str, Any]": """ Transform a LangChain content block using the shared transform_content_part function. Returns the original content block if transformation is not applicable (e.g., for text blocks or unrecognized formats). """ result = transform_content_part(content_block) return result if result is not None else content_block def _transform_langchain_message_content(content: "Any") -> "Any": """ Transform LangChain message content, handling both string content and list of content blocks. """ if isinstance(content, str): return content if isinstance(content, (list, tuple)): transformed = [] for block in content: if isinstance(block, dict): transformed.append(_transform_langchain_content_block(block)) else: transformed.append(block) return transformed return content def _get_system_instructions(messages: "List[List[BaseMessage]]") -> "List[str]": system_instructions = [] for list_ in messages: for message in list_: # type of content: str | list[str | dict] | None if message.type == "system" and isinstance(message.content, str): system_instructions.append(message.content) elif message.type == "system" and isinstance(message.content, list): for item in message.content: if isinstance(item, str): system_instructions.append(item) elif isinstance(item, dict) and item.get("type") == "text": instruction = item.get("text") if isinstance(instruction, str): system_instructions.append(instruction) return system_instructions def _transform_system_instructions( system_instructions: "List[str]", ) -> "List[TextPart]": return [ { "type": "text", "content": instruction, } for instruction in system_instructions ] class LangchainIntegration(Integration): identifier = "langchain" origin = f"auto.ai.{identifier}" def __init__( self: "LangchainIntegration", include_prompts: bool = True, max_spans: "Optional[int]" = None, ) -> None: self.include_prompts = include_prompts self.max_spans = max_spans if max_spans is not None: warnings.warn( "The `max_spans` parameter of `LangchainIntegration` is " "deprecated and will be removed in version 3.0 of sentry-sdk.", DeprecationWarning, stacklevel=2, ) @staticmethod def setup_once() -> None: manager._configure = _wrap_configure(manager._configure) if AgentExecutor is not None: AgentExecutor.invoke = _wrap_agent_executor_invoke(AgentExecutor.invoke) AgentExecutor.stream = _wrap_agent_executor_stream(AgentExecutor.stream) # Patch embeddings providers _patch_embeddings_provider(OpenAIEmbeddings) _patch_embeddings_provider(AzureOpenAIEmbeddings) _patch_embeddings_provider(VertexAIEmbeddings) _patch_embeddings_provider(BedrockEmbeddings) _patch_embeddings_provider(CohereEmbeddings) _patch_embeddings_provider(MistralAIEmbeddings) _patch_embeddings_provider(HuggingFaceEmbeddings) _patch_embeddings_provider(OllamaEmbeddings) class SentryLangchainCallback(BaseCallbackHandler): # type: ignore[misc] """Callback handler that creates Sentry spans.""" def __init__( self, max_span_map_size: "Optional[int]", include_prompts: bool ) -> None: self.span_map: "OrderedDict[UUID, Union[sentry_sdk.tracing.Span, StreamedSpan]]" = OrderedDict() self.max_span_map_size = max_span_map_size self.include_prompts = include_prompts def gc_span_map(self) -> None: if self.max_span_map_size is not None: while len(self.span_map) > self.max_span_map_size: run_id, span = self.span_map.popitem(last=False) self._exit_span(span, run_id) def _handle_error(self, run_id: "UUID", error: "Any") -> None: with capture_internal_exceptions(): if not run_id or run_id not in self.span_map: return span = self.span_map[run_id] sentry_sdk.capture_exception( error, span._scope if isinstance(span, StreamedSpan) else span.scope ) span.__exit__(type(error), error, error.__traceback__) del self.span_map[run_id] def _normalize_langchain_message(self, message: "BaseMessage") -> "Any": # Transform content to handle multimodal data (images, audio, video, files) transformed_content = _transform_langchain_message_content(message.content) parsed = {"role": message.type, "content": transformed_content} parsed.update(message.additional_kwargs) return parsed def _create_span( self: "SentryLangchainCallback", run_id: "UUID", parent_id: "Optional[Any]", op: str, name: str, origin: str, ) -> "Union[sentry_sdk.tracing.Span, StreamedSpan]": span = None if parent_id: parent_span: "Optional[Union[sentry_sdk.tracing.Span, StreamedSpan]]" = ( self.span_map.get(parent_id) ) if parent_span: span = ( sentry_sdk.traces.start_span( parent_span=parent_span, name=name, attributes={ "sentry.op": op, "sentry.origin": origin, }, ) if isinstance(parent_span, StreamedSpan) else parent_span.start_child(op=op, name=name, origin=origin) ) if span is None: span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options) span = ( sentry_sdk.traces.start_span( name=name, attributes={ "sentry.op": op, "sentry.origin": origin, }, ) if span_streaming else sentry_sdk.start_span(op=op, name=name, origin=origin) ) span.__enter__() self.span_map[run_id] = span self.gc_span_map() return span def _exit_span( self: "SentryLangchainCallback", span: "Union[sentry_sdk.tracing.Span, StreamedSpan]", run_id: "UUID", ) -> None: span.__exit__(None, None, None) del self.span_map[run_id] def on_llm_start( self: "SentryLangchainCallback", serialized: "Dict[str, Any]", prompts: "List[str]", *, run_id: "UUID", tags: "Optional[List[str]]" = None, parent_run_id: "Optional[UUID]" = None, metadata: "Optional[Dict[str, Any]]" = None, **kwargs: "Any", ) -> "Any": with capture_internal_exceptions(): if not run_id: return all_params = kwargs.get("invocation_params", {}) all_params.update(serialized.get("kwargs", {})) model = ( all_params.get("model") or all_params.get("model_name") or all_params.get("model_id") or "" ) span = self._create_span( run_id, parent_run_id, op=OP.GEN_AI_TEXT_COMPLETION, name=f"text_completion {model}".strip(), origin=LangchainIntegration.origin, ) set_on_span = ( span.set_attribute if isinstance(span, StreamedSpan) else span.set_data ) set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "text_completion") run_name = kwargs.get("name") if run_name: set_on_span(SPANDATA.GEN_AI_FUNCTION_ID, run_name) if model: set_on_span( SPANDATA.GEN_AI_REQUEST_MODEL, model, ) ai_system = _get_ai_system(all_params) if ai_system: set_on_span(SPANDATA.GEN_AI_SYSTEM, ai_system) for key, attribute in DATA_FIELDS.items(): if key in all_params and all_params[key] is not None: set_data_normalized(span, attribute, all_params[key], unpack=False) _set_tools_on_span(span, all_params.get("tools")) if should_send_default_pii() and self.include_prompts: normalized_messages = [ { "role": GEN_AI_ALLOWED_MESSAGE_ROLES.USER, "content": {"type": "text", "text": prompt}, } for prompt in prompts ] client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( truncate_and_annotate_messages(normalized_messages, span, scope) if should_truncate_gen_ai_input(client.options) else normalized_messages ) if messages_data is not None: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False, ) def on_chat_model_start( self: "SentryLangchainCallback", serialized: "Dict[str, Any]", messages: "List[List[BaseMessage]]", *, run_id: "UUID", **kwargs: "Any", ) -> "Any": """Run when Chat Model starts running.""" with capture_internal_exceptions(): if not run_id: return all_params = kwargs.get("invocation_params", {}) all_params.update(serialized.get("kwargs", {})) model = ( all_params.get("model") or all_params.get("model_name") or all_params.get("model_id") or "" ) span = self._create_span( run_id, kwargs.get("parent_run_id"), op=OP.GEN_AI_CHAT, name=f"chat {model}".strip(), origin=LangchainIntegration.origin, ) set_on_span = ( span.set_attribute if isinstance(span, StreamedSpan) else span.set_data ) set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "chat") if model: set_on_span(SPANDATA.GEN_AI_REQUEST_MODEL, model) ai_system = _get_ai_system(all_params) if ai_system: set_on_span(SPANDATA.GEN_AI_SYSTEM, ai_system) agent_metadata = kwargs.get("metadata") if isinstance(agent_metadata, dict) and "lc_agent_name" in agent_metadata: set_on_span(SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"]) run_name = kwargs.get("name") if run_name: set_on_span( SPANDATA.GEN_AI_FUNCTION_ID, run_name, ) for key, attribute in DATA_FIELDS.items(): if key in all_params and all_params[key] is not None: set_data_normalized(span, attribute, all_params[key], unpack=False) _set_tools_on_span(span, all_params.get("tools")) if should_send_default_pii() and self.include_prompts: system_instructions = _get_system_instructions(messages) if len(system_instructions) > 0: set_on_span( SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS, json.dumps(_transform_system_instructions(system_instructions)), ) normalized_messages = [] for list_ in messages: for message in list_: if message.type == "system": continue normalized_messages.append( self._normalize_langchain_message(message) ) normalized_messages = normalize_message_roles(normalized_messages) client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( truncate_and_annotate_messages(normalized_messages, span, scope) if should_truncate_gen_ai_input(client.options) else normalized_messages ) if messages_data is not None: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False, ) def on_chat_model_end( self: "SentryLangchainCallback", response: "LLMResult", *, run_id: "UUID", **kwargs: "Any", ) -> "Any": """Run when Chat Model ends running.""" with capture_internal_exceptions(): if not run_id or run_id not in self.span_map: return span = self.span_map[run_id] if should_send_default_pii() and self.include_prompts: set_data_normalized( span, SPANDATA.GEN_AI_RESPONSE_TEXT, [[x.text for x in list_] for list_ in response.generations], ) _record_token_usage(span, response) self._exit_span(span, run_id) def on_llm_end( self: "SentryLangchainCallback", response: "LLMResult", *, run_id: "UUID", **kwargs: "Any", ) -> "Any": """Run when LLM ends running.""" with capture_internal_exceptions(): if not run_id or run_id not in self.span_map: return span = self.span_map[run_id] try: generation = response.generations[0][0] except IndexError: generation = None if generation is not None: set_on_span = ( span.set_attribute if isinstance(span, StreamedSpan) else span.set_data ) try: response_model = generation.message.response_metadata.get( "model_name" ) if response_model is not None: set_on_span(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model) except AttributeError: pass try: finish_reason = generation.generation_info.get("finish_reason") if finish_reason is not None: set_on_span( SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS, [finish_reason], ) except AttributeError: pass try: if should_send_default_pii() and self.include_prompts: tool_calls = getattr(generation.message, "tool_calls", None) if tool_calls is not None and tool_calls != []: set_data_normalized( span, SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS, tool_calls, unpack=False, ) except AttributeError: pass if should_send_default_pii() and self.include_prompts: set_data_normalized( span, SPANDATA.GEN_AI_RESPONSE_TEXT, [[x.text for x in list_] for list_ in response.generations], ) _record_token_usage(span, response) self._exit_span(span, run_id) def on_llm_error( self: "SentryLangchainCallback", error: "Union[Exception, KeyboardInterrupt]", *, run_id: "UUID", **kwargs: "Any", ) -> "Any": """Run when LLM errors.""" self._handle_error(run_id, error) def on_chat_model_error( self: "SentryLangchainCallback", error: "Union[Exception, KeyboardInterrupt]", *, run_id: "UUID", **kwargs: "Any", ) -> "Any": """Run when Chat Model errors.""" self._handle_error(run_id, error) def on_agent_finish( self: "SentryLangchainCallback", finish: "AgentFinish", *, run_id: "UUID", **kwargs: "Any", ) -> "Any": with capture_internal_exceptions(): if not run_id or run_id not in self.span_map: return span = self.span_map[run_id] if should_send_default_pii() and self.include_prompts: set_data_normalized( span, SPANDATA.GEN_AI_RESPONSE_TEXT, finish.return_values.items() ) self._exit_span(span, run_id) def on_tool_start( self: "SentryLangchainCallback", serialized: "Dict[str, Any]", input_str: str, *, run_id: "UUID", **kwargs: "Any", ) -> "Any": """Run when tool starts running.""" with capture_internal_exceptions(): if not run_id: return tool_name = serialized.get("name") or kwargs.get("name") or "" span = self._create_span( run_id, kwargs.get("parent_run_id"), op=OP.GEN_AI_EXECUTE_TOOL, name=f"execute_tool {tool_name}".strip(), origin=LangchainIntegration.origin, ) set_on_span = ( span.set_attribute if isinstance(span, StreamedSpan) else span.set_data ) set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "execute_tool") set_on_span(SPANDATA.GEN_AI_TOOL_NAME, tool_name) tool_description = serialized.get("description") if tool_description is not None: set_on_span(SPANDATA.GEN_AI_TOOL_DESCRIPTION, tool_description) agent_metadata = kwargs.get("metadata") if isinstance(agent_metadata, dict) and "lc_agent_name" in agent_metadata: set_on_span(SPANDATA.GEN_AI_AGENT_NAME, agent_metadata["lc_agent_name"]) run_name = kwargs.get("name") if run_name: set_on_span( SPANDATA.GEN_AI_FUNCTION_ID, run_name, ) if should_send_default_pii() and self.include_prompts: set_data_normalized( span, SPANDATA.GEN_AI_TOOL_INPUT, kwargs.get("inputs", [input_str]), ) def on_tool_end( self: "SentryLangchainCallback", output: str, *, run_id: "UUID", **kwargs: "Any" ) -> "Any": """Run when tool ends running.""" with capture_internal_exceptions(): if not run_id or run_id not in self.span_map: return span = self.span_map[run_id] if should_send_default_pii() and self.include_prompts: set_data_normalized(span, SPANDATA.GEN_AI_TOOL_OUTPUT, output) self._exit_span(span, run_id) def on_tool_error( self, error: "SentryLangchainCallback", *args: "Union[Exception, KeyboardInterrupt]", run_id: "UUID", **kwargs: "Any", ) -> "Any": """Run when tool errors.""" self._handle_error(run_id, error) def _extract_tokens( token_usage: "Any", ) -> "tuple[Optional[int], Optional[int], Optional[int]]": if not token_usage: return None, None, None input_tokens = _get_value(token_usage, "prompt_tokens") or _get_value( token_usage, "input_tokens" ) output_tokens = _get_value(token_usage, "completion_tokens") or _get_value( token_usage, "output_tokens" ) total_tokens = _get_value(token_usage, "total_tokens") return input_tokens, output_tokens, total_tokens def _extract_tokens_from_generations( generations: "Any", ) -> "tuple[Optional[int], Optional[int], Optional[int]]": """Extract token usage from response.generations structure.""" if not generations: return None, None, None total_input = 0 total_output = 0 total_total = 0 for gen_list in generations: for gen in gen_list: token_usage = _get_token_usage(gen) input_tokens, output_tokens, total_tokens = _extract_tokens(token_usage) total_input += input_tokens if input_tokens is not None else 0 total_output += output_tokens if output_tokens is not None else 0 total_total += total_tokens if total_tokens is not None else 0 return ( total_input if total_input > 0 else None, total_output if total_output > 0 else None, total_total if total_total > 0 else None, ) def _get_token_usage(obj: "Any") -> "Optional[Dict[str, Any]]": """ Check multiple paths to extract token usage from different objects. """ possible_names = ("usage", "token_usage", "usage_metadata") message = _get_value(obj, "message") if message is not None: for name in possible_names: usage = _get_value(message, name) if usage is not None: return usage llm_output = _get_value(obj, "llm_output") if llm_output is not None: for name in possible_names: usage = _get_value(llm_output, name) if usage is not None: return usage for name in possible_names: usage = _get_value(obj, name) if usage is not None: return usage return None def _record_token_usage(span: "Union[Span, StreamedSpan]", response: "Any") -> None: token_usage = _get_token_usage(response) if token_usage: input_tokens, output_tokens, total_tokens = _extract_tokens(token_usage) else: input_tokens, output_tokens, total_tokens = _extract_tokens_from_generations( response.generations ) set_on_span = ( span.set_attribute if isinstance(span, StreamedSpan) else span.set_data ) if input_tokens is not None: set_on_span(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens) if output_tokens is not None: set_on_span(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens) if total_tokens is not None: set_on_span(SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens) def _get_request_data( obj: "Any", args: "Any", kwargs: "Any" ) -> "tuple[Optional[str], Optional[List[Any]]]": """ Get the agent name and available tools for the agent. """ agent = getattr(obj, "agent", None) runnable = getattr(agent, "runnable", None) runnable_config = getattr(runnable, "config", {}) tools = ( getattr(obj, "tools", None) or getattr(agent, "tools", None) or runnable_config.get("tools") or runnable_config.get("available_tools") ) tools = tools if tools and len(tools) > 0 else None try: agent_name = None if len(args) > 1: agent_name = args[1].get("run_name") if agent_name is None: agent_name = runnable_config.get("run_name") except Exception: pass return (agent_name, tools) def _simplify_langchain_tools(tools: "Any") -> "Optional[List[Any]]": """Parse and simplify tools into a cleaner format.""" if not tools: return None if not isinstance(tools, (list, tuple)): return None simplified_tools = [] for tool in tools: try: if isinstance(tool, dict): if "function" in tool and isinstance(tool["function"], dict): func = tool["function"] simplified_tool = { "name": func.get("name"), "description": func.get("description"), } if simplified_tool["name"]: simplified_tools.append(simplified_tool) elif "name" in tool: simplified_tool = { "name": tool.get("name"), "description": tool.get("description"), } simplified_tools.append(simplified_tool) else: name = ( tool.get("name") or tool.get("tool_name") or tool.get("function_name") ) if name: simplified_tools.append( { "name": name, "description": tool.get("description") or tool.get("desc"), } ) elif hasattr(tool, "name"): simplified_tool = { "name": getattr(tool, "name", None), "description": getattr(tool, "description", None) or getattr(tool, "desc", None), } if simplified_tool["name"]: simplified_tools.append(simplified_tool) elif hasattr(tool, "__name__"): simplified_tools.append( { "name": tool.__name__, "description": getattr(tool, "__doc__", None), } ) else: tool_str = str(tool) if tool_str and tool_str != "": simplified_tools.append({"name": tool_str, "description": None}) except Exception: continue return simplified_tools if simplified_tools else None def _set_tools_on_span(span: "Union[Span, StreamedSpan]", tools: "Any") -> None: """Set available tools data on a span if tools are provided.""" if tools is not None: simplified_tools = _simplify_langchain_tools(tools) if simplified_tools: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, simplified_tools, unpack=False, ) def _wrap_configure(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_configure( callback_manager_cls: type, inheritable_callbacks: "Callbacks" = None, local_callbacks: "Callbacks" = None, *args: "Any", **kwargs: "Any", ) -> "Any": integration = sentry_sdk.get_client().get_integration(LangchainIntegration) if integration is None: return f( callback_manager_cls, inheritable_callbacks, local_callbacks, *args, **kwargs, ) local_callbacks = local_callbacks or [] # Handle each possible type of local_callbacks. For each type, we # extract the list of callbacks to check for SentryLangchainCallback, # and define a function that would add the SentryLangchainCallback # to the existing callbacks list. if isinstance(local_callbacks, BaseCallbackManager): callbacks_list = local_callbacks.handlers elif isinstance(local_callbacks, BaseCallbackHandler): callbacks_list = [local_callbacks] elif isinstance(local_callbacks, list): callbacks_list = local_callbacks else: logger.debug("Unknown callback type: %s", local_callbacks) # Just proceed with original function call return f( callback_manager_cls, inheritable_callbacks, local_callbacks, *args, **kwargs, ) # Handle each possible type of inheritable_callbacks. if isinstance(inheritable_callbacks, BaseCallbackManager): inheritable_callbacks_list = inheritable_callbacks.handlers elif isinstance(inheritable_callbacks, list): inheritable_callbacks_list = inheritable_callbacks else: inheritable_callbacks_list = [] if not any( isinstance(cb, SentryLangchainCallback) for cb in itertools.chain(callbacks_list, inheritable_callbacks_list) ): sentry_handler = SentryLangchainCallback( integration.max_spans, integration.include_prompts, ) if isinstance(local_callbacks, BaseCallbackManager): local_callbacks = local_callbacks.copy() local_callbacks.handlers = [ *local_callbacks.handlers, sentry_handler, ] elif isinstance(local_callbacks, BaseCallbackHandler): local_callbacks = [local_callbacks, sentry_handler] else: local_callbacks = [*local_callbacks, sentry_handler] return f( callback_manager_cls, inheritable_callbacks, local_callbacks, *args, **kwargs, ) return new_configure def _wrap_agent_executor_invoke(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": client = sentry_sdk.get_client() integration = client.get_integration(LangchainIntegration) if integration is None: return f(self, *args, **kwargs) run_name, tools = _get_request_data(self, args, kwargs) if has_span_streaming_enabled(client.options): with sentry_sdk.traces.start_span( name=f"invoke_agent {run_name}" if run_name else "invoke_agent", attributes={ "sentry.op": OP.GEN_AI_INVOKE_AGENT, "sentry.origin": LangchainIntegration.origin, SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent", SPANDATA.GEN_AI_RESPONSE_STREAMING: False, }, ) as span: if run_name: span.set_attribute(SPANDATA.GEN_AI_FUNCTION_ID, run_name) _set_tools_on_span(span, tools) # Run the agent result = f(self, *args, **kwargs) input = result.get("input") if ( input is not None and should_send_default_pii() and integration.include_prompts ): normalized_messages = normalize_message_roles([input]) client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( truncate_and_annotate_messages(normalized_messages, span, scope) if should_truncate_gen_ai_input(client.options) else normalized_messages ) if messages_data is not None: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False, ) output = result.get("output") if ( output is not None and should_send_default_pii() and integration.include_prompts ): set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) return result else: start_span_function = get_start_span_function() with start_span_function( op=OP.GEN_AI_INVOKE_AGENT, name=f"invoke_agent {run_name}" if run_name else "invoke_agent", origin=LangchainIntegration.origin, ) as span: if run_name: span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name) span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False) _set_tools_on_span(span, tools) # Run the agent result = f(self, *args, **kwargs) input = result.get("input") if ( input is not None and should_send_default_pii() and integration.include_prompts ): normalized_messages = normalize_message_roles([input]) client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( truncate_and_annotate_messages(normalized_messages, span, scope) if should_truncate_gen_ai_input(client.options) else normalized_messages ) if messages_data is not None: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False, ) output = result.get("output") if ( output is not None and should_send_default_pii() and integration.include_prompts ): set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) return result return new_invoke def _wrap_agent_executor_stream(f: "Callable[..., Any]") -> "Callable[..., Any]": @wraps(f) def new_stream(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": client = sentry_sdk.get_client() integration = client.get_integration(LangchainIntegration) if integration is None: return f(self, *args, **kwargs) run_name, tools = _get_request_data(self, args, kwargs) if has_span_streaming_enabled(client.options): span = sentry_sdk.traces.start_span( name=f"invoke_agent {run_name}" if run_name else "invoke_agent", attributes={ "sentry.op": OP.GEN_AI_INVOKE_AGENT, "sentry.origin": LangchainIntegration.origin, SPANDATA.GEN_AI_OPERATION_NAME: "invoke_agent", SPANDATA.GEN_AI_RESPONSE_STREAMING: True, }, ) if run_name: span.set_attribute(SPANDATA.GEN_AI_FUNCTION_ID, run_name) else: start_span_function = get_start_span_function() span = start_span_function( op=OP.GEN_AI_INVOKE_AGENT, name=f"invoke_agent {run_name}" if run_name else "invoke_agent", origin=LangchainIntegration.origin, ) span.__enter__() span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) if run_name: span.set_data(SPANDATA.GEN_AI_FUNCTION_ID, run_name) _set_tools_on_span(span, tools) input = args[0].get("input") if len(args) >= 1 else None if ( input is not None and should_send_default_pii() and integration.include_prompts ): normalized_messages = normalize_message_roles([input]) client = sentry_sdk.get_client() scope = sentry_sdk.get_current_scope() messages_data = ( truncate_and_annotate_messages(normalized_messages, span, scope) if should_truncate_gen_ai_input(client.options) else normalized_messages ) if messages_data is not None: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False, ) # Run the agent result = f(self, *args, **kwargs) old_iterator = result def new_iterator() -> "Iterator[Any]": exc_info: "tuple[Any, Any, Any]" = (None, None, None) try: for event in old_iterator: yield event try: output = event.get("output") except Exception: output = None if ( output is not None and should_send_default_pii() and integration.include_prompts ): set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) span.__exit__(None, None, None) except Exception: exc_info = sys.exc_info() with capture_internal_exceptions(): span.__exit__(*exc_info) raise async def new_iterator_async() -> "AsyncIterator[Any]": exc_info: "tuple[Any, Any, Any]" = (None, None, None) try: async for event in old_iterator: yield event try: output = event.get("output") except Exception: output = None if ( output is not None and should_send_default_pii() and integration.include_prompts ): set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) span.__exit__(None, None, None) except Exception: exc_info = sys.exc_info() with capture_internal_exceptions(): span.__exit__(*exc_info) raise if str(type(result)) == "<class 'async_generator'>": result = new_iterator_async() else: result = new_iterator() return result return new_stream def _patch_embeddings_provider(provider_class: "Any") -> None: """Patch an embeddings provider class with monitoring wrappers.""" if provider_class is None: return if hasattr(provider_class, "embed_documents"): provider_class.embed_documents = _wrap_embedding_method( provider_class.embed_documents ) if hasattr(provider_class, "embed_query"): provider_class.embed_query = _wrap_embedding_method(provider_class.embed_query) if hasattr(provider_class, "aembed_documents"): provider_class.aembed_documents = _wrap_async_embedding_method( provider_class.aembed_documents ) if hasattr(provider_class, "aembed_query"): provider_class.aembed_query = _wrap_async_embedding_method( provider_class.aembed_query ) def _wrap_embedding_method(f: "Callable[..., Any]") -> "Callable[..., Any]": """Wrap sync embedding methods (embed_documents and embed_query).""" @wraps(f) def new_embedding_method(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": client = sentry_sdk.get_client() integration = client.get_integration(LangchainIntegration) if integration is None: return f(self, *args, **kwargs) model_name = getattr(self, "model", None) or getattr(self, "model_name", None) if has_span_streaming_enabled(client.options): with sentry_sdk.traces.start_span( name=f"embeddings {model_name}" if model_name else "embeddings", attributes={ "sentry.op": OP.GEN_AI_EMBEDDINGS, "sentry.origin": LangchainIntegration.origin, SPANDATA.GEN_AI_OPERATION_NAME: "embeddings", }, ) as span: if model_name: span.set_attribute(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) # Capture input if PII is allowed if ( should_send_default_pii() and integration.include_prompts and len(args) > 0 ): input_data = args[0] # Normalize to list format texts = input_data if isinstance(input_data, list) else [input_data] set_data_normalized( span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False ) result = f(self, *args, **kwargs) return result else: with sentry_sdk.start_span( op=OP.GEN_AI_EMBEDDINGS, name=f"embeddings {model_name}" if model_name else "embeddings", origin=LangchainIntegration.origin, ) as span: span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") if model_name: span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) # Capture input if PII is allowed if ( should_send_default_pii() and integration.include_prompts and len(args) > 0 ): input_data = args[0] # Normalize to list format texts = input_data if isinstance(input_data, list) else [input_data] set_data_normalized( span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False ) result = f(self, *args, **kwargs) return result return new_embedding_method def _wrap_async_embedding_method(f: "Callable[..., Any]") -> "Callable[..., Any]": """Wrap async embedding methods (aembed_documents and aembed_query).""" @wraps(f) async def new_async_embedding_method( self: "Any", *args: "Any", **kwargs: "Any" ) -> "Any": client = sentry_sdk.get_client() integration = client.get_integration(LangchainIntegration) if integration is None: return await f(self, *args, **kwargs) model_name = getattr(self, "model", None) or getattr(self, "model_name", None) if has_span_streaming_enabled(client.options): with sentry_sdk.traces.start_span( name=f"embeddings {model_name}" if model_name else "embeddings", attributes={ "sentry.op": OP.GEN_AI_EMBEDDINGS, "sentry.origin": LangchainIntegration.origin, SPANDATA.GEN_AI_OPERATION_NAME: "embeddings", }, ) as span: if model_name: span.set_attribute(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) # Capture input if PII is allowed if ( should_send_default_pii() and integration.include_prompts and len(args) > 0 ): input_data = args[0] # Normalize to list format texts = input_data if isinstance(input_data, list) else [input_data] set_data_normalized( span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False ) result = await f(self, *args, **kwargs) return result else: with sentry_sdk.start_span( op=OP.GEN_AI_EMBEDDINGS, name=f"embeddings {model_name}" if model_name else "embeddings", origin=LangchainIntegration.origin, ) as span: span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") if model_name: span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) # Capture input if PII is allowed if ( should_send_default_pii() and integration.include_prompts and len(args) > 0 ): input_data = args[0] # Normalize to list format texts = input_data if isinstance(input_data, list) else [input_data] set_data_normalized( span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False ) result = await f(self, *args, **kwargs) return result return new_async_embedding_method
Upload File
Create Folder