X7ROOT File Manager
Current Path:
/opt/hc_python/lib/python3.12/site-packages/sentry_sdk
opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sentry_sdk
/
??
..
??
__init__.py
(1.46 KB)
??
__pycache__
??
_batcher.py
(5.7 KB)
??
_compat.py
(3 KB)
??
_init_implementation.py
(2.43 KB)
??
_log_batcher.py
(1.88 KB)
??
_lru_cache.py
(1.14 KB)
??
_metrics_batcher.py
(1.21 KB)
??
_queue.py
(10.98 KB)
??
_span_batcher.py
(8.12 KB)
??
_types.py
(13.16 KB)
??
_werkzeug.py
(3.85 KB)
??
ai
??
api.py
(15.59 KB)
??
attachments.py
(2.95 KB)
??
client.py
(49.95 KB)
??
consts.py
(61.95 KB)
??
crons
??
debug.py
(959 B)
??
envelope.py
(9.37 KB)
??
feature_flags.py
(2.5 KB)
??
hub.py
(24.54 KB)
??
integrations
??
logger.py
(2.6 KB)
??
metrics.py
(1.42 KB)
??
monitor.py
(4.47 KB)
??
profiler
??
py.typed
(0 B)
??
scope.py
(74.09 KB)
??
scrubber.py
(5.99 KB)
??
serializer.py
(12.82 KB)
??
session.py
(5.08 KB)
??
sessions.py
(8.59 KB)
??
spotlight.py
(11.85 KB)
??
traces.py
(25.08 KB)
??
tracing.py
(50.33 KB)
??
tracing_utils.py
(54.36 KB)
??
transport.py
(44.41 KB)
??
types.py
(1.24 KB)
??
utils.py
(65.96 KB)
??
worker.py
(10.91 KB)
Editing: _batcher.py
import os
import random
import threading
import weakref
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Generic, TypeVar

from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.utils import format_timestamp

if TYPE_CHECKING:
    from typing import Any, Callable, Optional

T = TypeVar("T")


class Batcher(Generic[T]):
    """Buffer items of type ``T`` and deliver them in batches.

    Items handed to :meth:`add` are kept in an in-memory list. A daemon
    thread (started lazily, and re-started after a fork) periodically packs
    the buffered items into a single :class:`Envelope` item and passes the
    envelope to ``capture_func``.

    Subclasses are expected to set ``TYPE`` / ``CONTENT_TYPE`` and to
    override ``_to_transport_format`` and ``_record_lost``; the versions
    defined here do nothing.
    """

    # Buffer length at which add() wakes the flusher thread early.
    MAX_BEFORE_FLUSH = 100
    # Buffer length at which add() stops appending and reports the item
    # through _record_lost() instead.
    MAX_BEFORE_DROP = 1_000
    # Base timeout (seconds) of the flusher's wait; up to one extra second
    # of random jitter is added on every iteration.
    FLUSH_WAIT_TIME = 5.0

    # Passed as ``type`` / ``content_type`` of the envelope item.
    TYPE = ""
    CONTENT_TYPE = ""

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        """Set up an empty batcher; no thread is started yet.

        :param capture_func: called with every non-empty envelope produced
            by a flush.
        :param record_lost_func: stored on the instance as
            ``_record_lost_func``; this base class never calls it itself.
        """
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func

        self._buffer: "list[T]" = []
        self._running = True
        self._lock = threading.Lock()
        # Per-thread re-entrancy marker, see add().
        self._active: "threading.local" = threading.local()
        self._flush_event: "threading.Event" = threading.Event()
        self._flusher: "Optional[threading.Thread]" = None
        self._flusher_pid: "Optional[int]" = None

        # See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50
        if not hasattr(os, "register_at_fork"):
            return

        # Only a weak reference is captured by the fork hook, so the hook
        # does not keep this batcher alive.
        reset_ref = weakref.WeakMethod(self._reset_thread_state)

        def _after_fork_in_child() -> None:
            reset = reset_ref()
            if reset is not None:
                reset()

        os.register_at_fork(after_in_child=_after_fork_in_child)

    def _reset_thread_state(self) -> None:
        """Replace buffer, lock, events and thread bookkeeping with fresh ones.

        Registered by ``__init__`` to run in the child process after a fork.
        """
        self._buffer = []
        self._running = True
        self._lock = threading.Lock()
        self._active = threading.local()
        self._flush_event = threading.Event()
        self._flusher = None
        self._flusher_pid = None

    def _ensure_thread(self) -> bool:
        """Make sure a flusher thread was started in the current process.

        After a fork the recorded pid no longer matches, in which case a new
        thread is created here.

        :returns: ``True`` if a flusher thread has been started for this
            pid, ``False`` if the batcher is not running or the thread could
            not be started.
        """
        if not self._running:
            return False

        current_pid = os.getpid()
        # Cheap check without the lock first.
        if self._flusher_pid == current_pid:
            return True

        with self._lock:
            # Check again now that we hold the lock: another thread may have
            # started the flusher while we were waiting.
            if self._flusher_pid == current_pid:
                return True

            self._flusher_pid = current_pid

            self._flusher = threading.Thread(target=self._flush_loop)
            self._flusher.daemon = True

            try:
                self._flusher.start()
            except RuntimeError:
                # The interpreter is in a state where threads can no longer
                # be spawned; give up for good.
                self._running = False
                return False

        return True

    def _flush_loop(self) -> None:
        """Body of the flusher thread: wait, then flush, until stopped."""
        # The flusher thread counts as "inside the batcher" for its whole
        # lifetime. A re-entrant add() on this thread (e.g. caused by a
        # GC-emitted warning during wait(), flush() or Event operations) is
        # then dropped silently instead of deadlocking on internal locks.
        self._active.flag = True

        while self._running:
            timeout = self.FLUSH_WAIT_TIME + random.random()
            self._flush_event.wait(timeout)
            self._flush_event.clear()
            self._flush()

    def add(self, item: "T") -> None:
        """Queue ``item`` for the next flush.

        The item is silently ignored when the calling thread is already
        executing batcher code or when no flusher thread is available. Once
        the buffer holds ``MAX_BEFORE_DROP`` items, the item is passed to
        ``_record_lost`` instead of being buffered.
        """
        # Re-entrancy guard: code running inside the batcher (for example
        # _add_to_envelope during a flush, or _flush_event.wait/set) can
        # trigger a GC-emitted warning that is routed through the logging
        # integration back into add(). Returning here avoids a deadlock.
        if getattr(self._active, "flag", False):
            return None

        self._active.flag = True
        try:
            thread_ready = self._ensure_thread() and self._flusher is not None
            if not thread_ready:
                return None

            with self._lock:
                if len(self._buffer) >= self.MAX_BEFORE_DROP:
                    self._record_lost(item)
                else:
                    self._buffer.append(item)
                    if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
                        # Wake the flusher without waiting for the timeout.
                        self._flush_event.set()
        finally:
            self._active.flag = False

    def kill(self) -> None:
        """Tell the flusher thread to stop and forget about it.

        Does nothing if no flusher thread is recorded.
        """
        if self._flusher is not None:
            self._running = False
            # Wake the thread so it notices that _running changed.
            self._flush_event.set()
            self._flusher = None

    def flush(self) -> None:
        """Flush the buffer synchronously on the calling thread."""
        # Mark the thread as active for the duration of the flush and put
        # the previous marker back afterwards.
        previous_flag = getattr(self._active, "flag", False)
        self._active.flag = True
        try:
            self._flush()
        finally:
            self._active.flag = previous_flag

    def _add_to_envelope(self, envelope: "Envelope") -> None:
        """Add one item containing the whole current buffer to ``envelope``."""
        headers = {"item_count": len(self._buffer)}
        serialized = [self._to_transport_format(item) for item in self._buffer]
        payload = PayloadRef(json={"version": 2, "items": serialized})

        envelope.add_item(
            Item(
                type=self.TYPE,
                content_type=self.CONTENT_TYPE,
                headers=headers,
                payload=payload,
            )
        )

    def _flush(self) -> "Optional[Envelope]":
        """Send the buffered items, if any, through ``capture_func``.

        :returns: the envelope that was captured, or ``None`` when the
            buffer was empty.
        """
        sent_at = format_timestamp(datetime.now(timezone.utc))
        envelope = Envelope(headers={"sent_at": sent_at})

        with self._lock:
            if len(self._buffer) == 0:
                return None

            self._add_to_envelope(envelope)
            self._buffer.clear()

        # Capture outside of the lock.
        self._capture_func(envelope)
        return envelope

    def _record_lost(self, item: "T") -> None:
        """Hook called by add() for an item that is not buffered; no-op here."""

    @staticmethod
    def _to_transport_format(item: "T") -> "Any":
        """Hook converting ``item`` for the envelope payload; returns ``None`` here."""
        return None
Upload File
Create Folder