X7ROOT File Manager
Current Path:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/wordpress
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
wordpress
/
??
..
??
__init__.py
(1.23 KB)
??
__pycache__
??
changelog_processor.py
(12.14 KB)
??
cli.py
(10.51 KB)
??
constants.py
(346 B)
??
exception.py
(94 B)
??
incident_collector.py
(16.23 KB)
??
incident_parser.py
(4.03 KB)
??
incident_sender.py
(5.35 KB)
??
plugin.py
(66.39 KB)
??
proxy_auth.py
(5.07 KB)
??
site_repository.py
(17.26 KB)
??
telemetry.py
(541 B)
??
utils.py
(20.89 KB)
??
wp_rules.py
(3.44 KB)
Editing: utils.py
"""Helpers for the WordPress integration of the agent.

Covers: building per-user command lines (CageFS / ``su``), resolving the PHP
binary for a site, computing the next scheduled scan time, assembling the
payloads written into the site's data directory, and creating that directory
with symlink protection.
"""
import errno
import json
import logging
import os
import pwd
import shlex
import subprocess
from collections import defaultdict
from datetime import datetime, timedelta
from functools import lru_cache
from pathlib import Path
from typing import Optional

from defence360agent.contracts.config import (
    choose_value_from_config,
    MalwareScanScheduleInterval as Interval,
)
from defence360agent.contracts.license import LicenseCLN
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.subsys.panels.plesk import Plesk
from defence360agent.utils import (
    IMUNIFY_PACKAGE_NAMES,
    async_lru_cache,
    atomic_rewrite,
    check_run,
    system_packages_info,
)
from defence360agent.utils.fd_ops import open_dir_no_symlinks, safe_dir
from defence360agent.model.wordpress import WPSite
from defence360agent.wordpress.constants import WP_CLI_WRAPPER_PATH
from defence360agent.wordpress.exception import PHPError

CAGEFS_ENTER_PATH = "/usr/sbin/cagefs_enter_user"
CAGEFS_CTL_PATH = "/usr/sbin/cagefsctl"

_VALID_PRESETS = frozenset(("balanced", "strict", "monitor"))

logger = logging.getLogger(__name__)


def _validate_preset(value: object) -> str:
    """Coerce a config-read preset value to a canonical preset string.

    Returns "balanced" for anything outside _VALID_PRESETS — including None,
    non-strings, and hand-edited values like "extreme" or "BALANCED". The
    agent always writes lowercase canonical values, so a non-canonical read
    indicates either a manual edit or a future preset that this version
    doesn't recognise; "balanced" is the safe fallback in both cases.
    """
    if isinstance(value, str) and value in _VALID_PRESETS:
        return value
    return "balanced"


@async_lru_cache(ttl=60)
async def get_domain_paths() -> dict[str, list[str]]:
    """
    Get a mapping of docroots to their associated domains, with caching.

    The hosting panel reports ``domain -> [docroot, ...]``; this inverts it
    to ``docroot -> [domain, ...]``.
    """
    hosting_panel = HostingPanel()
    panel_paths = await hosting_panel.get_domain_paths()
    docroot_map = defaultdict(list)
    for domain, docroots in panel_paths.items():
        for docroot in docroots:
            docroot_map[docroot].append(domain)
    return docroot_map


def wp_wrapper(php_path: str, docroot: str) -> list:
    """Get wp cli common command list"""
    return [str(WP_CLI_WRAPPER_PATH), php_path, docroot]


@lru_cache(maxsize=1)
def get_cagefs_enabled_users() -> set:
    """Get the list of users enabled for CageFS.

    Returns an empty set when ``cagefsctl`` is missing, not executable, or
    exits with a non-zero status. The result is memoised; use
    clear_get_cagefs_enabled_users_cache() to force a refresh.
    """
    if not os.path.isfile(CAGEFS_CTL_PATH) or not os.access(
        CAGEFS_CTL_PATH, os.X_OK
    ):
        return set()
    result = subprocess.run(
        [CAGEFS_CTL_PATH, "--list-enabled"], capture_output=True, text=True
    )
    if result.returncode != 0:
        return set()
    lines = result.stdout.strip().split("\n")
    return set(lines[1:])  # Skip the first line which is a summary


def clear_get_cagefs_enabled_users_cache():
    """Clear the cache for get_cagefs_enabled_users."""
    get_cagefs_enabled_users.cache_clear()


def build_command_for_user(username: str, args: list) -> list:
    """Build the necessary command to run the given cmdline args with
    specified user.

    CageFS-enabled users are entered through ``cagefs_enter_user`` when that
    binary is present and executable; every other case falls back to
    ``su -s /bin/bash <user> -c <shell-quoted args>``.
    """
    if username in get_cagefs_enabled_users():
        if os.path.isfile(CAGEFS_ENTER_PATH) and os.access(
            CAGEFS_ENTER_PATH, os.X_OK
        ):
            return [
                CAGEFS_ENTER_PATH,
                "--no-io-and-memory-limit",
                username,
                *args,
            ]
    return [
        "su",
        "-s",
        "/bin/bash",
        username,
        "-c",
        # su takes a single command string, so quote each argument.
        shlex.join(args),
    ]


async def get_domains_for_docroot(
    docroot: str, domain_to_exclude: str
) -> list[str]:
    """
    Get all domains associated with a given document root, excluding one
    domain.

    It's panel-agnostic and uses a cached mapping.
    """
    docroot_map = await get_domain_paths()
    all_domains = docroot_map.get(docroot, [])
    return [domain for domain in all_domains if domain != domain_to_exclude]


async def get_php_binary_path(site: WPSite, username: str) -> Optional[str]:
    """Determine PHP binary path for the given WPSite.

    The site's own domain is tried first, then every other domain that shares
    the same docroot. A domain only counts when it belongs to *username*.

    Raises:
        PHPError: If no candidate domain yields a PHP binary. The function
            therefore never returns None despite the Optional annotation.
    """
    from clcommon.cpapi import (
        get_domains_php_info,
        get_installed_php_versions,
    )

    domains_php_info = get_domains_php_info()
    installed_php_versions = get_installed_php_versions()

    def find_php_binary_for_domain(domain: str) -> Optional[str]:
        domain_info = domains_php_info.get(domain)
        if not domain_info or domain_info.get("username") != username:
            return None

        php_display_version = domain_info.get("display_version")
        if not php_display_version:
            return None

        for php_version in installed_php_versions:
            if php_version.get("identifier") == php_display_version:
                return php_version.get("bin")
        return None

    # First, try with the main domain of the site.
    php_binary_path = find_php_binary_for_domain(site.domain)
    if php_binary_path:
        return php_binary_path

    # If not found, try with other domains for the site's docroot.
    domains = await get_domains_for_docroot(
        site.docroot, domain_to_exclude=site.domain
    )
    for domain in domains:
        php_binary_path = find_php_binary_for_domain(domain)
        if php_binary_path:
            return php_binary_path

    raise PHPError(
        f"PHP binary was not identified for docroot: {site.docroot}, username:"
        f" {username}"
    )


def get_malware_history(username: str) -> list:
    """
    Get malware history for the specified user.

    This is an equivalent of calling
    `imunify360-agent malware history list --user {username}`.
    Returns empty list if imav malware module is not available.
    """
    try:
        from imav.malwarelib.model import MalwareHit

        # The first element of the pair is not needed here.
        _, hits = MalwareHit.malicious_list(user=username)
        return hits
    except ImportError:
        logger.debug(
            "imav.malwarelib not available, returning empty malware history"
        )
        return []


async def get_last_scan(sink, username: str) -> dict:
    """
    Get the last scan for the specified user.

    This is an equivalent of calling
    `imunify360-agent malware user list --user {username}`.
    Returns empty dict if imav malware module is not available or no entry
    is found for the user.
    """
    try:
        from imav.malwarelib.scan.queue_supervisor_sync import (
            QueueSupervisorSync as ScanQueue,
        )
        from imav.malwarelib.utils import user_list
    except ImportError:
        logger.debug(
            "imav.malwarelib not available, returning empty last scan"
        )
        return {}

    queue = ScanQueue(sink)
    _, users = await user_list.fetch_user_list(
        queue.get_scans_from_paths, match={username}
    )
    if not users:
        return {}
    # Newest scan first; the head of the sorted list is the last scan.
    users = user_list.sort(users, "scan_date", desc=True)
    return users[0]


def calculate_next_scan_timestamp(interval, hour, day_of_month, day_of_week):
    """
    Calculate the next scan timestamp based on schedule configuration.

    Args:
        interval: Scan interval (DAY, WEEK, MONTH, or NONE)
        hour: Hour of day to run scan (0-23)
        day_of_month: Day of month to run scan (1-31)
        day_of_week: Day of week to run scan (0-6, where 0=Sunday)

    Returns:
        Timestamp of next scan, or None if interval is NONE
    """
    # NOTE(review): `today` is a *naive* datetime holding UTC wall-clock
    # time, while naive `.timestamp()` interprets its value as local time.
    # The returned epoch is therefore shifted by the host's UTC offset unless
    # the host runs in UTC — confirm which timezone the schedule is meant to
    # use before changing this.
    today = datetime.utcnow()

    if interval == Interval.DAY:
        next_scan = today.replace(
            hour=hour,
            minute=0,
            second=0,
            microsecond=0,
        )
        if today >= next_scan:
            next_scan += timedelta(days=1)
        return next_scan.timestamp()

    if interval == Interval.WEEK:
        # today.weekday() returns 0 for Monday, 6 for Sunday, but day_of_week
        # uses 0 for Sunday, 1 for Monday, ..., 6 for Saturday. So we need to
        # adjust the calculation.
        days_ahead = (day_of_week - (today.weekday() + 1) % 7 + 7) % 7
        if days_ahead == 0 and today.hour >= hour:
            days_ahead = 7
        next_scan_date = today + timedelta(days=days_ahead)
        return next_scan_date.replace(
            hour=hour, minute=0, second=0, microsecond=0
        ).timestamp()

    if interval == Interval.MONTH:
        from calendar import monthrange

        def find_next_suitable_month(year, month, days):
            """Find the next month that has at least given number of days."""
            current_year, current_month = year, month
            # Always start with the next month when advancing
            current_month += 1
            if current_month > 12:
                current_month = 1
                current_year += 1

            # Keep advancing months until we find one with enough days
            while True:
                days_in_month = monthrange(current_year, current_month)[1]
                if days <= days_in_month:
                    return current_year, current_month
                current_month += 1
                if current_month > 12:
                    current_month = 1
                    current_year += 1

        # Check if we need to advance to next month
        should_advance_month = (
            # Today is after the scheduled day, scan already ran this month
            today.day > day_of_month
            # Today is the scheduled day and the hour is after the scheduled
            # hour, scan already ran earlier today
            or (today.day == day_of_month and today.hour >= hour)
            # Current month doesn't have enough days, scan should run next
            # suitable month
            or day_of_month > monthrange(today.year, today.month)[1]
        )

        if should_advance_month:
            # Find the next month that can accommodate the configured day
            next_year, next_month = find_next_suitable_month(
                today.year, today.month, day_of_month
            )
            next_scan_date = today.replace(
                day=day_of_month,  # Use the actual configured day
                month=next_month,
                year=next_year,
                hour=hour,
                minute=0,
                second=0,
                microsecond=0,
            )
        else:
            # Current month can accommodate the configured day
            next_scan_date = today.replace(
                day=day_of_month,
                hour=hour,
                minute=0,
                second=0,
                microsecond=0,
            )
        return next_scan_date.timestamp()


async def get_imunify_package_versions() -> dict[str, str | None]:
    """Fetch installed versions of Imunify packages.

    Returns a dict mapping package name to version string, with None for
    packages that are not installed. Intended to be called once per sync
    cycle (not per site).
    """
    return await system_packages_info(IMUNIFY_PACKAGE_NAMES)


def prepare_scan_data(
    last_scan_time: float,
    next_scan_time: float,
    username: str,
    site: WPSite,
    malware_by_site: dict,
    versions: dict[str, str | None] | None = None,
) -> dict:
    """
    Prepare scan data JSON for a WordPress site.

    Args:
        last_scan_time: Timestamp of the last scan
        next_scan_time: Timestamp of the next scheduled scan
        username: Username of the site owner
        site: WordPress site object
        malware_by_site: Dictionary mapping site docroots to their malware
            hits
        versions: Optional dict mapping Imunify package names to version
            strings (None for uninstalled packages). When provided, included
            in the output as a ``versions`` key.

    Returns:
        dict: JSON data ready to be written to scan_data.php. The response
        includes:
        - lastScanTimestamp: Timestamp of the last scan
        - nextScanTimestamp: Timestamp of the next scheduled scan
        - username: Username of the site owner
        - malware: List of malware hits for the site
        - config: Configuration items for the site
        - license: License information including status and eligibility for
          Imunify patch
        - versions: (optional) Installed Imunify package versions
    """
    # Define the config sections and options needed
    config_sections = [
        ("MALWARE_SCANNING", "enable_scan_cpanel"),
        ("MALWARE_SCANNING", "default_action"),
        ("PROACTIVE_DEFENCE", "blamer"),
    ]

    # Build the config items
    config_items = {}
    for section, option in config_sections:
        try:
            value, _ = choose_value_from_config(
                section,
                option,
                username=username,
            )
        except KeyError:
            # Option unknown to the config schema: report it as unset.
            value = None
        config_items.setdefault(section, {})[option] = value

    result = {
        "lastScanTimestamp": last_scan_time,
        "nextScanTimestamp": next_scan_time,
        "username": username,
        "malware": malware_by_site.get(site.docroot, []),
        "config": config_items,
        "license": LicenseCLN.license_info(),
    }
    if versions is not None:
        result["versions"] = versions
    return result


def prepare_plugin_config(username: str) -> dict:
    """
    Prepare the plugin_config.php payload.

    Dedicated channel for WP-plugin-facing configuration that the mu-plugin
    reads on the request hot path. Kept separate from scan_data.php so that
    a config toggle doesn't force rewriting the (potentially large) malware
    list, and so the mu-plugin loads only the data it actually needs per
    request.

    Forward compatibility: the plugin ships with the agent, so their versions
    are in lockstep. Any forward-compat gating lives here on the writer
    side — the agent simply omits a field it doesn't know about, and the
    plugin treats missing fields as "unset, use safe default". No per-field
    version stamp is needed in the file itself.

    Args:
        username: Owner of the WP site.

    Returns:
        Dict ready to be encoded as PHP via format_php_with_embedded_json:
        - ai_bot_protection: bool — admin WORDPRESS.ai_bot_protection
        - preset: str — admin WORDPRESS.ai_bot_protection_preset, normalised
          via _validate_preset to one of "balanced"/"strict"/"monitor".
          Falls back to "balanced" when the schema lacks the key (older
          agent) or the configured value is non-canonical (manual edit,
          future preset).
    """
    try:
        ai_bot_protection, _ = choose_value_from_config(
            "WORDPRESS",
            "ai_bot_protection",
            username=username,
        )
    except KeyError:
        ai_bot_protection = False

    try:
        preset, _ = choose_value_from_config(
            "WORDPRESS",
            "ai_bot_protection_preset",
            username=username,
        )
    except KeyError:
        preset = "balanced"

    return {
        "ai_bot_protection": bool(ai_bot_protection),
        "preset": _validate_preset(preset),
    }


def write_plugin_data_file_atomically(
    file_path, content: str, uid: int, gid: int, *, dir_fd: int | None = None
) -> None:
    """Write a plugin data file atomically.

    When *dir_fd* is supplied by the caller (e.g. from
    ensure_site_data_directory) it is used directly; otherwise the parent
    directory is opened with symlink protection.

    The file mode is 0o440 when the hosting panel is Plesk and 0o400 for any
    other panel.
    """
    permissions = 0o440 if HostingPanel().NAME == Plesk.NAME else 0o400
    if dir_fd is not None:
        atomic_rewrite(
            file_path,
            content,
            backup=False,
            uid=uid,
            gid=gid,
            permissions=permissions,
            dir_fd=dir_fd,
        )
        return

    with safe_dir(file_path.parent) as owned_dir_fd:
        atomic_rewrite(
            file_path,
            content,
            backup=False,
            uid=uid,
            gid=gid,
            permissions=permissions,
            dir_fd=owned_dir_fd,
        )


def _escape_json_for_php_single_quoted_string(json_str: str) -> str:
    """
    Escape a JSON string for embedding inside a PHP single-quoted string.

    PHP single-quoted strings only recognise two escape sequences:
    ``\\\\`` (literal backslash) and ``\\'`` (literal single quote).
    All other backslash sequences are kept verbatim. That means we must
    double every ``\\`` *before* we escape ``'``, otherwise PHP will consume
    JSON backslashes (e.g. ``\\\\s`` in JSON becomes ``\\s`` after PHP
    parsing, which is not a valid JSON escape).
    """
    return json_str.replace("\\", "\\\\").replace("'", "\\'")


def _unescape_php_single_quoted_json(escaped: str) -> str:
    """
    Reverse the escaping applied by
    :func:`_escape_json_for_php_single_quoted_string`.

    Quotes are restored before backslashes are halved, mirroring the escape
    order in reverse.
    """
    return escaped.replace("\\'", "'").replace("\\\\", "\\")


def format_php_with_embedded_json(data: dict) -> str:
    """
    Format a dictionary as a PHP file that returns JSON-decoded data.

    This creates a WordPress-safe PHP file that:
    1. Checks if it's being included from WordPress (WPINC defined)
    2. Returns the data as a decoded JSON string

    Args:
        data: Dictionary to embed in the PHP file

    Returns:
        Formatted PHP file content as a string
    """
    return (
        "<?php\n"
        "if ( ! defined( 'WPINC' ) ) {\n"
        "\texit;\n"
        "}\n"
        "return json_decode( '"
        + _escape_json_for_php_single_quoted_string(json.dumps(data))
        + "', true );"
    )


def parse_php_with_embedded_json(content: str) -> dict:
    """
    Parse a PHP file generated by format_php_with_embedded_json.

    Extracts and returns the embedded JSON data.

    Args:
        content: PHP file content string

    Returns:
        Parsed JSON data as a dict

    Raises:
        ValueError: If the JSON data cannot be found or parsed
    """
    marker = "json_decode( '"
    terminator = "', true )"

    start = content.find(marker)
    if start == -1:
        raise ValueError("No embedded JSON found in PHP content")
    start += len(marker)

    # Search from the end: the generated file always finishes with the real
    # terminator, whereas a payload string containing the same text (its
    # quote is written as \') would be hit first by a forward search and
    # truncate the JSON.
    end = content.rfind(terminator, start)
    if end == -1:
        raise ValueError("Malformed embedded JSON in PHP content")

    json_str = _unescape_php_single_quoted_json(content[start:end])
    return json.loads(json_str)


def ensure_directory_listing_protection(
    data_dir: Path, uid: int, gid: int, *, dir_fd: int
) -> None:
    """
    Ensure directory listing protection files exist in the data directory.

    Creates .htaccess, index.php, and index.html files to prevent directory
    listing. All writes use the caller-supplied *dir_fd* so that no
    path-based symlink check is required. atomic_rewrite skips the write
    when the file already contains the expected content, preserving
    idempotency.
    """
    protection_files = {
        ".htaccess": "DirectoryIndex index.php index.html\ndeny from all\n",
        "index.php": "<?php\n// This file is intentionally blank.\n",
        "index.html": "<!-- This file is intentionally blank. -->\n",
    }
    for filename, content in protection_files.items():
        file_path = data_dir / filename
        write_plugin_data_file_atomically(
            file_path, content, uid=uid, gid=gid, dir_fd=dir_fd
        )


async def ensure_site_data_directory(
    site: WPSite, user_info: pwd.struct_passwd
) -> Path:
    """Ensure the site's data directory exists with correct permissions.

    The directory is opened with symlink protection after creation (or if
    it already exists) to obtain a stable file descriptor. All subsequent
    operations use that descriptor.

    Args:
        site: WordPress site
        user_info: User information from pwd

    Returns:
        Path to data directory

    Raises:
        Exception: If the data directory is a symlink or cannot be created
    """
    # Imported here rather than at module level; presumably to avoid an
    # import cycle with the cli module — TODO confirm.
    from defence360agent.wordpress import cli

    data_dir = await cli.get_data_dir(site)
    newly_created = False

    try:
        dir_fd = open_dir_no_symlinks(data_dir)
    except FileNotFoundError:
        # Directory does not exist yet — create it as the site user so that
        # it is owned by the user (not root), then re-open with O_NOFOLLOW.
        command = build_command_for_user(
            user_info.pw_name,
            ["mkdir", "-p", str(data_dir)],
        )
        await check_run(command)
        try:
            dir_fd = open_dir_no_symlinks(data_dir)
        except OSError as exc:
            if exc.errno in (errno.ELOOP, errno.ENOTDIR):
                raise Exception(
                    f"Data directory {data_dir} is a symlink, skipping."
                ) from exc
            raise Exception(
                f"Failed to open data directory {data_dir}: {exc}"
            ) from exc
        newly_created = True
    except OSError as exc:
        if exc.errno in (errno.ELOOP, errno.ENOTDIR):
            raise Exception(
                f"Data directory {data_dir} is a symlink, skipping."
            ) from exc
        raise

    try:
        if newly_created:
            # Mode is applied through the descriptor, not the path.
            os.chmod(dir_fd, 0o750)
        ensure_directory_listing_protection(
            data_dir, uid=site.uid, gid=user_info.pw_gid, dir_fd=dir_fd
        )
    finally:
        os.close(dir_fd)

    return data_dir
Upload File
Create Folder