D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
dedrads
/
Filename :
check_hacks
back
Copy
#!/usr/lib/rads/venv/bin/python3
"""Check logs and processes for common hack/malware indicators.

Targets Ubuntu/AlmaLinux servers running cPanel or CWP. This header holds the
imports and the immutable data models shared by the rest of the script.
"""
from __future__ import annotations

import argparse
import datetime as dt
import re
import shutil
import subprocess
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Iterator, Optional, Sequence

from rads import color


# ----------------------------
# Models / configuration
# ----------------------------
class ServerType(str, Enum):
    """Supported OS + control-panel combinations.

    Mixing in ``str`` makes each member compare equal to its plain string
    value.
    """

    UBUNTU_CPANEL = "ubuntu+cpanel"
    ALMA_CPANEL = "alma+cpanel"
    ALMA_CWP = "alma+cwp"
    UNKNOWN = "unknown"


@dataclass(frozen=True)
class LogPaths:
    """Filesystem locations of the logs associated with a server profile."""

    # General system log.
    sys_log: Path
    # Mail transfer log.
    mail_log: Path
    # SSH / authentication log.
    ssh_log: Path
    # Control-panel login log.
    panel_login_log: Path
    # Directory that holds the per-domain web access logs.
    domlogs_root: Path


@dataclass(frozen=True)
class ServerProfile:
    """Immutable description of a server: OS, panel, versions and log paths."""

    server_type: ServerType
    # Operating-system identifier string.
    os_id: str
    # Human-readable operating-system name.
    os_pretty: str
    # Control-panel name as a plain string.
    panel: str
    # Panel version strings; None when not available.
    cpanel_version: Optional[str]
    cwp_version: Optional[str]
    paths: LogPaths


# ----------------------------
# Low-level helpers
# ----------------------------
# Small wrapper around subprocess to keep error-handling consistent.
class Shell:
    """Small wrapper around subprocess to keep error handling consistent."""

    @staticmethod
    def run(cmd: Sequence[str], timeout: int = 25) -> tuple[int, str, str]:
        """Run *cmd* (no shell) and capture its text output.

        Args:
            cmd: Program and arguments.
            timeout: Seconds to wait before giving up.

        Returns:
            ``(returncode, stdout, stderr)``. A missing executable yields
            return code 127 and a timeout yields 124, each with an
            explanatory message in the stderr slot.
        """
        try:
            proc = subprocess.run(
                list(cmd),
                text=True,
                capture_output=True,
                timeout=timeout,
                check=False,
            )
        except FileNotFoundError:
            return 127, "", color.red(f"Command not found: {cmd[0]}")
        except subprocess.TimeoutExpired:
            return 124, "", f"Timed out running: {' '.join(cmd)}"
        return proc.returncode, proc.stdout, proc.stderr


class FileIO:
    """Tolerant text-file readers (undecodable bytes are replaced)."""

    @staticmethod
    def iter_lines(path: Path) -> Iterator[str]:
        """Yield the lines of *path* without their trailing newline.

        Yields nothing when the file does not exist. Errors raised while
        opening or reading an existing file propagate to the caller.
        """
        if not path.exists():
            return
        with path.open("r", encoding="utf-8", errors="replace") as handle:
            for line in handle:
                yield line.rstrip("\n")

    @staticmethod
    def read_text(path: Path) -> Optional[str]:
        """Return the stripped content of *path*, or None.

        None is returned when the file is missing, unreadable or empty.
        """
        try:
            if not path.exists():
                return None
            # Explicit UTF-8 keeps this consistent with iter_lines() instead
            # of depending on the process locale.
            text = path.read_text(encoding="utf-8", errors="replace").strip()
        except (OSError, UnicodeError):
            return None
        return text or None


class Reporter:
    """Console output helpers: separators, headers and 'top N' tables."""

    def __init__(self, width: int = 89) -> None:
        self._sep = color.blue("=") * width

    def bar(self) -> None:
        """Print the separator line."""
        print(self._sep)

    def header(self, title: str) -> None:
        """Print a blank line, then *title* framed by two separator lines."""
        print()
        self.bar()
        print(title)
        self.bar()

    def warn(self, msg: str) -> None:
        """Print *msg* with a [WARN] prefix."""
        print(f"[WARN] {msg}")

    def ok(self, msg: str) -> None:
        """Print *msg* with an [OK] prefix."""
        print(f"[OK] {msg}")

    @staticmethod
    def top(counter: Counter[str], n: int) -> list[tuple[str, int]]:
        """Return the *n* highest-count ``(key, count)`` pairs, largest first."""
        return counter.most_common(n)

    def print_top(self, title: str, counter: Counter[str], n: int = 5, *, min_count: int = 1) -> None:
        """Print *title* and a left-aligned table of the top *n* entries.

        Entries with a count below *min_count* are dropped after the top *n*
        have been selected. A "No results." line is printed when nothing is
        left.
        """
        rows = [(key, count) for key, count in self.top(counter, n) if count >= min_count]
        print(title)
        self.bar()
        if not rows:
            print(color.green("No results."))
            return
        width = max(len(key) for key, _ in rows)
        for key, count in rows:
            print(f"{key:<{width}} {count}")


# ----------------------------
# Detection
# ----------------------------
class ServerDetector:
    """
    Detect OS via /etc/os-release.
    Detect cPanel via /usr/local/cpanel/version.
    Detect CWP via /usr/local/cwpsrv/htdocs/resources/admin/include/version.php.
    """

    def __init__(self) -> None:
        self._os_release = Path("/etc/os-release")
        self._cpanel_version_file = Path("/usr/local/cpanel/version")
        self._cwp_version_file = Path("/usr/local/cwpsrv/htdocs/resources/admin/include/version.php")
        # First dotted 3- or 4-part number found in the CWP version file.
        self._cwp_version_re = re.compile(r"\b([0-9]+\.[0-9]+\.[0-9]+(?:\.[0-9]+)?)\b")

    def detect(self) -> ServerProfile:
        """Build the ServerProfile for this machine.

        CWP takes precedence over cPanel when both version files are present.

        Raises:
            SystemExit: (via _select_paths) when no supported panel is found.
        """
        os_id, os_pretty = self._detect_os()
        cpanel_version = FileIO.read_text(self._cpanel_version_file)
        cwp_version = self._detect_cwp_version()

        if cwp_version:
            panel = "cwp"
        elif cpanel_version:
            panel = "cpanel"
        else:
            panel = "none"

        paths = self._select_paths(os_id=os_id, panel=panel)
        server_type = self._select_server_type(os_id=os_id, panel=panel)

        return ServerProfile(
            server_type=server_type,
            os_id=os_id or "unknown",
            os_pretty=os_pretty or (os_id or "Unknown OS"),
            panel=panel,
            cpanel_version=cpanel_version,
            cwp_version=cwp_version,
            paths=paths,
        )

    def _detect_os(self) -> tuple[str, str]:
        """Return ``(lower-cased ID, PRETTY_NAME or NAME)`` from os-release.

        Both elements are empty strings when the file is missing or lacks the
        keys.
        """
        data: dict[str, str] = {}
        # iter_lines() yields nothing for a missing file, so no separate
        # existence check is needed.
        for line in FileIO.iter_lines(self._os_release):
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            # Values may be wrapped in either double or single quotes.
            data[key.strip()] = value.strip().strip("\"'")
        os_id = (data.get("ID") or "").lower()
        os_pretty = data.get("PRETTY_NAME") or data.get("NAME") or ""
        return os_id, os_pretty

    def _detect_cwp_version(self) -> Optional[str]:
        """Return the version number found in the CWP version file, or None."""
        text = FileIO.read_text(self._cwp_version_file)
        if not text:
            return None
        match = self._cwp_version_re.search(text)
        return match.group(1) if match else None

    def _select_server_type(self, *, os_id: str, panel: str) -> ServerType:
        """Map the OS id and panel name to a ServerType member."""
        if panel == "cwp":
            return ServerType.ALMA_CWP
        if panel == "cpanel":
            if os_id == "ubuntu":
                return ServerType.UBUNTU_CPANEL
            if os_id in {"almalinux", "alma"}:
                return ServerType.ALMA_CPANEL
        return ServerType.UNKNOWN

    def _select_paths(self, *, os_id: str, panel: str) -> LogPaths:
        """Return log paths for the detected OS/panel combination.

        Raises:
            SystemExit: with status 1, after printing an explanation, when
                the panel is neither "cwp" nor "cpanel".
        """
        if panel == "cwp":
            return LogPaths(
                sys_log=Path("/var/log/messages"),
                mail_log=Path("/var/log/maillog"),
                ssh_log=Path("/var/log/secure"),
                panel_login_log=Path("/var/log/cwp_client_login.log"),
                domlogs_root=Path("/usr/local/apache/domlogs"),
            )
        if panel == "cpanel":
            if os_id == "ubuntu":
                return LogPaths(
                    sys_log=Path("/var/log/syslog"),
                    mail_log=Path("/var/log/mail.log"),
                    ssh_log=Path("/var/log/auth.log"),
                    panel_login_log=Path("/usr/local/cpanel/logs/login_log"),
                    domlogs_root=Path("/usr/local/apache/domlogs"),
                )
            return LogPaths(
                sys_log=Path("/var/log/messages"),
                mail_log=Path("/var/log/maillog"),
                ssh_log=Path("/var/log/secure"),
                panel_login_log=Path("/usr/local/cpanel/logs/login_log"),
                domlogs_root=Path("/usr/local/apache/domlogs"),
            )
        print(f"Error: No supported panel detected (OS: {os_id}, Panel: {panel}).")
        print("This script supports: cPanel on Ubuntu/AlmaLinux, CWP on AlmaLinux.")
        raise SystemExit(1)


# ----------------------------
# Checks: processes & networking
# ----------------------------
class ProcessInspector:
    """Process-table and socket checks; findings are printed, never acted on."""

    # Account names made of lowercase letters followed by digits.
    # NOTE(review): presumably the hosting account naming scheme - confirm.
    _USER_RE = re.compile(r"[a-z]+[0-9]+")

    def __init__(self, reporter: Reporter) -> None:
        self.r = reporter

    def _ps_rows(self) -> Iterator[tuple[str, str, str, str]]:
        """Yield ``(user, pid, ppid, args)`` for each row printed by ps.

        Emits a warning and yields nothing when ps fails.
        """
        rc, out, err = Shell.run(["ps", "-eo", "user:50=,pid=,ppid=,args="])
        if rc != 0:
            self.r.warn(f"ps failed: {err.strip()}")
            return
        for line in out.splitlines():
            # Split at most three times so the command line stays intact.
            parts = line.split(None, 3)
            if len(parts) < 3:
                continue
            args = parts[3] if len(parts) >= 4 else ""
            yield parts[0], parts[1], parts[2], args

    def run_all(self, *, full: bool) -> None:
        """Run every process/network check; *full* adds extra socket output."""
        self._check_perl_processes()
        self._check_daemonized_user_processes()
        self._check_shell_bound_scripts()
        self._check_network_listeners(full=full)

    @staticmethod
    def _comm_from_stat(stat: str) -> str:
        """Extract the command name from the text of ``/proc/<pid>/stat``.

        The name is taken as everything between the first "(" and the last
        ")", so names that contain spaces or parentheses survive intact.
        Returns "" when no parenthesised field is present.
        """
        start = stat.find("(")
        end = stat.rfind(")")
        if start == -1 or end <= start:
            return ""
        return stat[start + 1:end]

    def _check_perl_processes(self) -> None:
        """Report perl processes owned by 'nobody' or a letters+digits user."""
        self.r.header(color.yellow("Check for masquerading perl processes (review before killing)"))
        pids = [
            pid
            for user, pid, _ppid, args in self._ps_rows()
            if (user == "nobody" or self._USER_RE.fullmatch(user)) and "perl" in args.lower()
        ]
        if not pids:
            self.r.ok(color.green("No suspicious perl processes found"))
            return

        for pid in pids:
            proc_dir = Path("/proc") / pid
            try:
                stat = (proc_dir / "stat").read_text(errors="replace")
                raw_cmdline = (proc_dir / "cmdline").read_bytes()
            except OSError:
                # The process exited (or became unreadable) since ps ran.
                continue
            comm = self._comm_from_stat(stat)
            # cmdline arguments are NUL-separated.
            cmdline = raw_cmdline.replace(b"\x00", b" ").decode("utf-8", errors="replace").strip()
            if comm == "perl" or " perl" in f" {cmdline}".lower():
                print(f"PID={pid} COMM={comm} CMD={cmdline}")
        print()

    def _check_daemonized_user_processes(self) -> None:
        """Report letters+digits-user processes whose parent is PID 1."""
        self.r.header(color.yellow("Daemonized user processes, review before terminating"))
        printed = False
        for user, pid, ppid, args in self._ps_rows():
            if ppid != "1" or not self._USER_RE.fullmatch(user):
                continue
            if "gam_server" in args:
                continue
            print(f"USER={user} PID={pid} PPID={ppid} ARGS={args}")
            printed = True
        if not printed:
            self.r.ok(color.green("No daemonized processes found"))
        print()

    @staticmethod
    def _is_shell_bound(args: str) -> bool:
        """Return True when a command line looks like a shell-bound script.

        A hit is "/bin/sh" without "grep"/"mysql", or "/bin/bash" without
        "grep", "/opt/" or "check_hacks" (all compared case-insensitively).
        """
        low = args.lower()
        if "grep" in low:
            return False
        via_sh = "/bin/sh" in low and "mysql" not in low
        via_bash = "/bin/bash" in low and "/opt/" not in low and "check_hacks" not in low
        return via_sh or via_bash

    def _check_shell_bound_scripts(self) -> None:
        """Report processes whose command line involves /bin/sh or /bin/bash."""
        self.r.header(color.yellow("Scripts bound to a shell review with: lsof -p PID"))
        hits = 0
        for _user, pid, _ppid, args in self._ps_rows():
            # Each process is reported once even when both shells appear.
            if self._is_shell_bound(args):
                print(f"PID={pid} ARGS={args}")
                hits += 1
        if hits == 0:
            self.r.ok(color.green("No suspicious shell-bound scripts found"))
        print()

    def _report_perl_sockets(self, rows: Sequence[str]) -> None:
        """Print every socket row mentioning perl, or an OK line if none do."""
        found = False
        for row in rows:
            if "perl" in row.lower():
                print(row)
                found = True
        if not found:
            self.r.ok(color.green("No suspicious Network connections found"))

    def _check_network_listeners(self, *, full: bool) -> None:
        """Scan socket listings for perl; prefer ss, fall back to netstat.

        With *full*, also print the LISTEN/ESTAB rows (ss) or the tcp rows
        that carry a pid/program column (netstat).
        """
        self.r.header(color.yellow("Network listeners"))

        if shutil.which("ss"):
            rc, out, err = Shell.run(["ss", "-plant"])
            if rc != 0:
                self.r.warn(f"ss failed: {err.strip()}")
                return
            rows = out.splitlines()
            self._report_perl_sockets(rows)
            if full:
                print()
                print("[--full] ss -plant (LISTEN/ESTAB)")
                self.r.bar()
                for row in rows:
                    if row.startswith(("LISTEN", "ESTAB")):
                        print(row)
            return

        rc, out, err = Shell.run(["netstat", "-plan"])
        if rc != 0:
            self.r.warn(f"netstat failed: {err.strip()}")
            return
        rows = out.splitlines()
        self._report_perl_sockets(rows)
        if full:
            print()
            print("[--full] netstat -plan (tcp lines with pid/program)")
            self.r.bar()
            for row in rows:
                if row.strip().startswith("tcp") and re.search(r"\s[0-9]+/.*$", row):
                    print(row)


# ----------------------------
# Checks: logs
# ----------------------------
class LogInspector:
    """
    Parses:
    - SSH failures from auth/secure log
    - cPanel failed logins from binary login_log (strings -a)
    - Apache domlogs: suspicious paths + top IPs for today's date
    """

    # Domlog files larger than this many bytes are skipped.
    _MAX_DOMLOG_BYTES = 100 * 1024 * 1024

    def __init__(self, profile: ServerProfile, reporter: Reporter) -> None:
        self.p = profile
        self.r = reporter

        self._ssh_fail_re_1 = re.compile(r"sshd\[\d+\]: Failed password .* from (?P<ip>\d{1,3}(?:\.\d{1,3}){3})\b")
        self._ssh_fail_re_2 = re.compile(
            r"sshd\[\d+\]: pam_unix\(sshd:auth\): authentication failure;.*\brhost=(?P<ip>\d{1,3}(?:\.\d{1,3}){3})\b"
        )
        # "<ip> <ident> <user> [dd/Mon/yyyy:". The IP may sit at the very
        # start of the line (plain common/combined format) or after a
        # leading field, and the ident/user fields may hold any non-space
        # token rather than only "-".
        self._domlog_ip_re = re.compile(
            r"(?:^|\s)(?P<ip>\d{1,3}(?:\.\d{1,3}){3})\s+\S+\s+\S+\s+\[(?P<ts>\d{2}/[A-Za-z]{3}/\d{4}):"
        )
        self._http_method_re = re.compile(r"\"(?P<method>[A-Z]+)\s+(?P<path>/\S*)\s+HTTP/[^\" ]+\"")
        # Three-digit status following a closing quote; compiled once here
        # because it is applied per log line.
        self._status_re = re.compile(r"\"\s+(?P<status>\d{3})\s+")
        self._suspicious_paths = [
            re.compile(r"/wp-json/wp/v2/posts/\d+/", re.IGNORECASE),
            re.compile(r"/\.env\b", re.IGNORECASE),
            re.compile(r"/login\.asp\b", re.IGNORECASE),
            re.compile(r"(\bunion\b|\bselect\b|\border\b\s*by\b|%20union|%20select|%20order%20by)", re.IGNORECASE),
        ]

    def run_all(self) -> None:
        """Run the panel-login, SSH and domlog reports in that order."""
        self._panel_failed_logins()
        self._ssh_failures()
        self._domlog_summary()

    def _panel_failed_logins(self) -> None:
        """Print the panel login report for the detected panel.

        cPanel: top failed-login sources parsed from login_log.
        CWP: the last 200 lines of the client login log.
        """
        path = self.p.paths.panel_login_log

        if self.p.panel == "cpanel":
            self.r.header(color.yellow("cPanel failed authentication attempts"))
            if not path.exists():
                self.r.warn(f"Missing: {path}")
                return
            if not shutil.which("strings"):
                self.r.warn("'strings' not found. Install (binutils) to parse cPanel login_log.")
                return
            counts = self._parse_cpanel_failed_logins(path)
            self.r.print_top(color.yellow("Top 5 IPS that FAILED LOGIN"), counts, n=5)
            return

        if self.p.panel == "cwp":
            self.r.header(color.yellow(f"CWP client logins preview from {path}"))
            if not path.exists():
                self.r.warn(f"Missing: {path}")
                return
            lines = list(FileIO.iter_lines(path))
            if not lines:
                print("No lines found.")
                return
            print("\n".join(lines[-200:]))
            return

        self.r.header("Panel login checks skipped (no panel detected)")

    @staticmethod
    def _failed_login_key(line: str) -> str:
        """Return the counter key for one FAILED LOGIN line.

        The key is the first whitespace-separated token that parses as an
        IPv4/IPv6 address, so identical sources aggregate under the report's
        "Top 5 IPS" title. When no token is an address, the first three
        tokens (or the stripped line if shorter) are used instead.

        NOTE(review): the first three tokens of a login_log line appear to be
        the bracketed timestamp, which made nearly every key unique - confirm
        against a live login_log.
        """
        import ipaddress  # local import: only this helper needs it

        tokens = line.split()
        for token in tokens:
            try:
                ipaddress.ip_address(token)
            except ValueError:
                continue
            return token
        return " ".join(tokens[:3]) if len(tokens) >= 3 else line.strip()

    def _parse_cpanel_failed_logins(self, login_log: Path) -> Counter[str]:
        """Count FAILED LOGIN lines in *login_log*, grouped by source key.

        The file is read through ``strings -a``. An empty Counter is returned
        when the file or the strings binary is missing, or when strings exits
        non-zero.
        """
        counts: Counter[str] = Counter()
        if not login_log.exists() or not shutil.which("strings"):
            return counts

        rc, out, _err = Shell.run(["strings", "-a", str(login_log)], timeout=60)
        if rc != 0:
            return counts

        for line in out.splitlines():
            if "FAILED LOGIN" not in line.upper():
                continue
            key = self._failed_login_key(line)
            if key:
                counts[key] += 1
        return counts

    def _ssh_failures(self) -> None:
        """Print the top five IPs with more than five SSH failures."""
        path = self.p.paths.ssh_log
        self.r.header(color.yellow(f"SSH failures from {path} (top offenders > 5 failures)"))
        if not path.exists():
            self.r.warn(f"Missing: {path}")
            return
        counts = self._parse_ssh_failures(path)
        # min_count=6 keeps only IPs with more than five failures.
        self.r.print_top(color.yellow("Top 5 SSH offender IPs (failures > 5)"), counts, n=5, min_count=6)

    def _parse_ssh_failures(self, path: Path) -> Counter[str]:
        """Count failed sshd authentications per IPv4 address in *path*.

        Both "Failed password" and pam_unix "authentication failure" lines
        are counted; a line contributes at most once.
        """
        counts: Counter[str] = Counter()
        for line in FileIO.iter_lines(path):
            if "sshd" not in line:
                continue
            match = self._ssh_fail_re_1.search(line) or self._ssh_fail_re_2.search(line)
            if match:
                counts[match.group("ip")] += 1
        return counts

    def _domlog_summary(self) -> None:
        """Print today's top five domlog IPs and up to 200 suspicious hits."""
        root = self.p.paths.domlogs_root
        self.r.header(color.yellow("Apache domlogs: suspicious activity + top IPs for today's entries"))
        if not root.exists():
            self.r.warn(f"Missing: {root}")
            return

        today_str = self._today_domlog_datestr()
        ip_counts_today, suspicious_hits = self._parse_domlogs(root, today_str)

        print(color.yellow(f"Top 5 domlog IPs for today ({today_str})"))
        self.r.bar()
        if ip_counts_today:
            for ip, count in self.r.top(ip_counts_today, 5):
                print(f"{ip:>15} {count}")
        else:
            print("No domlog entries counted for today. Verify that domlogs are logging correctly")

        print()
        print(color.yellow("Suspicious domlog hits"))
        self.r.bar()
        if suspicious_hits:
            for hit in suspicious_hits[:200]:
                print(hit)
            if len(suspicious_hits) > 200:
                print(f"... ({len(suspicious_hits) - 200} more)")
        else:
            print("No suspicious patterns found.")

    def _today_domlog_datestr(self, now: Optional[dt.datetime] = None) -> str:
        """Return today's Apache-style date string (e.g., 12/Jan/2026)."""
        if now is None:
            now = dt.datetime.now()
        return now.strftime("%d/%b/%Y")

    def _iter_domlog_files(self, root: Path, *, cutoff_ts: float) -> Iterator[Path]:
        """Yield regular files in *root* and its immediate subdirectories.

        Only files modified at or after *cutoff_ts* and no larger than
        _MAX_DOMLOG_BYTES are yielded; files that cannot be stat'ed are
        skipped.
        """
        def eligible(candidate: Path) -> bool:
            if not candidate.is_file():
                return False
            try:
                stats = candidate.stat()
            except OSError:
                return False
            if stats.st_size > self._MAX_DOMLOG_BYTES:
                return False
            return stats.st_mtime >= cutoff_ts

        for pattern in ("*", "*/*"):
            for candidate in root.glob(pattern):
                if eligible(candidate):
                    yield candidate

    def _parse_domlogs(self, root: Path, today_str: str) -> tuple[Counter[str], list[str]]:
        """Scan recently modified domlogs under *root*.

        Args:
            root: Directory holding the access logs.
            today_str: Date in ``dd/Mon/yyyy`` form; only requests stamped
                with it are counted per IP.

        Returns:
            ``(ip_counts_today, suspicious_hits)``. Each hit is a
            tab-separated "kind, logfile, path" string, and a given
            (logfile, path) pair is reported at most once. Kinds are
            "No-ref-POST" (2xx POST on a line containing ``"-"``, path
            without "cron") and "Suspicious" (path matches a known pattern).
        """
        ip_counts_today: Counter[str] = Counter()
        seen: set[tuple[str, str]] = set()
        suspicious_hits: list[str] = []

        def record(kind: str, logfile: Path, path: str) -> None:
            key = (str(logfile), path)
            if key not in seen:
                seen.add(key)
                suspicious_hits.append(f"{kind}\t{logfile}\t{path}")

        # Only files touched within the last 24 hours are considered.
        cutoff_ts = dt.datetime.now().timestamp() - 24 * 60 * 60

        for logfile in self._iter_domlog_files(root, cutoff_ts=cutoff_ts):
            try:
                for line in FileIO.iter_lines(logfile):
                    m_ip = self._domlog_ip_re.search(line)
                    if m_ip and m_ip.group("ts") == today_str:
                        ip_counts_today[m_ip.group("ip")] += 1

                    m_req = self._http_method_re.search(line)
                    if not m_req:
                        continue
                    path = m_req.group("path").lower()

                    if m_req.group("method") == "POST" and '"-"' in line:
                        m_status = self._status_re.search(line)
                        if m_status and m_status.group("status").startswith("2") and "cron" not in path:
                            record("No-ref-POST", logfile, path)

                    if any(rx.search(path) for rx in self._suspicious_paths):
                        record("Suspicious", logfile, path)
            except OSError as exc:
                # Best effort: an unreadable file must not abort the scan,
                # but the operator should know it was skipped.
                self.r.warn(f"Skipping unreadable domlog {logfile}: {exc}")
                continue

        return ip_counts_today, suspicious_hits


# ----------------------------
# Orchestration
# ----------------------------
class HackCheckerApp:
    """Wires the reporter and inspectors together and runs every check."""

    def __init__(self, profile: ServerProfile) -> None:
        self.profile = profile
        self.r = Reporter()
        self.proc = ProcessInspector(self.r)
        self.logs = LogInspector(self.profile, self.r)

    def run(self, *, full: bool) -> int:
        """Print the banner, run all checks and return exit status 0."""
        self._print_banner()
        self.proc.run_all(full=full)
        self.logs.run_all()
        print()
        return 0

    def _print_banner(self) -> None:
        """Print the detected OS, panel, profile and log paths."""
        profile = self.profile
        self.r.bar()
        print(color.magenta("= Review processes/logs before taking action. ="))
        self.r.bar()
        print(color.yellow(f"Detected OS: {profile.os_pretty} (ID={profile.os_id})"))
        if profile.panel == "cpanel":
            print(color.yellow(f"Detected panel: cPanel (version={profile.cpanel_version or 'unknown'})"))
        elif profile.panel == "cwp":
            print(color.yellow(f"Detected panel: CWP (version={profile.cwp_version or 'unknown'})"))
        else:
            print(color.yellow("Detected panel: none"))
        print(color.yellow(f"Profile: {profile.server_type.value}"))
        print(color.yellow("Log paths:"))
        print(color.yellow(f" sys_log = {profile.paths.sys_log}"))
        print(color.yellow(f" mail_log = {profile.paths.mail_log}"))
        print(color.yellow(f" ssh_log = {profile.paths.ssh_log}"))
        print(color.yellow(f" panel_login = {profile.paths.panel_login_log}"))
        print(color.yellow(f" domlogs_root = {profile.paths.domlogs_root}"))


# ----------------------------
# Entry point
# ----------------------------
def build_argparser() -> argparse.ArgumentParser:
    """Return the command-line parser (single optional flag: --full)."""
    parser = argparse.ArgumentParser(
        description="Checks logs and processes for common hack/malware indicators across Ubuntu/Alma + cPanel/CWP."
    )
    parser.add_argument("--full", action="store_true", help="Show extra network listener output where applicable.")
    return parser


def main() -> int:
    """Parse arguments, detect the server profile and run the checks."""
    args = build_argparser().parse_args()
    profile = ServerDetector().detect()
    return HackCheckerApp(profile).run(full=args.full)


if __name__ == "__main__":
    raise SystemExit(main())