X7ROOT File Manager
Current Path:
/opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/lib/commons
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
lvestats
/
lib
/
commons
/
??
..
??
__init__.py
(219 B)
??
__pycache__
??
argparse_utils.py
(11.25 KB)
??
dateutil.py
(5.63 KB)
??
decorators.py
(893 B)
??
fileutil.py
(1.14 KB)
??
func.py
(15.89 KB)
??
htpasswd.py
(2.18 KB)
??
litespeed.py
(9.77 KB)
??
logsetup.py
(4.5 KB)
??
proctitle.py
(4.7 KB)
??
profiler.py
(575 B)
??
progress.py
(1016 B)
??
sentry.py
(6.17 KB)
??
server_status.py
(1.31 KB)
??
sizeutil.py
(2.59 KB)
??
users_manager.py
(2.97 KB)
Editing: litespeed.py
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import base64 import logging import urllib.request import urllib.error import urllib.parse import ssl import os from lxml import etree from lvestats.lib.commons.fileutil import open_nofollow from lvestats.lib.commons.func import get_all_user_domains, normalize_domain log = logging.getLogger('litespeed') class LiteSpeedException(Exception): pass class LiteSpeedDisabledException(LiteSpeedException): pass class LiteSpeedInvalidCredentials(LiteSpeedException): pass class LiteSpeedDataMapping(object): TIME = 3 HOST = 8 REQUEST = 14 TOTAL_LEN = 15 class LiteSpeed(object): IGNORE_HOSTS = [b'_AdminVHost'] PID_FILE_PATH = '/tmp/lshttpd/lshttpd.pid' HTPASSWD_PATH = '/usr/local/lsws/admin/htpasswds/status' HTTP_TIMEOUT = 2 LS_ADMIN_CONFIG = "/usr/local/lsws/admin/conf/admin_config.xml" def __init__(self, login, password): self.login = login self.password = password @staticmethod def _open_nofollow(path): """Thin wrapper around :func:`fileutil.open_nofollow` kept for backward-compatible mocking in tests.""" return open_nofollow(path) @staticmethod def _read_pid_file(): """ Read and validate PID from litespeed's pidfile. :return: str pid value :raises: OSError, ValueError on any validation failure """ fd = LiteSpeed._open_nofollow(LiteSpeed.PID_FILE_PATH) try: if os.fstat(fd).st_uid != 0: raise ValueError("pid file not owned by root") with os.fdopen(fd, encoding='utf-8') as f: fd = -1 pid_str = f.readline().rstrip(os.linesep) finally: if fd != -1: os.close(fd) pid = int(pid_str) if pid <= 0: raise ValueError("invalid pid value") if os.stat(f'/proc/{pid}/status').st_uid != 0: raise ValueError("process not owned by root") return pid_str @staticmethod def _get_litespeed_pid(): """ Returns pid that is stored in litespeed's pidfile :return: str """ if not os.path.isfile(LiteSpeed.PID_FILE_PATH) or not os.path.isfile(LiteSpeed.HTPASSWD_PATH): return None try: return LiteSpeed._read_pid_file() except (OSError, ValueError): return None @staticmethod def is_litespeed_running(): """ Checks whether pid is not None. :return: bool """ return LiteSpeed._get_litespeed_pid() is not None def _get_litespeed_webadmin_port(self): """ Retrives current LiteSpeed webadmin console port :return: LiteSpeed webadmin console port as string """ try: # Part of Litespeed config, containing console port: # <?xml version="1.0" encoding="UTF-8"?> # <adminConfig> # <listenerList> # <listener> # <name>adminListener</name> # <address>*:7080</address> # <secure>0</secure> # </listener> # </listenerList> with open(self.LS_ADMIN_CONFIG, 'r', encoding='utf-8') as f: ls_adm_cfg = etree.parse(f).getroot() data = ls_adm_cfg.xpath("listenerList/listener/address")[0] return data.text.split(':')[1] except (AttributeError, IndexError, ValueError, OSError, IOError) as e: raise LiteSpeedException( "Can't determine current LiteSpeed webadmin console " f"port from config {self.LS_ADMIN_CONFIG}: {e}" ) from e def _get_requests(self): """ Get info about connections from litespeed and returns array of rows with data :return: list :raise: [LiteSpeedInvalidCredentials, LiteSpeedDisabledException] """ port = self._get_litespeed_webadmin_port() # Try HTTP first for compatibility with older LiteSpeed versions # that may serve the admin console on plain HTTP. If HTTP fails, # retry with HTTPS. LiteSpeed 6.x forces HTTPS even when # <secure>0</secure> is set in admin_config.xml — HTTP requests # get 301-redirected to HTTPS, and urllib strips the Authorization # header on the cross-scheme redirect, resulting in a 401. So we # treat both connection errors AND auth errors on HTTP as reasons # to retry with HTTPS. data = self._fetch_status(f'http://localhost:{port}/status?rpt=details', raise_on_auth_error=False) if data is None: data = self._fetch_status(f'https://localhost:{port}/status?rpt=details', raise_on_auth_error=True) if data is None: raise LiteSpeedDisabledException( "Cannot connect to LiteSpeed WebAdmin on either HTTP or HTTPS" ) # remove empty lines result = [row for row in data.split(os.linesep.encode()) if row.strip() != b''] return result def _fetch_status(self, status_url, raise_on_auth_error=True): """ Fetch LiteSpeed status page at the given URL. :param raise_on_auth_error: if True, raise LiteSpeedInvalidCredentials on 401/403; if False, return None (allows caller to retry with a different URL scheme) :return: response bytes on success, None on failure :raise: LiteSpeedInvalidCredentials on 401/403 (when raise_on_auth_error=True) :raise: LiteSpeedDisabledException on non-auth HTTP errors """ request = urllib.request.Request(status_url) base64string = base64.b64encode(b'%s:%s' % (self.login.encode(), self.password.encode())) request.add_header(b"Authorization", b"Basic %s" % base64string) try: context = ssl._create_unverified_context() # pylint: disable=protected-access with urllib.request.urlopen( request, timeout=self.HTTP_TIMEOUT, context=context, ) as response: return response.read() except urllib.error.HTTPError as e: if e.code in [401, 403]: if raise_on_auth_error: raise LiteSpeedInvalidCredentials( "Litespeed login / password invalid. " "Please, try restart lvestats service." ) from e log.debug('Auth error on %s (HTTP %s), will retry', status_url, e.code) return None # Non-auth HTTP errors (4xx, 5xx) are server-side responses — # the server received and processed the request. Retrying with # a different URL scheme won't help, so raise immediately. raise LiteSpeedDisabledException(str(e)) from e except (urllib.error.URLError, OSError) as e: # Connection refused, SSL handshake failure, timeout, socket error. # Return None so the caller can retry with a different URL scheme. log.debug('Connection error on %s: %s, will retry', status_url, e) return None def __is_host_valid(self, host): """ Check whether host is not empty. :type host: str :return: bool """ host = host.strip() if host and host not in self.IGNORE_HOSTS: return True return False def _parse_request_info(self, request: bytes): """ :return: method, url, http_version """ request_info = request.strip(b'"').split() if len(request_info) == 3: method, url, http_version = request_info elif len(request_info) == 2: method, url = request_info http_version = b'' else: return None return method, url, http_version def get_user_data(self, username): """ Returns information about processed by user pages. :param username: :return list[list]: list of the lists [[Pid, Domain, Http type, Path, Http version, Time],...] :raises: LiteSpeedDownException """ data_delimiter = b'\t' pid = self._get_litespeed_pid() all_domains = get_all_user_domains(username) normalized_domains = set(map(normalize_domain, all_domains)) requests = self._get_requests() litespeed_requests = [] for request in requests: request_info = request.split(data_delimiter) if len(request_info) < LiteSpeedDataMapping.TOTAL_LEN: # that is not valid request info, skip it... continue host = request_info[LiteSpeedDataMapping.HOST] request = request_info[LiteSpeedDataMapping.REQUEST] # time since first request, seconds request_time = self.to_float(request_info[LiteSpeedDataMapping.TIME]) if self.__is_host_valid(host) and \ normalize_domain(host.decode()) in normalized_domains: request_data = self._parse_request_info(request) if request_data is not None: method, url, http_version = request_data litespeed_requests.append((pid, host, method, url, http_version, request_time)) return litespeed_requests @staticmethod def to_float(string): """ Converts str to float, if can't return -1. :type string: str :rtype: float """ try: return float(string) except ValueError: return -1.
Upload File
Create Folder