X7ROOT File Manager
Current Path:
/opt/hc_python/lib/python3.12/site-packages/sentry_sdk
opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sentry_sdk
/
??
..
??
__init__.py
(1.46 KB)
??
__pycache__
??
_batcher.py
(5.7 KB)
??
_compat.py
(3 KB)
??
_init_implementation.py
(2.43 KB)
??
_log_batcher.py
(1.88 KB)
??
_lru_cache.py
(1.14 KB)
??
_metrics_batcher.py
(1.21 KB)
??
_queue.py
(10.98 KB)
??
_span_batcher.py
(8.12 KB)
??
_types.py
(13.16 KB)
??
_werkzeug.py
(3.85 KB)
??
ai
??
api.py
(15.59 KB)
??
attachments.py
(2.95 KB)
??
client.py
(49.95 KB)
??
consts.py
(61.95 KB)
??
crons
??
debug.py
(959 B)
??
envelope.py
(9.37 KB)
??
feature_flags.py
(2.5 KB)
??
hub.py
(24.54 KB)
??
integrations
??
logger.py
(2.6 KB)
??
metrics.py
(1.42 KB)
??
monitor.py
(4.47 KB)
??
profiler
??
py.typed
(0 B)
??
scope.py
(74.09 KB)
??
scrubber.py
(5.99 KB)
??
serializer.py
(12.82 KB)
??
session.py
(5.08 KB)
??
sessions.py
(8.59 KB)
??
spotlight.py
(11.85 KB)
??
traces.py
(25.08 KB)
??
tracing.py
(50.33 KB)
??
tracing_utils.py
(54.36 KB)
??
transport.py
(44.41 KB)
??
types.py
(1.24 KB)
??
utils.py
(65.96 KB)
??
worker.py
(10.91 KB)
Editing: worker.py
import asyncio
import os
import threading
from abc import ABC, abstractmethod
from time import sleep, time
from typing import TYPE_CHECKING

from sentry_sdk._queue import FullError, Queue
from sentry_sdk.consts import DEFAULT_QUEUE_SIZE
from sentry_sdk.utils import logger, mark_sentry_task_internal

if TYPE_CHECKING:
    from typing import Any, Callable, Optional


# Sentinel placed on a worker queue to ask its consumer loop to stop.
_TERMINATOR = object()


class Worker(ABC):
    """Base class for all workers."""

    @property
    @abstractmethod
    def is_alive(self) -> bool:
        """Whether the worker is alive and running."""
        pass

    @abstractmethod
    def kill(self) -> None:
        """Kill the worker. It will not process any more events."""
        pass

    def flush(
        self, timeout: float, callback: "Optional[Callable[[int, float], Any]]" = None
    ) -> None:
        """Flush the worker, blocking until done or timeout is reached.

        The base implementation does nothing and returns None.
        """
        return None

    @abstractmethod
    def full(self) -> bool:
        """Whether the worker's queue is full."""
        pass

    @abstractmethod
    def submit(self, callback: "Callable[[], Any]") -> bool:
        """Schedule a callback.

        Returns True if queued, False if full.
        """
        pass


class BackgroundWorker(Worker):
    """Worker that runs submitted callbacks on a daemon thread."""

    def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
        """Create the job queue; the consumer thread is started lazily."""
        self._queue: "Queue" = Queue(queue_size)
        # Guards start(), kill() and flush().
        self._lock = threading.Lock()
        self._thread: "Optional[threading.Thread]" = None
        # pid recorded when the thread was started; compared in `is_alive`.
        self._thread_for_pid: "Optional[int]" = None

    @property
    def is_alive(self) -> bool:
        """True only if a thread started under the current pid is running."""
        if self._thread_for_pid != os.getpid():
            return False
        if not self._thread:
            return False
        return self._thread.is_alive()

    def _ensure_thread(self) -> None:
        """Start the consumer thread if the worker is not alive."""
        if not self.is_alive:
            self.start()

    def _timed_queue_join(self, timeout: float) -> bool:
        """Wait until the queue has no unfinished tasks or `timeout` elapses.

        Returns True if the queue drained, False if the deadline passed first.
        """
        deadline = time() + timeout
        queue = self._queue

        queue.all_tasks_done.acquire()

        try:
            while queue.unfinished_tasks:
                delay = deadline - time()
                if delay <= 0:
                    return False
                queue.all_tasks_done.wait(timeout=delay)

            return True
        finally:
            queue.all_tasks_done.release()

    def start(self) -> None:
        """Start the daemon consumer thread unless the worker is already alive."""
        with self._lock:
            if not self.is_alive:
                self._thread = threading.Thread(
                    target=self._target, name="sentry-sdk.BackgroundWorker"
                )
                self._thread.daemon = True
                try:
                    self._thread.start()
                    self._thread_for_pid = os.getpid()
                except RuntimeError:
                    # At this point we can no longer start because the interpreter
                    # is already shutting down.  Sadly at this point we can no longer
                    # send out events.
                    self._thread = None

    def kill(self) -> None:
        """
        Kill worker thread. Returns immediately. Not useful for
        waiting on shutdown for events, use `flush` for that.
        """
        logger.debug("background worker got kill request")
        with self._lock:
            if self._thread:
                try:
                    self._queue.put_nowait(_TERMINATOR)
                except FullError:
                    logger.debug("background worker queue full, kill failed")

                # NOTE(review): the thread reference is dropped even when the
                # terminator could not be enqueued, so in that case the old
                # thread is not told to stop -- confirm this is intended.
                self._thread = None
                self._thread_for_pid = None

    def flush(self, timeout: float, callback: "Optional[Any]" = None) -> None:
        """Wait up to `timeout` seconds for queued jobs to finish.

        Does nothing when the worker is not alive or `timeout` is not
        positive. The worker lock is held for the duration of the wait.
        `callback`, if given, is called as `callback(pending, timeout)` when
        the queue has not drained after the initial short wait.
        """
        logger.debug("background worker got flush request")
        with self._lock:
            if self.is_alive and timeout > 0.0:
                self._wait_flush(timeout, callback)
        logger.debug("background worker flushed")

    def full(self) -> bool:
        """Whether the job queue is full."""
        return self._queue.full()

    def _wait_flush(self, timeout: float, callback: "Optional[Any]") -> None:
        """Join the queue in two phases: a short initial wait, then the rest."""
        initial_timeout = min(0.1, timeout)
        if not self._timed_queue_join(initial_timeout):
            # +1 presumably accounts for the job currently being processed.
            pending = self._queue.qsize() + 1
            logger.debug("%d event(s) pending on flush", pending)
            if callback is not None:
                callback(pending, timeout)

            if not self._timed_queue_join(timeout - initial_timeout):
                pending = self._queue.qsize() + 1
                logger.error("flush timed out, dropped %s events", pending)

    def submit(self, callback: "Callable[[], Any]") -> bool:
        """Enqueue `callback`, starting the thread if needed.

        Returns True if queued, False if the queue is full.
        """
        self._ensure_thread()
        try:
            self._queue.put_nowait(callback)
            return True
        except FullError:
            return False

    def _target(self) -> None:
        """Thread body: run queued callbacks until the terminator is received."""
        while True:
            callback = self._queue.get()
            try:
                if callback is _TERMINATOR:
                    break
                try:
                    callback()
                except Exception:
                    logger.error("Failed processing job", exc_info=True)
            finally:
                # Runs for the terminator too, so every get() is matched.
                self._queue.task_done()
            sleep(0)


class AsyncWorker(Worker):
    """Worker that runs submitted coroutine callbacks as asyncio tasks."""

    def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
        """Record the queue size; queue and consumer task are created in start()."""
        self._queue: "Optional[asyncio.Queue[Any]]" = None
        self._queue_size = queue_size
        self._task: "Optional[asyncio.Task[None]]" = None
        # Event loop needs to remain in the same process
        self._task_for_pid: "Optional[int]" = None
        self._loop: "Optional[asyncio.AbstractEventLoop]" = None
        # Track active callback tasks so they have a strong reference and can be cancelled on kill
        self._active_tasks: "set[asyncio.Task[None]]" = set()

    @property
    def is_alive(self) -> bool:
        """True only if the consumer task is unfinished on a running loop in this pid."""
        if self._task_for_pid != os.getpid():
            return False
        if not self._task or not self._loop:
            return False
        return self._loop.is_running() and not self._task.done()

    def kill(self) -> None:
        """Cancel the consumer task and all active callback tasks."""
        if self._task:
            # Cancel the main consumer task to prevent duplicate consumers
            self._task.cancel()
            # Also cancel any active callback tasks
            # Avoid modifying the set while cancelling tasks
            tasks_to_cancel = set(self._active_tasks)
            for task in tasks_to_cancel:
                task.cancel()
            self._active_tasks.clear()
            self._loop = None
            self._task = None
            self._task_for_pid = None

    def start(self) -> None:
        """Create a fresh queue and consumer task on the running event loop.

        If no event loop is running, a warning is logged and the worker
        stays stopped.
        """
        if not self.is_alive:
            try:
                self._loop = asyncio.get_running_loop()
                # Always create a fresh queue on start to avoid stale items
                self._queue = asyncio.Queue(maxsize=self._queue_size)
                with mark_sentry_task_internal():
                    self._task = self._loop.create_task(self._target())
                self._task_for_pid = os.getpid()
            except RuntimeError:
                # There is no event loop running
                logger.warning("No event loop running, async worker not started")
                self._loop = None
                self._task = None
                self._task_for_pid = None

    def full(self) -> bool:
        """Whether the queue is full; a worker with no queue counts as full."""
        if self._queue is None:
            return True
        return self._queue.full()

    def _ensure_task(self) -> None:
        """Start the consumer task if the worker is not alive."""
        if not self.is_alive:
            self.start()

    async def _wait_flush(
        self, timeout: float, callback: "Optional[Any]" = None
    ) -> None:
        """Join the queue in two phases: a short initial wait, then the rest.

        `callback`, if given, is called as `callback(pending, timeout)` when
        the initial wait times out.
        """
        if not self._loop or not self._loop.is_running() or self._queue is None:
            return

        initial_timeout = min(0.1, timeout)

        # Timeout on the join
        try:
            await asyncio.wait_for(self._queue.join(), timeout=initial_timeout)
        except asyncio.TimeoutError:
            pending = self._queue.qsize() + len(self._active_tasks)
            logger.debug("%d event(s) pending on flush", pending)
            if callback is not None:
                callback(pending, timeout)

            try:
                remaining_timeout = timeout - initial_timeout
                await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout)
            except asyncio.TimeoutError:
                pending = self._queue.qsize() + len(self._active_tasks)
                logger.error("flush timed out, dropped %s events", pending)

    def flush(  # type: ignore[override]
        self, timeout: float, callback: "Optional[Any]" = None
    ) -> "Optional[asyncio.Task[None]]":
        """Schedule a flush on the event loop without blocking.

        Returns the task running the flush, or None when the worker is not
        alive, the loop is not running, or `timeout` is not positive.
        """
        if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running():
            with mark_sentry_task_internal():
                return self._loop.create_task(self._wait_flush(timeout, callback))
        return None

    def submit(self, callback: "Callable[[], Any]") -> bool:
        """Enqueue `callback`, starting the consumer task if needed.

        Returns True if queued, False if there is no queue or it is full.
        """
        self._ensure_task()
        if self._queue is None:
            return False
        try:
            self._queue.put_nowait(callback)
            return True
        except asyncio.QueueFull:
            return False

    async def _target(self) -> None:
        """Consumer loop: take callbacks off the queue and run each as a task."""
        if self._queue is None:
            return
        try:
            while True:
                # Read the queue once per iteration so get() and the matching
                # task_done() always act on the same queue object, even if
                # kill()/start() replace self._queue in the meantime.
                queue = self._queue
                callback = await queue.get()
                if callback is _TERMINATOR:
                    queue.task_done()
                    break
                # Firing tasks instead of awaiting them allows for concurrent requests
                with mark_sentry_task_internal():
                    task = asyncio.create_task(self._process_callback(callback))
                # Create a strong reference to the task so it can be cancelled on kill
                # and does not get garbage collected while running
                self._active_tasks.add(task)
                # Bind the queue by value through a default argument: a plain
                # closure would see whatever `queue` holds when the callback
                # fires, not the queue this task was dispatched from.
                task.add_done_callback(
                    lambda t, q=queue: self._on_task_complete(t, q)
                )
                # Yield to let the event loop run other tasks
                await asyncio.sleep(0)
        except asyncio.CancelledError:
            pass  # Expected during kill()

    async def _process_callback(self, callback: "Callable[[], Any]") -> None:
        """Call `callback` and await its result."""
        # Callback is an async coroutine, need to await it
        await callback()

    def _on_task_complete(
        self,
        task: "asyncio.Task[None]",
        queue: "Optional[asyncio.Queue[Any]]" = None,
    ) -> None:
        """Done-callback for a callback task: log failures and settle the queue.

        `queue` is the queue the job was taken from; when given, its
        task_done() is called once for this task.
        """
        try:
            task.result()
        except asyncio.CancelledError:
            pass  # Task was cancelled, expected during shutdown
        except Exception:
            logger.error("Failed processing job", exc_info=True)
        finally:
            # Mark the task as done and remove it from the active tasks set
            # Use the queue reference captured at dispatch time, not self._queue,
            # to avoid calling task_done() on a different queue after kill()/start().
            if queue is not None:
                queue.task_done()
            self._active_tasks.discard(task)
Upload File
Create Folder