Policyguard - Early-Reject Postfix Policy Spamfilter für Google & Co.

PolicyGuard — Intelligenter Postfix-Spamfilter mit automatischer Lernfunktion

PolicyGuard ist ein in Python geschriebener Policy-Daemon für den Mail Transfer Agent Postfix. Er schaltet sich in den SMTP-Dialog ein und entscheidet in Echtzeit — noch bevor eine E-Mail vollständig angenommen wird — ob eine eingehende Nachricht zugestellt, abgelehnt oder temporär zurückgestellt wird.

Wie es mit Postfix zusammenarbeitet

Postfix leitet bei jeder eingehenden Verbindung Metadaten an den Daemon weiter: Absenderadresse, Empfänger, IP-Adresse des sendenden Servers, HELO-Name und Client-Hostname. PolicyGuard wertet diese Daten aus und antwortet mit einer Postfix-Direktive — entweder dunno (durchlassen), reject (sofort ablehnen) oder defer (temporär zurückstellen). Die eigentliche E-Mail wird dabei gar nicht erst übertragen, was den Server-Ressourcenverbrauch minimal hält.

Mehrstufige Filterkaskade

Jede eingehende E-Mail durchläuft mehrere Prüfebenen in fester Reihenfolge:

Empfänger-Blocklist — bestimmte Zieladressen werden pauschal abgewiesen

Client-Hostname-Blocklist — bekannte Spam-Infrastrukturen (z. B. google-usercontent) werden direkt geblockt

Regex-Filter — verdächtige Absender-Muster, typisch für Google-Groups-Missbrauch, werden erkannt

SPF-Prüfung — der sendende Server wird gegen den SPF-Eintrag der Absender-Domain geprüft; bei fail wird die Mail abgelehnt, bei Google-HELO zusätzlich softfail und permerror

Firebase-Erkennung — Domains, die Firebase-Mailversand nutzen, werden über SPF-Includes und TXT-Records erkannt und geblockt

Hunter.io-Erkennung — Domains, die im Abuse-Verzeichnis von Hunter.io gelistet sind, werden blockiert

Unbekannte Clients — Server ohne rDNS erhalten ein temporäres defer 450, was legitime Mailserver automatisch wiederholen lässt, Spam-Schleudern jedoch oft nicht

Das lernende Kernelement: die Auto-Blocklist

Das intelligenteste Feature ist die automatische Blocklist, die sich aus dem eigenen Mail-Journal aufbaut. Das Programm liest fortlaufend die Postfix-Logs via journalctl aus und zählt pro Absender-Domain, wie viele E-Mails blockiert versus akzeptiert wurden. Überschreitet eine Domain den konfigurierbaren Schwellenwert — standardmäßig 76% Blockierrate bei mindestens 4 Mails in den letzten 50 Tagen — wird sie automatisch zur Blocklist hinzugefügt. Das System lernt also aus dem eigenen Filterverhalten, ohne manuelles Eingreifen.

Effizienz durch Caching und inkrementelles Logging

Um bei jedem eingehenden Mail nicht neu rechnen zu müssen, hält PolicyGuard sämtliche Daten im RAM: die Auto-Blocklist, SPF-Auflösungen, DNS-TXT-Abfragen und Hunter.io-Ergebnisse werden gecacht. Die Journal-Daten vergangener Tage werden in einer lokalen JSON-Datei persistiert und nur einmalig eingelesen. Während des Betriebs liest das Programm lediglich inkrementell neue Journal-Einträge ein — höchstens alle 30 Sekunden — und aktualisiert die Blocklist bei Bedarf im Hintergrund. Über eine einfache Textschnittstelle lassen sich per stats- und blocklist-Kommando jederzeit Einblicke in das Filtergeschehen abrufen.


Installation unter Proxmox Mail-Gateway

Voraussetzungen

PolicyGuard läuft als Postfix Policy-Daemon unter Proxmox Mail Gateway (PMG). Für die Installation werden Root-Rechte benötigt.

1. Python-Abhängigkeiten installieren

apt install python3-spf python3-dnspython

2. Skript installieren

Das Skript nach /usr/local/bin/ kopieren und ausführbar machen:

cp policyguard.py /usr/local/bin/policyguard.py
chmod +x /usr/local/bin/policyguard.py

3. Arbeitsverzeichnis anlegen

PolicyGuard benötigt /t/ für den Journal-Cursor und den Cache:

mkdir -p /t/
chmod 777 /t/

4. Journal-Zugriff für nobody erlauben

PolicyGuard läuft unter dem Benutzer nobody und benötigt Lesezugriff auf das systemd-Journal. Dazu eine sudo-Regel anlegen:

visudo -f /etc/sudoers.d/policyguard

Folgenden Inhalt eintragen und speichern:

nobody ALL=(root) NOPASSWD: /usr/bin/journalctl

5. PMG-Konfigurationstemplates vorbereiten

Damit Änderungen an der Postfix-Konfiguration PMG-Updates überstehen, müssen die Templates in das Override-Verzeichnis kopiert werden — sofern noch nicht geschehen:

mkdir -p /etc/pmg/templates/
cp /var/lib/pmg/templates/master.cf.in /etc/pmg/templates/
cp /var/lib/pmg/templates/main.cf.in /etc/pmg/templates/

6. Postfix konfigurieren
main.cf.in — Sender-Restriktionen erweitern

In /etc/pmg/templates/main.cf.in den Policy-Dienst zu den smtpd_sender_restrictions hinzufügen:

smtpd_sender_restrictions =
                   check_policy_service unix:private/policygrd

Bestehende Restriktionen bleiben erhalten — die neue Zeile einfach in die vorhandene Liste einreihen.

master.cf.in — Daemon-Eintrag ergänzen

Am Ende von /etc/pmg/templates/master.cf.in folgende zwei Zeilen einfügen:

policygrd unix - n n - 0 spawn
    user=nobody argv=/usr/local/bin/policyguard.py

7. Konfiguration übernehmen

Nach dem Bearbeiten der Templates die PMG-Konfiguration neu generieren und Postfix neu laden:

pmgconfig sync --restart 1

Überprüfung und Test

apt install socat
socat - UNIX-CONNECT:/var/spool/postfix/private/policygrd

blocklist domo
==== BEGIN CURRENT BLOCKLIST ====
domointelligence.com
==== END CURRENT BLOCKLIST ====
stats domo
==== TOTAL DOMAIN STATS (all days + today) ====
  domointelligence.com           accept: 4  blocked: 31  total: 35  percent blocked: 88 %
==== END TOTAL DOMAIN STATS ====
stats 60 80
==== TOTAL DOMAIN STATS (all days + today) ====
  texbangla.com                  accept: 6  blocked: 24  total: 30  percent blocked: 80 %
  dachser.com                    accept: 8  blocked: 29  total: 37  percent blocked: 78 %
  rsgsv.net                      accept: 5  blocked: 18  total: 23  percent blocked: 78 %
  premium-box.eu                 accept: 11  blocked: 34  total: 45  percent blocked: 75 %
  hipalanet.com                  accept: 6  blocked: 18  total: 24  percent blocked: 75 %
  dubaidutyfreetennischampionships.com accept: 2  blocked: 6  total: 8  percent blocked: 75 %
  bobsbonsai.com                 accept: 6  blocked: 16  total: 22  percent blocked: 72 %
  momentstudio.ca                accept: 7  blocked: 18  total: 25  percent blocked: 72 %
  table.media                    accept: 2  blocked: 5  total: 7  percent blocked: 71 %
  carvertical.com                accept: 2  blocked: 4  total: 6  percent blocked: 66 %
  andia-international.com        accept: 7  blocked: 14  total: 21  percent blocked: 66 %
  mandrillapp.com                accept: 2  blocked: 4  total: 6  percent blocked: 66 %
  o2lenses.com                   accept: 3  blocked: 6  total: 9  percent blocked: 66 %
  equinestar.net                 accept: 6  blocked: 11  total: 17  percent blocked: 64 %
==== END TOTAL DOMAIN STATS ====

 

Quellcode:

#!/usr/bin/env python3
import sys, re, dns.resolver, os, subprocess, json, requests, spf, time
from datetime import datetime, date, timedelta, timezone
from pathlib import Path
from collections import defaultdict
from typing import Iterator, Optional

# --- Whitelist / Exclude Domains ---
BLOCKLIST_BAN = {
    "amazon.com", "gmail.com", "google.com", "googlemail.com", "mail.com",
    "amazonses.com", "hotmail.com", "outlook.com", "ebay.com", "microsoft.com",
    "mailjet.com", "apple.com", "protonmail.com", "crsend.com", 
    "onmicrosoft.com", "facebook.com",
}

BLOCKLIST_SPF = {"_spf.firebasemail.com"}
BLOCKLIST_TXT_PREFIX = {"firebase="}
BLOCKLIST_SENDER = {"firebaseapp.com", "maestro.bounces.google.com"}
WHITELIST_CLIENT_SPF = {""}
# Blockierte Empfänger (exakte Adressen oder @domain)
BLOCKLIST_RECIPIENT = { "" }
BLOCKLIST_CLIENTNAME = { "googleusercontent.com" }
BLOCKLIST_REGEX = [
    r'^[a-z0-9]{1,3}\+bncB[A-Z0-9]{20,}@',
    r'^[^@]+@s\.[a-zA-Z0-9.-]+$'
]
UNTRUSTED_HELO = "google.com"
WHITELIST_HELO = ""

# --- Globale Konstanten & Regex ---
MAX_SPF_DEPTH = 10
SPF_CACHE_SIZE = 4096
# Limit für sonstige DNS/HTTP Caches um Memory Leaks zu vermeiden
GENERIC_CACHE_SIZE = 4096 

DAYS = int(os.getenv("DAYS", 50))
MIN_COUNT = int(os.getenv("MIN_COUNT", 4))
BLOCK_QUOTA = int(os.getenv("BLOCK_QUOTA", 76))
# Pre-computed Block Quota for faster division checks
BLOCK_QUOTA_FLOAT = BLOCK_QUOTA / 100.0

TRANSPORTMAP = os.getenv("TRANSPORTMAP", "/etc/pmg/transport")
JOURNAL_CURSOR_FILE = Path(os.getenv("JOURNAL_CURSOR_FILE", "/t/postfix.cursor"))
CACHE_FILE = Path(os.getenv("CACHE_FILE", "/t/postfix_cache.json"))

resolver = dns.resolver.Resolver()
resolver.timeout = 2
resolver.lifetime = 3

# --- Single Source of Truth für Journal-Filter-Tokens ---
# Diese Tokens werden an drei Stellen benötigt:
#   1. BLOCK_RE / ACCEPT_RE      -> finale Regex-Klassifizierung der Message
#   2. _PREFILTER_TOKENS         -> billiger Substring-Pre-Filter vor json.loads()
#   3. _JOURNAL_GREP             -> server-seitiger Filter via journalctl --grep
#
# Pro Eintrag definieren wir:
#   - prefilter_token: die Substring-Form, die im Logtext als Indikator reicht
#   - regex_pattern:   die genauere Form für BLOCK_RE/ACCEPT_RE (z.B. mit ":"
#                      oder Präfix "rejected: ")
# Die Trennung ist nötig, weil das Original den Pre-Filter laxer hält als die
# finale Regex (z.B. "Domain not found" als Pre-Filter, aber "rejected: Domain
# not found" als Regex). Diese Asymmetrie wird hier 1:1 erhalten.
_BLOCK_TOKENS: tuple[tuple[str, str], ...] = (
    ("proxy-reject",     r"proxy-reject:"),
    ("policyguard-500",  r"policyguard-500"),
    ("Domain not found", r"rejected: Domain not found"),
)
_ACCEPT_TOKENS: tuple[tuple[str, str], ...] = (
    ("proxy-accept", r"proxy-accept:"),
)

BLOCK_RE  = re.compile("|".join(p for _, p in _BLOCK_TOKENS),  re.IGNORECASE)
ACCEPT_RE = re.compile("|".join(p for _, p in _ACCEPT_TOKENS), re.IGNORECASE)
BLOCKLIST_REGEX_COMPILED = [re.compile(p) for p in BLOCKLIST_REGEX]

# Throttle für update_today_incremental (Sekunden zwischen journalctl-Aufrufen)
INCREMENTAL_UPDATE_INTERVAL = int(os.getenv("INCREMENTAL_UPDATE_INTERVAL", 30))

# Log-Rate-Limit: maximal so viele Log-Zeilen pro Sekunde, danach gedroppt
LOG_RATE_LIMIT_PER_SEC = int(os.getenv("LOG_RATE_LIMIT_PER_SEC", 200))

# Erst-Import: nach so vielen verarbeiteten Tagen wird der Cache zwischengespeichert
INITIAL_IMPORT_FLUSH_EVERY_DAYS = int(os.getenv("INITIAL_IMPORT_FLUSH_EVERY_DAYS", 5))

# --- State ---
CACHE: dict = {}
STATS: dict = {}
SPF_CACHE: dict = {}
SPF_BLOCK_CACHE: dict[str, bool] = {}
HUNTER_CACHE: dict[str, bool] = {}
TXT_PREFIX_CACHE: dict[str, bool] = {}

_blocklist_cache: set[str] = set()
_blocklist_dirty: bool = True
_last_incremental_update: float = 0.0

# Log-Rate-Limit-State
_log_window_start: float = 0.0
_log_window_count: int = 0
_log_dropped_count: int = 0

# --- Helfer für Pre-Computation ---
def base_domain(dom: str) -> str:
    parts = dom.split(".")
    if len(parts) < 2:
        return dom
    tld = parts[-1]
    sld = parts[-2]
    if (tld in {"uk", "au", "nz", "za", "br", "jp"} and 
        sld in {"co", "com", "net", "org", "gov", "ac", "edu"} and 
        len(parts) >= 3):
        return ".".join(parts[-3:])
    return ".".join(parts[-2:])

def normalize_domain(dom: str) -> str:
    return base_domain(dom.lower().rstrip("."))

# Vorab normalisierte Sets für O(1) Lookups
BLOCKLIST_BAN_NORMALIZED = {normalize_domain(d) for d in BLOCKLIST_BAN}

def domain_matches_list(domain: str, target_set: set[str]) -> bool:
    """O(1) Check für exakte Matches oder Subdomains statt O(n) String-Endswith."""
    if domain in target_set:
        return True
    parts = domain.split('.')
    for i in range(1, len(parts)):
        if '.'.join(parts[i:]) in target_set:
            return True
    return False

def ensure_cursor_dir():
    JOURNAL_CURSOR_FILE.parent.mkdir(parents=True, exist_ok=True)

def load_transport_domains() -> set[str]:
    domains = set()
    transport_path = Path(TRANSPORTMAP)
    if not transport_path.is_file():
        return domains
    with open(transport_path, "r", errors="ignore") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key = line.split()[0].strip("[]").lower().rstrip(".")
            domains.add(normalize_domain(key))
    return domains

# ---------------------------------------------------------------------------
# Print Stats (Optimized)
# ---------------------------------------------------------------------------

def stats(search_term: str = "", min_pct: int = 0, max_pct: int = 100, only_blocked: bool = False):
    transport_excludes = load_transport_domains()
    all_excludes = BLOCKLIST_BAN_NORMALIZED | transport_excludes

    # defaultdict ist signifikant schneller
    agg = defaultdict(lambda: {"total": 0, "blocked": 0, "accepted": 0})
    
    for day_data in [*CACHE.values(), STATS]:
        for dom, s in day_data.items():
            if dom in all_excludes or dom.endswith(".de"):
                continue
            e = agg[dom]
            e["total"]    += s["total"]
            e["blocked"]  += s["blocked"]
            e["accepted"] += s["accepted"]

    filtered = []
    for dom, s in agg.items():
        if s["total"] == 0 or s["blocked"] == 0:
            continue
        if search_term and search_term not in dom:
            continue
            
        pct = int(s["blocked"] / s["total"] * 100)
        if min_pct <= pct <= max_pct:
            filtered.append((dom, s, pct))

    if not filtered:
        print("No stats available.")
        sys.stdout.flush()
        return

    # Sortieren greift direkt auf den vorberechneten pct-Wert (x[2]) zu
    filtered.sort(key=lambda x: (x[2], x[1]["total"]), reverse=True)
    
    print("==== TOTAL DOMAIN STATS (all days + today) ====")
    for dom, s, pct in filtered:
        meets_block_criteria = (s["total"] >= MIN_COUNT) and (pct >= BLOCK_QUOTA)

        if only_blocked and not meets_block_criteria:
            continue

        marker = f">>{pct}%<<" if meets_block_criteria else f"  {pct}%  "
        print(f"{dom:<40} accept: {s['accepted']:<5} blocked: {s['blocked']:<5} total: {s['total']:<5}  {marker:<10}")

    print("==== END TOTAL DOMAIN STATS ====")
    sys.stdout.flush()

# ---------------------------------------------------------------------------
# Logging & Extraction
# ---------------------------------------------------------------------------
FROM_REGEX = re.compile(r"from=<[^>]*@([^> ,]+)>")

def extract_domain(message: str) -> str:
    m = FROM_REGEX.search(message)
    if m:
        return normalize_domain(m.group(1))
    return ""

def log(msg: str):
    """Schreibt nach stderr mit Rate-Limit, um Pipe-Blocking zu vermeiden."""
    global _log_window_start, _log_window_count, _log_dropped_count
    now = time.monotonic()
    if now - _log_window_start >= 1.0:
        # Neue Sekunde: ggf. Dropped-Counter ausgeben, dann Reset
        if _log_dropped_count > 0:
            try:
                sys.stderr.write(f"spf-policy: [rate-limit] {_log_dropped_count} log lines dropped\n")
                sys.stderr.flush()
            except Exception:
                pass
            _log_dropped_count = 0
        _log_window_start = now
        _log_window_count = 0

    if _log_window_count >= LOG_RATE_LIMIT_PER_SEC:
        _log_dropped_count += 1
        return

    _log_window_count += 1
    try:
        sys.stderr.write(f"spf-policy: {msg}\n")
        sys.stderr.flush()
    except Exception:
        pass

# ---------------------------------------------------------------------------
# Cache persistence
# ---------------------------------------------------------------------------

def save_cache():
    CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
    today_str = date.today().isoformat()
    cutoff = (date.today() - timedelta(days=DAYS)).isoformat()

    pruned = {
        day: data for day, data in CACHE.items()
        if day != today_str and day >= cutoff
    }

    tmp = CACHE_FILE.with_suffix(".tmp")
    try:
        with open(tmp, "w") as f:
            json.dump(pruned, f)
        tmp.replace(CACHE_FILE)
    except Exception as e:
        log(f"save_cache error: {e}")

def load_cache():
    global CACHE
    if not CACHE_FILE.exists():
        return
    try:
        with open(CACHE_FILE) as f:
            data = json.load(f)
        today_str = date.today().isoformat()
        cutoff = (date.today() - timedelta(days=DAYS)).isoformat()
        CACHE = {
            day: stats for day, stats in data.items()
            if day != today_str and day >= cutoff
        }
        log(f"load_cache: {len(CACHE)} days loaded from {CACHE_FILE}")
    except Exception as e:
        log(f"load_cache error: {e}")

# ---------------------------------------------------------------------------
# Log reading

# Pre-Filter-Tokens und --grep-Pattern werden zentral aus _BLOCK_TOKENS /
# _ACCEPT_TOKENS oben abgeleitet -> keine drei Stellen mehr synchron zu halten.
_PREFILTER_TOKENS: tuple[str, ...] = tuple(
    t for t, _ in (*_BLOCK_TOKENS, *_ACCEPT_TOKENS)
)

# --grep-Pattern (regex, ERE) - läuft serverseitig in journalctl.
# Reduziert die transferierte Datenmenge massiv bei stark verrauschten Logs.
_JOURNAL_GREP: str = "|".join(_PREFILTER_TOKENS)


def _journalctl_cmd(extra_args: list[str]) -> list[str]:
    """Baut den journalctl-Aufruf. Setzt voraus, dass der User Mitglied der
    Gruppe systemd-journal ist (siehe Hinweis am Dateianfang). Falls sudo
    zwingend benötigt wird, kann man hier ['sudo', '-n', ...] voranstellen."""
    return [
        "sudo", "journalctl", "--no-pager", "-u", "postfix", "-o", "json",
        "--output-fields=MESSAGE,__REALTIME_TIMESTAMP",
        "--grep", _JOURNAL_GREP,
    ] + extra_args


def _iter_journal_events(extra_args: list[str]) -> Iterator[tuple[str, str, bool, bool]]:
    """Generator über (line_date, domain, is_blocked, is_accepted) Tupel.
    Streamt journalctl zeilenweise und hält keinen großen Buffer im RAM.
    Schluckt Fehler und loggt einmalig.
    """
    # Lokale Bindings für Performance im Hot-Path
    block_search = BLOCK_RE.search
    accept_search = ACCEPT_RE.search
    json_loads = json.loads
    fromtimestamp = datetime.fromtimestamp
    _extract_domain = extract_domain
    prefilter_tokens = _PREFILTER_TOKENS

    proc = None
    try:
        proc = subprocess.Popen(
            _journalctl_cmd(extra_args),
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            bufsize=1 << 16,  # 64 KiB Pipe-Buffer
        )

        for raw in proc.stdout:
            # Schneller Pre-Filter VOR json.loads (Substring ist ~50x schneller).
            # journalctl --grep filtert serverseitig, aber falls --grep mal
            # entfernt wird, bleibt diese Zeile als Sicherheitsnetz.
            if not any(tok in raw for tok in prefilter_tokens):
                continue
            line = raw.strip()
            if not line:
                continue
            try:
                entry = json_loads(line)
            except json.JSONDecodeError:
                continue

            message = entry.get("MESSAGE", "")
            if not isinstance(message, str):
                continue

            is_blocked = bool(block_search(message))
            is_accepted = bool(accept_search(message)) and not is_blocked
            if not is_blocked and not is_accepted:
                continue

            dom = _extract_domain(message)
            if not dom:
                continue

            ts = entry.get("__REALTIME_TIMESTAMP")
            if not ts:
                continue
            try:
                # UTC-bewusst: Tagesgrenzen sind sonst zeitzonenabhängig.
                dt = fromtimestamp(int(ts) / 1_000_000, tz=timezone.utc)
                line_date = dt.date().isoformat()
            except (ValueError, TypeError, OSError):
                continue

            yield line_date, dom, is_blocked, is_accepted

        proc.stdout.close()
        proc.wait(timeout=120)
    except subprocess.TimeoutExpired:
        if proc is not None:
            proc.kill()
        log("_iter_journal_events: journalctl timed out")
    except Exception as e:
        log(f"_iter_journal_events error: {e}")
    finally:
        if proc is not None and proc.poll() is None:
            try:
                proc.kill()
            except Exception:
                pass


def _iter_journal_events_after_cursor() -> Iterator[tuple[str, str, bool, bool]]:
    """Wie _iter_journal_events, aber mit Cursor-File. Wird für inkrementelle
    Updates während des laufenden Betriebs genutzt — typischerweise sehr wenig
    Output, daher kein Streaming-Overhead nötig."""
    return _iter_journal_events([f"--cursor-file={JOURNAL_CURSOR_FILE}"])


def _bump(day_dict: dict, dom: str, is_blocked: bool, is_accepted: bool):
    """Inkrementiert total/blocked/accepted für eine Domain in einem Tagesdict."""
    e = day_dict.get(dom)
    if e is None:
        e = {"total": 0, "blocked": 0, "accepted": 0}
        day_dict[dom] = e
    e["total"] += 1
    if is_blocked:
        e["blocked"] += 1
    if is_accepted:
        e["accepted"] += 1


# ---------------------------------------------------------------------------
# Cache initialization
# ---------------------------------------------------------------------------

def initialize_cache():
    """RAM-schonender Erst-Import:
       - Ein einziger journalctl-Stream für alle fehlenden Tage gemeinsam.
       - Tagesweises Flushen in CACHE: sobald der Datums-Wechsel erkannt wird,
         ist der vorherige Tag fertig und kann persistiert werden — der RAM
         hält immer nur einen aktiven Tag.
       - Periodisches save_cache() nach N fertigen Tagen, damit ein Crash
         während Erst-Import nicht alle Fortschritte verliert.
    """
    global CACHE, STATS, _blocklist_dirty, _last_incremental_update

    load_cache()
    today = date.today()
    today_str = today.isoformat()

    needed_days = {
        (today - timedelta(days=i)).isoformat()
        for i in range(1, DAYS + 1)
    } - set(CACHE.keys())

    if needed_days:
        oldest = min(needed_days)
        log(f"initialize_cache: need {len(needed_days)} days, streaming from {oldest}")

        # Streaming-Strategie: aktuellen Tag im RAM halten, beim Wechsel flushen.
        current_day: Optional[str] = None
        current_day_data: dict = {}
        days_done_since_flush = 0

        # Tag-Set für O(1) Lookup beim Filtern (wir streamen ggf. auch
        # Tage, die schon im CACHE sind — die werden hier verworfen).
        needed_days_set = needed_days

        # Wir nehmen einen großen Stream "since oldest" — journalctl wird die
        # Tage in chronologischer Reihenfolge liefern, daher reicht ein
        # einfacher Tageswechsel-Detektor.
        for line_date, dom, is_blocked, is_accepted in _iter_journal_events(["--since", oldest]):
            # Tage außerhalb des needed-Fensters überspringen
            if line_date == today_str:
                # Heutige Daten kommen weiter unten via STATS rein
                continue
            if line_date not in needed_days_set:
                continue

            if current_day is None:
                current_day = line_date
                current_day_data = {}
            elif line_date != current_day:
                # Tag fertig — flushen
                CACHE[current_day] = current_day_data
                days_done_since_flush += 1
                current_day_data = {}
                current_day = line_date

                if days_done_since_flush >= INITIAL_IMPORT_FLUSH_EVERY_DAYS:
                    save_cache()
                    days_done_since_flush = 0

            _bump(current_day_data, dom, is_blocked, is_accepted)

        # Letzten Tag auch flushen
        if current_day is not None:
            CACHE[current_day] = current_day_data
            current_day_data = {}  # RAM freigeben

        # Tage ohne Events trotzdem als leer markieren, sonst werden sie beim
        # nächsten Start wieder importiert
        for d in needed_days_set:
            if d not in CACHE:
                CACHE[d] = {}

        save_cache()
    else:
        log("initialize_cache: all historical days present in cache file")

    # Heutige Stats separat einlesen (kleiner Datensatz)
    log("initialize_cache: loading today's stats from journal")
    today_data: dict = {}
    for line_date, dom, is_blocked, is_accepted in _iter_journal_events(["--since", today_str]):
        if line_date != today_str:
            continue
        _bump(today_data, dom, is_blocked, is_accepted)
    STATS = today_data

    ensure_cursor_dir()
    # Cursor-File initialisieren, indem wir einmal "leer" durchlaufen
    # (oder: alle bereits gelesenen Events konsumieren)
    for _ in _iter_journal_events_after_cursor():
        pass

    _blocklist_dirty = True
    _last_incremental_update = time.monotonic()


def update_today_incremental():
    """Inkrementelles Update — wird bei jedem Request via current_blocklist()
    aufgerufen, ist aber per Throttle auf INCREMENTAL_UPDATE_INTERVAL
    Sekunden begrenzt, um nicht bei jeder Mail einen subprocess zu spawnen.
    """
    global STATS, CACHE, _blocklist_dirty, _last_incremental_update

    now = time.monotonic()
    if now - _last_incremental_update < INCREMENTAL_UPDATE_INTERVAL:
        return
    _last_incremental_update = now

    ensure_cursor_dir()

    today_str = date.today().isoformat()
    cache_updated = False
    new_stats_seen = False

    for line_date, dom, is_blocked, is_accepted in _iter_journal_events_after_cursor():
        if line_date == today_str:
            _bump(STATS, dom, is_blocked, is_accepted)
            new_stats_seen = True
        else:
            # Vergangener Tag — nur eintragen, wenn noch nicht im CACHE
            # (verhindert Doppelzählung bei Tagesübergang).
            if line_date not in CACHE:
                CACHE[line_date] = {}
                cache_updated = True
                _bump(CACHE[line_date], dom, is_blocked, is_accepted)
            elif not CACHE[line_date]:
                _bump(CACHE[line_date], dom, is_blocked, is_accepted)
                cache_updated = True

    if cache_updated:
        save_cache()

    if new_stats_seen or cache_updated:
        _blocklist_dirty = True

# ---------------------------------------------------------------------------
# Candidate detection (Optimized)
# ---------------------------------------------------------------------------

def find_candidates(cache: dict, stats: dict) -> list[tuple[str, int]]:
    total = defaultdict(int)
    blocked = defaultdict(int)

    for day_data in [*cache.values(), stats]:
        for dom, s in day_data.items():
            total[dom] += s["total"]
            blocked[dom] += s["blocked"]

    candidates = [
        (dom, tot)
        for dom, tot in total.items()
        if tot >= MIN_COUNT and (blocked[dom] / tot) > BLOCK_QUOTA_FLOAT
    ]

    return sorted(candidates, key=lambda x: x[1], reverse=True)

# ---------------------------------------------------------------------------
# Autoblocklist
# ---------------------------------------------------------------------------

def current_blocklist() -> set[str]:
    global _blocklist_cache, _blocklist_dirty

    update_today_incremental()

    if not _blocklist_dirty:
        return _blocklist_cache

    transport_excludes = load_transport_domains()
    all_excludes = BLOCKLIST_BAN_NORMALIZED | transport_excludes

    candidates = find_candidates(CACHE, STATS)
    
    # Als O(1) Set statt Liste speichern
    _blocklist_cache = {
        dom for dom, _ in candidates
        if not dom.endswith(".de") and dom not in all_excludes
    }

    _blocklist_dirty = False
    return _blocklist_cache

# ---------------------------------------------------------------------------
# Hunter IO (mit RAM Cache)
# ---------------------------------------------------------------------------

def check_for_hunter_io(url: str) -> bool:
    global HUNTER_CACHE
    if url in HUNTER_CACHE:
        return HUNTER_CACHE[url]
        
    headers = {"User-Agent": "Mozilla/5.0 (compatible; Python-Checker/1.0)"}
    abusestring = "Diese E-Mail-Adresse ist vor Spambots geschützt! Zur Anzeige muss JavaScript eingeschaltet sein."
    result = False

    try:
        response = requests.get(f'https://{url}', headers=headers, timeout=3, allow_redirects=False)
        response.raise_for_status()
        result = abusestring in response.text
    except Exception:
        pass

    if len(HUNTER_CACHE) < GENERIC_CACHE_SIZE:
        HUNTER_CACHE[url] = result
    return result

# ---------------------------------------------------------------------------
# SPF & TXT Check (mit RAM Cache)
# ---------------------------------------------------------------------------

def txt_contains_prefix(domain: str) -> bool:
    global TXT_PREFIX_CACHE
    if domain in TXT_PREFIX_CACHE:
        return TXT_PREFIX_CACHE[domain]
        
    result = False
    try:
        answers = resolver.resolve(domain, "TXT")
        for r in answers:
            txt = "".join([s.decode() for s in r.strings])
            if any(txt.startswith(prefix) for prefix in BLOCKLIST_TXT_PREFIX):
                result = True
                break
    except Exception:
        pass
        
    if len(TXT_PREFIX_CACHE) < GENERIC_CACHE_SIZE:
        TXT_PREFIX_CACHE[domain] = result
    return result

def get_spf(domain: str) -> str | None:
    global SPF_CACHE
    if domain in SPF_CACHE:
        return SPF_CACHE[domain]
        
    result = None
    try:
        answers = resolver.resolve(domain, "TXT")
        for r in answers:
            txt = "".join([s.decode() for s in r.strings])
            if txt.startswith("v=spf1"):
                result = txt
                break
    except Exception:
        pass
        
    if len(SPF_CACHE) < SPF_CACHE_SIZE:
        SPF_CACHE[domain] = result
    return result

def get_all_spf_includes(domain: str, depth: int = 0, visited: set | None = None) -> set:
    if visited is None:
        visited = set()
    includes = set()

    if depth > MAX_SPF_DEPTH or domain in visited:
        return includes

    visited.add(domain)
    spf_record = get_spf(domain)
    if not spf_record:
        return includes

    for part in spf_record.split():
        if part.startswith("include:"):
            include = part.split(":", 1)[1].lower()
            includes.add(include)
            includes.update(get_all_spf_includes(include, depth + 1, visited))

    return includes

def spf_contains_block(domain: str) -> bool:
    """Cached: das Set-Building ist teuer, aber Domain-stabil."""
    cached = SPF_BLOCK_CACHE.get(domain)
    if cached is not None:
        return cached

    includes = get_all_spf_includes(domain)
    blocklist_lower = {b.lower() for b in BLOCKLIST_SPF}
    result = bool(includes & blocklist_lower)

    if len(SPF_BLOCK_CACHE) < GENERIC_CACHE_SIZE:
        SPF_BLOCK_CACHE[domain] = result
    return result

def sender_matches_regex(sender: str) -> tuple[bool, str | None]:
    for regex in BLOCKLIST_REGEX_COMPILED:
        if regex.match(sender):
            return True, regex.pattern
    return False, None

# ---------------------------------------------------------------------------
# Request handling (Optimized O(1) lookups)
# ---------------------------------------------------------------------------

def parse_search_args(arg: str) -> tuple[str, int, int, bool]:
    """Parsed `stats`-Argumente.
    Konvention:
      - 0 Zahlen        -> min=0,    max=100
      - 1 Zahl  N       -> min=N,    max=100   (»mindestens N%«)
      - 2+ Zahlen A B   -> min=min(A,B), max=max(A,B)
      - 'b' / 'blocked' -> only_blocked = True
    """
    parts = arg.strip().split()
    search_term = ""
    numbers: list[int] = []
    only_blocked = False
    for p in parts:
        p_lower = p.lower()
        if p_lower in ("-b", "blocked"):
            only_blocked = True
        elif p.isdigit() and 0 <= int(p) <= 100:
            numbers.append(int(p))
        else:
            search_term = p_lower

    if len(numbers) == 0:
        min_pct, max_pct = 0, 100
    elif len(numbers) == 1:
        min_pct, max_pct = numbers[0], 100
    else:
        min_pct, max_pct = min(numbers[0], numbers[1]), max(numbers[0], numbers[1])

    return search_term, min_pct, max_pct, only_blocked

def handle_request(attrs: dict) -> str:
    sender = attrs.get("sender", "")
    recipient   = attrs.get("recipient", "").lower()   # NEU
    client_name = attrs.get("client_name", "").lower()
    helo_name = attrs.get("helo_name", "").lower()
    client_address = attrs.get("client_address", "")

    # HELO Whitelist
    if helo_name == WHITELIST_HELO or helo_name.endswith("." + WHITELIST_HELO):
        log(f"whitelist sender by whitelist")
        return "dunno"
                
    if recipient:
        # BLOCKLIST_RECIPIENT kann Volladdressen ("Diese E-Mail-Adresse ist vor Spambots geschützt! Zur Anzeige muss JavaScript eingeschaltet sein.") oder
        # Domains ("@example.com" oder "example.com") enthalten. Wir matchen:
        #  - exakte Adresse
        #  - "@domain"-Form (Legacy)
        #  - Subdomain-Match auf den reinen Domainteil (konsistent mit Sender-Logik)
        if recipient in BLOCKLIST_RECIPIENT:
            log(f"blocked recipient: {recipient}")
            return "reject reject for policy reasons [policyguard-400] [non-existent-recipient]"
        if "@" in recipient:
            rcpt_domain_only = recipient.split("@", 1)[1]
            if ("@" + rcpt_domain_only) in BLOCKLIST_RECIPIENT or domain_matches_list(rcpt_domain_only, BLOCKLIST_RECIPIENT):
                log(f"blocked recipient by domain: {recipient}")
                return "reject reject for policy reasons [policyguard-400] [non-existent-recipient]"

    if "@" not in sender:
        return "dunno"

    domain = sender.split("@", 1)[1].lower()
    
    try:
        # Block by Clientname (O(1) Set Lookup)
        if domain_matches_list(client_name, BLOCKLIST_CLIENTNAME):
            log(f"blocked client hostname: {client_name}")
            return "reject reject for policy reasons [policyguard-400] [blacklist-client]"
            
        # Block Google Groups
        if helo_name == UNTRUSTED_HELO or helo_name.endswith("." + UNTRUSTED_HELO):
            matched, pattern = sender_matches_regex(sender)
            if matched:
                log(f"blocked sender by regex ({pattern}): {sender}")
                return "reject reject for policy reasons [policyguard-400] [gg]"
                
        # Block history bad domains (O(1) Set Lookup instead of loop)
        bl_cache = current_blocklist()
        if domain_matches_list(domain, bl_cache):
            log(f"blocked sender domain via autoblocklist {domain}")
            return "reject reject for policy reasons [policyguard-500] [abl]"
            
        # Block Firebase by Domain Ending (O(1) Set Lookup)
        if domain_matches_list(domain, BLOCKLIST_SENDER):
            log(f"blocked sender domain via blocklist {domain}")
            return "reject reject for policy reasons [policyguard-500] [blacklist-sender]"

        # Echte SPF-Auswertung via pyspf
        if client_address:
            spf_result, spf_explanation = spf.check2(i=client_address, s=sender, h=helo_name)
            if not domain_matches_list(client_name, WHITELIST_CLIENT_SPF):
               if spf_result == 'fail':
                   log(f"blocked sender {sender} due to SPF Fail ({spf_explanation})")
                   return "reject reject for policy reasons [policyguard-400] [spf-fail]"
            if (helo_name == UNTRUSTED_HELO or helo_name.endswith("." + UNTRUSTED_HELO)):
                if spf_result == 'permerror':
                    log(f"blocked sender {sender} due to SPF PermError ({spf_explanation})")
                    return "reject reject for policy reasons [policyguard-400] [spf-permerror]"
                if spf_result == 'softfail':
                    log(f"blocked Google HELO {helo_name} for sender {sender} due to SPF SoftFail ({spf_explanation})")
                    return f"reject reject for policy reasons [policyguard-400] [g-spf-sfail] SPF-Softfail: E-Mail Server {helo_name} ist kein authorisierter Sender für Domain {domain}. Bitte passen Sie ihren SPF Eintrag an."
                #if spf_result == 'none':
                #    log(f"blocked HELO {helo_name} without SPF for sender {domain}")
                #    return f"reject reject for policy reasons [policyguard-500] [g-spf-none] Kein SPF-Eintrag für Domain {domain} gesetzt. Zusendungen über Googles Spamschleuder-Mailserver benötigen zwingend einen SPF-Record."
                    
        # Block Firebase by SPF
        if spf_contains_block(domain):
            log(f"blocked sender domain via spf {domain}")
            return "reject reject for policy reasons [policyguard-500] [fb-txt-spf-badentry]"
            
        # Block by TXT prefix (Cached)
        if txt_contains_prefix(domain):
            log(f"blocked sender domain via TXT prefix {domain}")
            return "reject reject for policy reasons [policyguard-500] [fb-txt-prefix]"
            
        # Block hunter.io (Cached)
        if "check" in sender and check_for_hunter_io(domain):
            log(f"blocked sender domain via hunter.io {domain}")
            return "reject reject for policy reasons [policyguard-500] [hunter]"

        if client_name == "unknown":
            log(f"deferred unknown client: {client_address}")
            return "defer 450 4.7.1 Client host rejected: temporary defer for unknown hostname [policyguard-500]"
            
    except Exception as e:
        log(f"error checking {domain}: {e}")

    return "dunno"

def main():
    initialize_cache()
    while True:
        attrs = {}
        while True:
            line = sys.stdin.readline()
            if line == "":
                return
            line = line.strip()
            if not line:
                break
                
            if "=" in line:
                k, v = line.split("=", 1)
                attrs[k] = v
                
            # Interaktive Befehle direkt während des Einlesens verarbeiten (Original-Verhalten)
            if line.startswith("blocklist"):
                parts = line.split(None, 1)
                search_term = parts[1].strip().lower() if len(parts) > 1 else ""
                bl = sorted(current_blocklist())
                if search_term:
                    bl = [d for d in bl if search_term in d]
                print("==== BEGIN CURRENT BLOCKLIST ====")
                print("\n".join(bl))
                print("==== END CURRENT BLOCKLIST ====")
                sys.stdout.flush()
                
            if line == "spfcache":
                print("==== SPF CACHE ENTRY ====")
                print("\n".join(sorted(SPF_CACHE)))
                print("==== END SPF CACHE ENTRY ====")
                sys.stdout.flush()
                
            if line.startswith("stats"):
                parts = line.split(None, 1)
                search_term, min_pct, max_pct, only_blocked = parse_search_args(parts[1]) if len(parts) > 1 else ("", 0, 100, False)
                stats(search_term, min_pct, max_pct, only_blocked)
                
        if not attrs:
            continue

        action = handle_request(attrs)
        print(f"action={action}\n")
        sys.stdout.flush()

if __name__ == "__main__":
    main()