X7ROOT File Manager
Current Path:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/hooks
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
hooks
/
??
..
??
__init__.py
(0 B)
??
__pycache__
??
execute.py
(5.32 KB)
??
native.py
(968 B)
Editing: execute.py
"""Lookup, validation and execution of event hooks."""
import asyncio
import json
import os
import stat
import tempfile

from defence360agent.contracts.config import Core
from defence360agent.hooks import native as native_hooks
from defence360agent.internals.logger import EventHookLogger
from defence360agent.model.event_hook import EventHook
from defence360agent.model.instance import db
from defence360agent.utils import run, snake_case

event_hook_logger = EventHookLogger()


def get_hooks(event):
    """Return the list of EventHook rows registered for *event*.

    An empty list is returned when the database is still deferred.
    """
    # if database is not available (i.e. direct RPC call), do not try to
    # load hooks
    if db.deferred:
        return []
    hooks = EventHook.select().where(EventHook.event == event)
    return list(hooks)


def _validate_hook_path(path, native=False):
    """Raise ValueError if path is not a safe hook file.

    Checks performed, in order:

    1. *path* is an existing regular file.
    2. *path* is readable (native hooks) or executable (subprocess
       hooks).
    3. The resolved file (after os.path.realpath) is not world-writable.
    4. The resolved file's immediate parent directory is not
       world-writable without the sticky bit.

    The original check rejected any path under /tmp, /var/tmp, /dev/shm
    on the grounds that those dirs are world-writable. That blanket
    by-prefix rule was too coarse: pytest's tmp_path lives under
    /tmp/pytest-of-<user>/... and the agent's own integration fixtures
    legitimately put hook files there. The check therefore targets the
    file and its immediate parent instead of path prefixes.

    NOTE: file ownership is not verified here; only the world-writable
    bit of the file and of its parent directory is inspected.

    The required permission bit differs between branches: subprocess
    hooks are exec'd by the kernel (needs X_OK), but native hooks are
    loaded via importlib's open()+exec_module path which only needs
    R_OK. A standard Python file in mode 0o644 is loadable but not
    executable, so requiring +x for native hooks would silently break
    the typical native-hook deployment (the `hook add-native` RPC has
    never required or documented an executable bit).

    :param path: filesystem path of the hook file
    :param native: True for hooks loaded in-process, False for hooks
        run as a subprocess
    :raises ValueError: if any of the checks above fails
    """
    if not os.path.isfile(path):
        raise ValueError(
            "Hook path does not exist or is not a file: {}".format(path)
        )
    if native:
        if not os.access(path, os.R_OK):
            raise ValueError("Hook path is not readable: {}".format(path))
    else:
        if not os.access(path, os.X_OK):
            raise ValueError("Hook path is not executable: {}".format(path))

    # Permission bits are inspected on the symlink-resolved target.
    real = os.path.realpath(path)
    try:
        st = os.stat(real)
    except OSError as exc:
        raise ValueError(
            "Hook path stat failed: {}: {}".format(path, exc)
        ) from exc
    # Reject world-writable files: any unprivileged user could rewrite
    # them between this check and the subprocess/importlib load.
    if st.st_mode & stat.S_IWOTH:
        raise ValueError("Hook path is world-writable: {}".format(path))

    parent = os.path.dirname(real)
    if parent and parent != "/":
        try:
            pst = os.stat(parent)
        except OSError as exc:
            raise ValueError(
                "Hook parent stat failed: {}: {}".format(parent, exc)
            ) from exc
        # A world-writable parent without the sticky bit means an
        # attacker can replace our hook by deleting+recreating the
        # file. /tmp itself has the sticky bit so renames are owner-
        # only, which is safe; pytest's tmp_path subdirs are mode 700.
        if (pst.st_mode & stat.S_IWOTH) and not (pst.st_mode & stat.S_ISVTX):
            raise ValueError(
                "Hook path has world-writable parent without sticky bit"
                " {}: {}".format(parent, path)
            )


async def execute_hook(path, data, native=False):
    """Validate and run a single hook, never raising to the caller.

    :param path: filesystem path of the hook file
    :param data: JSON-serialisable payload; passed as-is to native
        hooks, or JSON-encoded and fed on stdin to subprocess hooks
    :param native: True to run the hook via native_hooks.execute_hook,
        False to run it as a subprocess
    :return: tuple ``(exit_code, err)``. Native hooks yield ``(0, None)``
        on success. Subprocess hooks yield the exit code and the third
        element of ``run()``'s result. Any exception (including a failed
        path validation) yields ``(None, repr(exception))``.
    """
    try:
        # Path validation runs filesystem syscalls (isfile/access/realpath)
        # which can block the event loop on slow/NFS storage; defer to a
        # threadpool executor. The same checks apply to native hooks
        # because native_hooks.execute_hook imports the file via importlib
        # straight in the agent's root process — a DB-sourced /tmp path
        # there is at least as dangerous as a subprocess fork.
        # We are inside a coroutine, so a running loop always exists.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _validate_hook_path, path, native)
        if native:
            native_hooks.execute_hook(path, data)
            exit_code, err = 0, None
        else:
            data = json.dumps(data).encode()
            cwd = os.path.dirname(path)
            exit_code, _, err = await run(
                [path], shell=False, input=data, cwd=cwd
            )
    except Exception as e:
        # Deliberate catch-all boundary: a failing hook is reported via
        # the returned error string rather than propagated.
        exit_code, err = None, repr(e)
    return exit_code, err


async def execute_hooks(event, tempdir=Core.TMPDIR):
    """Run every hook registered for *event*, in order.

    If the event carries a truthy ``"DUMP"`` value, it is written as
    JSON to a temporary file in *tempdir* and the file name is passed to
    the hooks as ``params["tmp_filename"]``. The temporary file is
    closed (and thereby removed) once all hooks have run, even if
    something raises in between.

    :param event: event object; must support ``.get()``, ``dict(event)``
        and expose ``.event`` and ``.subtype`` attributes
    :param tempdir: directory in which the dump file is created
    :return: None
    """
    dump = event.get("DUMP")
    params = dict(event)
    hooks = get_hooks(event.event)
    if not hooks:
        return

    tmp = None
    try:
        with event_hook_logger(event.event, event.subtype) as event_logger:
            if dump:
                prefix = snake_case(event.__class__.__name__) + "_"
                tmp = tempfile.NamedTemporaryFile(
                    mode="w+", prefix=prefix, suffix=".json", dir=tempdir
                )
                json.dump(dump, tmp)
                # Make sure the hooks see the complete dump on disk.
                tmp.flush()
                os.fsync(tmp.fileno())
                params["tmp_filename"] = tmp.name
            data = {
                "event": event.event,
                "subtype": event.subtype,
                "params": params,
            }
            for hook in hooks:
                with event_logger(
                    hook.path, native=hook.native
                ) as hook_logger:
                    hook_logger.begin()
                    exit_code, err = await execute_hook(
                        hook.path, data, native=hook.native
                    )
                    hook_logger.finish(exit_code, err)
    finally:
        # Release the dump file deterministically instead of relying on
        # garbage collection of the NamedTemporaryFile object.
        if tmp is not None:
            tmp.close()
Upload File
Create Folder