X7ROOT File Manager
Current Path:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
utils
/
??
..
??
__init__.py
(61.08 KB)
??
__pycache__
??
_shutil.py
(795 B)
??
antivirus_mode.py
(497 B)
??
async_utils.py
(718 B)
??
benchmark.py
(538 B)
??
buffer.py
(1.24 KB)
??
check_db.py
(7.72 KB)
??
check_lock.py
(636 B)
??
cli.py
(7.39 KB)
??
common.py
(14.41 KB)
??
completions.py
(9.75 KB)
??
config.py
(999 B)
??
cronjob.py
(902 B)
??
doctor.py
(5.37 KB)
??
fd_ops.py
(7.4 KB)
??
hyperscan.py
(149 B)
??
importer.py
(2.67 KB)
??
ipecho.py
(3.17 KB)
??
json.py
(953 B)
??
kwconfig.py
(1.56 KB)
??
net_transport.py
(11.93 KB)
??
parsers.py
(11.6 KB)
??
resource_limits.py
(2.29 KB)
??
safe_fileops.py
(9.83 KB)
??
safe_sequence.py
(363 B)
??
serialization.py
(1.72 KB)
??
sshutil.py
(7.94 KB)
??
subprocess.py
(1.53 KB)
??
support.py
(5.2 KB)
??
threads.py
(1005 B)
??
validate.py
(4.27 KB)
??
whmcs.py
(7.6 KB)
??
wordpress_mu_plugin.py
(1.41 KB)
Editing: fd_ops.py
"""fd-based file operations for symlink-attack mitigation. All helpers in this module use O_NOFOLLOW and dir_fd-relative syscalls so that no path-based resolution can be redirected by a concurrent symlink swap. This module is intentionally kept separate from utils/__init__.py to avoid loading these OS-specific helpers into every agent component. """ import errno import logging import os import stat from contextlib import contextmanager, suppress from pathlib import Path logger = logging.getLogger(__name__) def rmtree_fd(dir_fd) -> None: """Remove all contents of a directory using fd-relative operations. Every entry is opened with ``O_NOFOLLOW`` so symlinks inside the tree are unlinked rather than followed. The directory referenced by *dir_fd* itself is **not** removed — the caller should ``os.rmdir()`` the parent entry after this call returns. Uses an iterative approach with an explicit stack to avoid hitting Python's recursion limit on adversarial deeply-nested trees. *dir_fd* must be an open ``O_RDONLY | O_DIRECTORY`` descriptor. """ # Each stack frame is (fd, name_to_rmdir_after_close) where # name_to_rmdir_after_close is the entry name that should be # rmdir'd from the parent once this fd is fully processed. # The initial fd is managed by the caller, so its rmdir entry is None. stack = [(dir_fd, None)] try: while stack: current_fd, _ = stack[-1] pushed = False with os.scandir(current_fd) as entries: for entry in entries: if entry.is_dir(follow_symlinks=False): child_fd = os.open( entry.name, os.O_RDONLY | os.O_DIRECTORY | os.O_NOFOLLOW, dir_fd=current_fd, ) stack.append((child_fd, entry.name)) pushed = True break # restart scan from the new directory else: os.unlink(entry.name, dir_fd=current_fd) if not pushed: # All entries in current directory have been removed. fd, name = stack.pop() if name is not None: # Close the child fd and rmdir it from the parent. os.close(fd) parent_fd, _ = stack[-1] os.rmdir(name, dir_fd=parent_fd) except BaseException: # On error, close any fds we opened (but not the caller's dir_fd). for fd, name in stack: if name is not None: os.close(fd) raise def open_dir_no_symlinks(path) -> int: """Open a directory, refusing symlinks at every path component. Walks the absolute *path* one component at a time, opening each with ``O_NOFOLLOW | O_DIRECTORY`` relative to the parent fd. This guards against symlink attacks at *any* depth in the hierarchy, not just the leaf. Returns an ``O_RDONLY`` file descriptor for the final directory. The caller is responsible for closing it. """ path = os.path.abspath(os.fspath(path)) parts = Path(path).parts # ('/', 'home', 'user', ...) fd = os.open(parts[0], os.O_RDONLY | os.O_DIRECTORY) try: for part in parts[1:]: new_fd = os.open( part, os.O_RDONLY | os.O_DIRECTORY | os.O_NOFOLLOW, dir_fd=fd, ) os.close(fd) fd = new_fd return fd except BaseException: os.close(fd) raise @contextmanager def open_nofollow(path, flags=os.O_RDONLY, *, dir_fd=None): """Open a file with O_NOFOLLOW, closing the fd on exit. Yields the raw file descriptor. Rejects symlinks at the leaf component (raises ELOOP). When *dir_fd* is provided, *path* is resolved relative to that directory descriptor. """ kw = {"dir_fd": dir_fd} if dir_fd is not None else {} fd = os.open(str(path), flags | os.O_NOFOLLOW, **kw) try: yield fd finally: os.close(fd) @contextmanager def safe_dir(path): """Open a directory with symlink protection, closing the fd on exit. Walks every path component with O_NOFOLLOW via open_dir_no_symlinks and yields the resulting fd. """ fd = open_dir_no_symlinks(path) try: yield fd finally: os.close(fd) def atomic_rewrite_fd( filename, data: bytes, *, uid, gid, allow_empty_content, permissions, dir_fd: int, ) -> bool: """dir_fd-relative implementation of atomic_rewrite. The caller opens the directory with O_NOFOLLOW before any file I/O begins. All file operations use dir_fd so that a concurrent rename of the directory to a symlink cannot redirect writes to a privileged path. """ _, basename = os.path.split(filename) # Read current content without following symlinks. try: content_fd = os.open( basename, os.O_RDONLY | os.O_NOFOLLOW, dir_fd=dir_fd ) with os.fdopen(content_fd, "rb") as f: old_content = f.read(len(data) + 1) if old_content == data: return False except FileNotFoundError: pass # file does not exist yet; will be created except OSError as exc: if exc.errno == errno.ELOOP: pass # existing entry is a symlink; overwrite it else: raise if not allow_empty_content and not data: logger.error("empty content: %r for file: %s", data, filename) return False if permissions is None: try: st = os.stat(basename, dir_fd=dir_fd, follow_symlinks=False) if stat.S_ISLNK(st.st_mode): raise OSError(errno.ELOOP, os.strerror(errno.ELOOP), basename) permissions = stat.S_IMODE(st.st_mode) except FileNotFoundError: current_umask = os.umask(0) os.umask(current_umask) permissions = 0o666 & ~current_umask # Create temp file atomically inside the directory referenced by dir_fd. # O_NOFOLLOW + O_EXCL ensures the name cannot be a pre-existing symlink. tmp_basename = None tmp_fd = -1 for _ in range(100): tmp_basename = f"{basename}_{os.urandom(4).hex()}.i360edit" try: tmp_fd = os.open( tmp_basename, os.O_WRONLY | os.O_CREAT | os.O_EXCL | os.O_NOFOLLOW, 0o600, dir_fd=dir_fd, ) break except FileExistsError: continue else: raise FileExistsError("Could not create temporary file (100 attempts)") try: view = memoryview(data) written = 0 while written < len(data): written += os.write(tmp_fd, view[written:]) if uid is not None and gid is not None: os.chown(tmp_fd, uid, gid) os.chmod(tmp_fd, permissions) os.fsync(tmp_fd) os.close(tmp_fd) tmp_fd = -1 # Atomic rename entirely within the directory we hold open. os.rename(tmp_basename, basename, src_dir_fd=dir_fd, dst_dir_fd=dir_fd) tmp_basename = None # rename succeeded; no cleanup needed finally: if tmp_fd >= 0: os.close(tmp_fd) if tmp_basename is not None: with suppress(FileNotFoundError): os.unlink(tmp_basename, dir_fd=dir_fd) return True
Upload File
Create Folder