D7net
Home
Console
Upload
Information
Create File
Create Folder
About
Tools
:
/
usr
/
lib
/
rads
/
venv
/
lib64
/
python3.13
/
site-packages
/
exim_analytics
/
Filename :
scraper.py
back
Copy
""" The purpose of this script is to preemptively detect abusive email behavior via analytics from exim logs We will never use this data for marketing or advertising purposes. We only use this data to help protect our customers and our network. """ import json import logging import os import sys from pathlib import Path from typing import Literal from rads import setup_logging from .api import send_batch, send_no_mail from . import config from . import Host from . import parser MAX_BATCH_SIZE_BYTES = 5 * 1024 * 1024 # 5MB def init_logging(): """Setup logging to /var/log/mail_analytics.log""" try: setup_logging( config.data.log_path, chmod=0o640, loglevel=config.data.log_level, print_out='stdout' if os.getenv('DEBUG') == '1' else None, ) return True except OSError as e: print(f"Failed to setup logging: {e}", file=sys.stderr) return False def read_and_delete_logfile(logfile: str, dry_run: bool = False): """Reads all records from logfile and deletes it (unless dry_run).""" if not os.path.exists(logfile): return skipped = 0 with open(logfile, "r", encoding="utf-8", errors="replace") as f: last_pos = 0 while (line := f.readline()) != "": pos = f.tell() try: rec = json.loads(line) yield rec, pos - last_pos except json.JSONDecodeError: logging.debug( "Skipping invalid JSON line in %s: %s", logfile, line ) skipped += 1 finally: last_pos = pos if skipped: logging.warning( "Skipping %s invalid JSON lines in %s", skipped, logfile ) if not dry_run: os.remove(logfile) def process_hosts( hosts: list[Host], platform: Literal["dedi", "vps"] = "dedi", dry_run: bool = False, ) -> int: """Process all hosts and return total records sent.""" total_sent = 0 batch_bytes = 0 buffer = {} batch_count = 0 for h in hosts: if not Path(h.offset_file_path).parent.exists(): Path(h.offset_file_path).parent.mkdir(parents=True, exist_ok=True) try: if not parser.parse_exim( OFFSET_FILE=h.offset_file_path, EXIM_LOG=h.exim_log_path, OUTPUT_LOG=h.parsed_log_path, ): logging.error( "Failed to parse exim 
log for host: %s", h.hostname ) continue except Exception: logging.exception( "Error parsing exim log for host: %s", h.hostname ) continue def flush(): nonlocal batch_count, total_sent, batch_bytes, buffer batch_count += 1 human_mb = f"{batch_bytes / (1024 * 1024):.2f}" count = sum(len(records) for records in buffer.values()) if dry_run: logging.info( "DRY RUN: Batch #%d: %d items, ~%s MB", batch_count, count, human_mb, ) total_sent += count else: logging.info( "Processing batch #%d: %d items, ~%s MB", batch_count, count, human_mb, ) if send_batch(buffer, platform): logging.info("Successfully sent batch #%d", batch_count) total_sent += count else: logging.error("Failed to send batch #%d", batch_count) raise RuntimeError(f"Failed to send batch #{batch_count}") buffer.clear() for host in hosts: logging.info( f"Processing host: {host.hostname} with log: {host.parsed_log_path}" ) try: has_mail = False for rec, byte_count in read_and_delete_logfile( host.parsed_log_path, dry_run=dry_run ): records = buffer.setdefault(host.hostname, []) records.append(rec) batch_bytes += byte_count has_mail = True if batch_bytes >= MAX_BATCH_SIZE_BYTES: flush() batch_bytes = 0 if not has_mail: logging.info("No mail records for host: %s", host.hostname) except Exception: logging.exception( "Error processing host %s", host.hostname ) if buffer: flush() return total_sent