Editing: generic_sensor.py
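"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>

Generic Sensor plugin
- Creates listening unix domain socket on config.GenericSensor.SOCKET_PATH
- Expects alert data formatted as JSON objects carrying a "method" key
  (accepted values are the keys of Protocol.METHOD2MSGTYPE)
"""
import base64
import json
import os
import struct
import time
from logging import getLogger

# Side-effect import: defining the MalwareScan / MalwareScanTask / ...
# subclasses registers them with MessageType via __init_subclass__, so
# MessageType.MalwareScan resolves to the real class instead of
# UnknownMessage. Without this, METHOD2MSGTYPE entries collapse to
# UnknownMessage at first use.
import imav.contracts.messages  # noqa: F401
from defence360agent.contracts.config import (
    SimpleRpc,
    GENERIC_SENSOR_SOCKET_PATH,
)
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import Sensor
from defence360agent.internals.auth_protocol import UnixSocketAuthProtocol
from defence360agent.internals.global_scope import g
from defence360agent.internals.logger import getNetworkLogger
from defence360agent.simple_rpc import RpcServerAV
from defence360agent.utils import Scope
from defence360agent.utils.buffer import LineBuffer

logger, network_logger = getLogger(__name__), getNetworkLogger(__name__)


class Protocol(UnixSocketAuthProtocol):
    """Per-connection handler for the generic sensor unix socket.

    Only peers whose credentials report uid 0 are accepted; every other
    connection is closed in connection_made() and its data is ignored.
    Accepted peers send JSON objects whose "method" selects the message
    class that is forwarded to the sink.
    """

    # Wire-level method name -> message class handed to the sink.
    METHOD2MSGTYPE = {
        "NOOP": MessageType.Noop,
        "MALWARE_SCAN": MessageType.MalwareScan,
        "MALWARE_SCAN_TASK": MessageType.MalwareScanTask,
        "MALWARE_SCAN_COMPLETE": MessageType.MalwareScanComplete,
        "MALWARE_CLEAN_COMPLETE": MessageType.MalwareCleanComplete,
        "MALWARE_RESTORE_COMPLETE": MessageType.MalwareRestoreComplete,
        "MALWARE_CHECK_DETACHED_SCANS": MessageType.CheckDetachedScans,
        "MALWARE_SEND_FILES": MessageType.MalwareSendFiles,
    }

    def __init__(self, loop, sink, *_):
        """Store the event loop and sink; extra positional args are ignored."""
        self._loop = loop
        self._sink = sink
        self._line_buffer = LineBuffer()
        # Stays None until the peer has passed the uid check, which is what
        # data_received() uses to drop input from rejected connections.
        self.transport = None

    def connection_made(self, transport):
        """Accept the connection only if the peer is root (uid 0).

        The parent class is expected to populate self._uid / _gid / _pid
        from the socket's peer credentials (the rejection message refers
        to SO_PEERCRED); if that lookup fails the connection is closed
        rather than accepted without authentication.
        """
        try:
            super().connection_made(transport)
        except (OSError, AttributeError, struct.error) as exc:
            # Fail closed: no credentials means no way to authorize the peer.
            network_logger.warning(
                "Rejected generic_sensor connection: "
                "SO_PEERCRED unavailable (%s)",
                exc,
            )
            transport.close()
            return
        if self._uid != 0:
            network_logger.warning(
                "Rejected generic_sensor connection from non-root peer "
                "uid=%d gid=%d pid=%d",
                self._uid,
                self._gid,
                self._pid,
            )
            transport.close()
            return
        self.transport = transport
        network_logger.debug("Connection made")

    def data_received(self, data):
        """Decode a chunk, split it into messages and dispatch each one.

        Input arriving on a connection that was never accepted (transport
        is None) is discarded without being parsed.
        """
        if self.transport is None:
            return
        try:
            msgs = data.decode()
        except UnicodeDecodeError as exc:
            # Message framing cannot be trusted after an undecodable chunk,
            # so the connection is closed explicitly instead of letting the
            # exception escape into the event loop.
            network_logger.warning(
                "Closing generic_sensor connection: undecodable data (%s)",
                exc,
            )
            self.transport.close()
            self.transport = None
            return
        if not msgs.strip():
            logger.error("Empty message received <%s>", msgs)
            return
        self._line_buffer.append(msgs)
        for msg in self._line_buffer:
            if not msg:
                continue
            network_logger.debug("data_received: %r", msg)
            tokens = self._parse_msg(msg)
            if tokens:
                # Receive time is stamped here, overwriting any "timestamp"
                # the peer supplied.
                tokens["timestamp"] = time.time()
                self._process_msg(tokens)

    def _parse_msg(self, msg):
        """Return *msg* parsed as a JSON object, or None if it is not one.

        Invalid JSON and valid JSON that is not an object (list, string,
        number, ...) are both logged and rejected, so callers can rely on
        getting a dict or None.
        """
        try:
            parsed = json.loads(msg)
        except json.JSONDecodeError:
            logger.exception("data_received(%s): unable to decode", repr(msg))
            return None
        if not isinstance(parsed, dict):
            # A non-object payload would otherwise fail later on the
            # tokens["timestamp"] assignment with a TypeError.
            logger.error(
                "data_received(%s): expected a JSON object", repr(msg)
            )
            return None
        return parsed

    def _process_msg(self, tokens):
        """Map tokens["method"] to a message class and schedule it on the sink.

        Messages with an unknown or missing method, or a malformed
        "filelist" for MALWARE_SCAN_TASK, are logged and dropped.
        """
        # map 'method' to appropriate Message type
        try:
            method = tokens["method"]
            msgtype = self.METHOD2MSGTYPE[method]
        except (KeyError, TypeError) as e:
            # TypeError covers an unhashable "method" value (list / object).
            logger.error(
                "data_received(%s): Wrong or missing 'method' [%s]",
                repr(tokens),
                repr(e),
            )
            return
        if method == "MALWARE_SCAN_TASK":
            # File names arrive base64-encoded and are turned back into
            # filesystem paths. They are not validated here; the only gate
            # is the uid 0 check performed in connection_made().
            try:
                tokens["filelist"] = [
                    os.fsdecode(base64.b64decode(f))
                    for f in tokens["filelist"]
                ]
            except (KeyError, TypeError, ValueError) as e:
                # binascii.Error (bad base64) is a ValueError subclass.
                logger.error(
                    "data_received(%s): Wrong or missing 'filelist' [%s]",
                    repr(tokens),
                    repr(e),
                )
                return
        self._loop.create_task(self._sink.process_message(msgtype(tokens)))

    def connection_lost(self, transport):
        """Forget the transport once the connection is gone.

        NOTE(review): asyncio passes the exception (or None) to this
        callback, not a transport; the parameter name is kept as-is and the
        argument is unused.
        """
        self.transport = None
        network_logger.debug("Disconnected")


class GenericSensor(Sensor):
    """Sensor plugin that exposes Protocol on a unix domain socket."""

    SOCKET_PATH = GENERIC_SENSOR_SOCKET_PATH
    PROTOCOL_CLASS = Protocol
    SCOPE = Scope.AV

    async def create_sensor(self, loop, sink):
        """Create the listening server and publish it as g.sensor_server.

        With SimpleRpc.SOCKET_ACTIVATION the server is created through an
        RpcServerAV subclass; otherwise the socket is bound directly at
        SOCKET_PATH and restricted to mode 0o600.
        """
        if SimpleRpc.SOCKET_ACTIVATION:

            class GenericSensorSocket(RpcServerAV):
                SOCKET_PATH = self.SOCKET_PATH
                PROTOCOL_CLASS = self.PROTOCOL_CLASS

            g.sensor_server = await GenericSensorSocket.create(loop, sink)
            return g.sensor_server
        else:
            # FIXME (original note, incomplete): make sure root can write
            # to ... - presumably the socket directory created below.
            os.makedirs(os.path.dirname(self.SOCKET_PATH), exist_ok=True)
            # Remove a stale socket without a separate existence check, so
            # there is no gap between the check and the unlink.
            try:
                os.unlink(self.SOCKET_PATH)
            except FileNotFoundError:
                pass
            g.sensor_server = await loop.create_unix_server(
                lambda: self.PROTOCOL_CLASS(loop, sink), self.SOCKET_PATH
            )
            # NOTE(review): between bind and this chmod the socket mode is
            # whatever the process umask allows. During that window the
            # uid 0 peer check in Protocol.connection_made() is the only
            # control; confirm the parent directory is not world-writable.
            os.chmod(self.SOCKET_PATH, 0o600)
            return g.sensor_server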