PolicyGuard — Intelligenter Postfix-Spamfilter mit automatischer Lernfunktion
PolicyGuard ist ein in Python geschriebener Policy-Daemon für den Mail Transfer Agent Postfix. Er schaltet sich in den SMTP-Dialog ein und entscheidet in Echtzeit — noch bevor eine E-Mail vollständig angenommen wird — ob eine eingehende Nachricht zugestellt, abgelehnt oder temporär zurückgestellt wird.
Wie es mit Postfix zusammenarbeitet
Postfix leitet bei jeder eingehenden Verbindung Metadaten an den Daemon weiter: Absenderadresse, Empfänger, IP-Adresse des sendenden Servers, HELO-Name und Client-Hostname. PolicyGuard wertet diese Daten aus und antwortet mit einer Postfix-Direktive — entweder dunno (durchlassen), reject (sofort ablehnen) oder defer (temporär zurückstellen). Die eigentliche E-Mail wird dabei gar nicht erst übertragen, was den Server-Ressourcenverbrauch minimal hält.
Mehrstufige Filterkaskade
Jede eingehende E-Mail durchläuft mehrere Prüfebenen in fester Reihenfolge:
Empfänger-Blocklist — bestimmte Zieladressen werden pauschal abgewiesen
Client-Hostname-Blocklist — bekannte Spam-Infrastrukturen (z. B. google-usercontent) werden direkt geblockt
Regex-Filter — verdächtige Absender-Muster, typisch für Google-Groups-Missbrauch, werden erkannt
SPF-Prüfung — der sendende Server wird gegen den SPF-Eintrag der Absender-Domain geprüft; bei fail wird die Mail abgelehnt, bei Google-HELO zusätzlich softfail und permerror
Firebase-Erkennung — Domains, die Firebase-Mailversand nutzen, werden über SPF-Includes und TXT-Records erkannt und geblockt
Hunter.io-Erkennung — Domains, die im Abuse-Verzeichnis von Hunter.io gelistet sind, werden blockiert
Unbekannte Clients — Server ohne rDNS erhalten ein temporäres defer 450, was legitime Mailserver automatisch wiederholen lässt, Spam-Schleudern jedoch oft nicht
Das lernende Kernelement: die Auto-Blocklist
Das intelligenteste Feature ist die automatische Blocklist, die sich aus dem eigenen Mail-Journal aufbaut. Das Programm liest fortlaufend die Postfix-Logs via journalctl aus und zählt pro Absender-Domain, wie viele E-Mails blockiert versus akzeptiert wurden. Überschreitet eine Domain den konfigurierbaren Schwellenwert — standardmäßig 76% Blockierrate bei mindestens 4 Mails in den letzten 50 Tagen — wird sie automatisch zur Blocklist hinzugefügt. Das System lernt also aus dem eigenen Filterverhalten, ohne manuelles Eingreifen.
Effizienz durch Caching und inkrementelles Logging
Um bei jedem eingehenden Mail nicht neu rechnen zu müssen, hält PolicyGuard sämtliche Daten im RAM: die Auto-Blocklist, SPF-Auflösungen, DNS-TXT-Abfragen und Hunter.io-Ergebnisse werden gecacht. Die Journal-Daten vergangener Tage werden in einer lokalen JSON-Datei persistiert und nur einmalig eingelesen. Während des Betriebs liest das Programm lediglich inkrementell neue Journal-Einträge ein — höchstens alle 30 Sekunden — und aktualisiert die Blocklist bei Bedarf im Hintergrund. Über eine einfache Textschnittstelle lassen sich per stats- und blocklist-Kommando jederzeit Einblicke in das Filtergeschehen abrufen.
Installation unter Proxmox Mail-Gateway
Voraussetzungen
PolicyGuard läuft als Postfix Policy-Daemon unter Proxmox Mail Gateway (PMG). Für die Installation werden Root-Rechte benötigt.
1. Python-Abhängigkeiten installieren
apt install python3-spf python3-dnspython2. Skript installieren
Das Skript nach /usr/local/bin/ kopieren und ausführbar machen:
cp policyguard.py /usr/local/bin/policyguard.py
chmod +x /usr/local/bin/policyguard.py3. Arbeitsverzeichnis anlegen
PolicyGuard benötigt /t/ für den Journal-Cursor und den Cache:
mkdir -p /t/
chmod 777 /t/4. Journal-Zugriff für nobody erlauben
PolicyGuard läuft unter dem Benutzer nobody und benötigt Lesezugriff auf das systemd-Journal. Dazu eine sudo-Regel anlegen:
visudo -f /etc/sudoers.d/policyguardFolgenden Inhalt eintragen und speichern:
nobody ALL=(root) NOPASSWD: /usr/bin/journalctl5. PMG-Konfigurationstemplates vorbereiten
Damit Änderungen an der Postfix-Konfiguration PMG-Updates überstehen, müssen die Templates in das Override-Verzeichnis kopiert werden — sofern noch nicht geschehen:
mkdir -p /etc/pmg/templates/
cp /var/lib/pmg/templates/master.cf.in /etc/pmg/templates/
cp /var/lib/pmg/templates/main.cf.in /etc/pmg/templates/6. Postfix konfigurieren
main.cf.in — Sender-Restriktionen erweitern
In /etc/pmg/templates/main.cf.in den Policy-Dienst zu den smtpd_sender_restrictions hinzufügen:
smtpd_sender_restrictions =
check_policy_service unix:private/policygrdBestehende Restriktionen bleiben erhalten — die neue Zeile einfach in die vorhandene Liste einreihen.
master.cf.in — Daemon-Eintrag ergänzen
Am Ende von /etc/pmg/templates/master.cf.in folgende zwei Zeilen einfügen:
policygrd unix - n n - 0 spawn
user=nobody argv=/usr/local/bin/policyguard.py7. Konfiguration übernehmen
Nach dem Bearbeiten der Templates die PMG-Konfiguration neu generieren und Postfix neu laden:
pmgconfig sync --restart 1Überprüfung und Test
apt install socat
socat - UNIX-CONNECT:/var/spool/postfix/private/policygrd
blocklist domo
==== BEGIN CURRENT BLOCKLIST ====
domointelligence.com
==== END CURRENT BLOCKLIST ====
stats domo
==== TOTAL DOMAIN STATS (all days + today) ====
domointelligence.com accept: 4 blocked: 31 total: 35 percent blocked: 88 %
==== END TOTAL DOMAIN STATS ====
stats 60 80
==== TOTAL DOMAIN STATS (all days + today) ====
texbangla.com accept: 6 blocked: 24 total: 30 percent blocked: 80 %
dachser.com accept: 8 blocked: 29 total: 37 percent blocked: 78 %
rsgsv.net accept: 5 blocked: 18 total: 23 percent blocked: 78 %
premium-box.eu accept: 11 blocked: 34 total: 45 percent blocked: 75 %
hipalanet.com accept: 6 blocked: 18 total: 24 percent blocked: 75 %
dubaidutyfreetennischampionships.com accept: 2 blocked: 6 total: 8 percent blocked: 75 %
bobsbonsai.com accept: 6 blocked: 16 total: 22 percent blocked: 72 %
momentstudio.ca accept: 7 blocked: 18 total: 25 percent blocked: 72 %
table.media accept: 2 blocked: 5 total: 7 percent blocked: 71 %
carvertical.com accept: 2 blocked: 4 total: 6 percent blocked: 66 %
andia-international.com accept: 7 blocked: 14 total: 21 percent blocked: 66 %
mandrillapp.com accept: 2 blocked: 4 total: 6 percent blocked: 66 %
o2lenses.com accept: 3 blocked: 6 total: 9 percent blocked: 66 %
equinestar.net accept: 6 blocked: 11 total: 17 percent blocked: 64 %
==== END TOTAL DOMAIN STATS ====
Quellcode:
#!/usr/bin/env python3
import sys, re, dns.resolver, os, subprocess, json, requests, spf, time
from datetime import datetime, date, timedelta, timezone
from pathlib import Path
from collections import defaultdict
from typing import Iterator, Optional
# --- Whitelist / Exclude Domains ---
BLOCKLIST_BAN = {
"amazon.com", "gmail.com", "google.com", "googlemail.com", "mail.com",
"amazonses.com", "hotmail.com", "outlook.com", "ebay.com", "microsoft.com",
"mailjet.com", "apple.com", "protonmail.com", "crsend.com",
"onmicrosoft.com", "facebook.com",
}
BLOCKLIST_SPF = {"_spf.firebasemail.com"}
BLOCKLIST_TXT_PREFIX = {"firebase="}
BLOCKLIST_SENDER = {"firebaseapp.com", "maestro.bounces.google.com"}
WHITELIST_CLIENT_SPF = {""}
# Blockierte Empfänger (exakte Adressen oder @domain)
BLOCKLIST_RECIPIENT = { "" }
BLOCKLIST_CLIENTNAME = { "googleusercontent.com" }
BLOCKLIST_REGEX = [
r'^[a-z0-9]{1,3}\+bncB[A-Z0-9]{20,}@',
r'^[^@]+@s\.[a-zA-Z0-9.-]+$'
]
UNTRUSTED_HELO = "google.com"
WHITELIST_HELO = ""
# --- Globale Konstanten & Regex ---
MAX_SPF_DEPTH = 10
SPF_CACHE_SIZE = 4096
# Limit für sonstige DNS/HTTP Caches um Memory Leaks zu vermeiden
GENERIC_CACHE_SIZE = 4096
DAYS = int(os.getenv("DAYS", 50))
MIN_COUNT = int(os.getenv("MIN_COUNT", 4))
BLOCK_QUOTA = int(os.getenv("BLOCK_QUOTA", 76))
# Pre-computed Block Quota for faster division checks
BLOCK_QUOTA_FLOAT = BLOCK_QUOTA / 100.0
TRANSPORTMAP = os.getenv("TRANSPORTMAP", "/etc/pmg/transport")
JOURNAL_CURSOR_FILE = Path(os.getenv("JOURNAL_CURSOR_FILE", "/t/postfix.cursor"))
CACHE_FILE = Path(os.getenv("CACHE_FILE", "/t/postfix_cache.json"))
resolver = dns.resolver.Resolver()
resolver.timeout = 2
resolver.lifetime = 3
# --- Single Source of Truth für Journal-Filter-Tokens ---
# Diese Tokens werden an drei Stellen benötigt:
# 1. BLOCK_RE / ACCEPT_RE -> finale Regex-Klassifizierung der Message
# 2. _PREFILTER_TOKENS -> billiger Substring-Pre-Filter vor json.loads()
# 3. _JOURNAL_GREP -> server-seitiger Filter via journalctl --grep
#
# Pro Eintrag definieren wir:
# - prefilter_token: die Substring-Form, die im Logtext als Indikator reicht
# - regex_pattern: die genauere Form für BLOCK_RE/ACCEPT_RE (z.B. mit ":"
# oder Präfix "rejected: ")
# Die Trennung ist nötig, weil das Original den Pre-Filter laxer hält als die
# finale Regex (z.B. "Domain not found" als Pre-Filter, aber "rejected: Domain
# not found" als Regex). Diese Asymmetrie wird hier 1:1 erhalten.
_BLOCK_TOKENS: tuple[tuple[str, str], ...] = (
("proxy-reject", r"proxy-reject:"),
("policyguard-500", r"policyguard-500"),
("Domain not found", r"rejected: Domain not found"),
)
_ACCEPT_TOKENS: tuple[tuple[str, str], ...] = (
("proxy-accept", r"proxy-accept:"),
)
BLOCK_RE = re.compile("|".join(p for _, p in _BLOCK_TOKENS), re.IGNORECASE)
ACCEPT_RE = re.compile("|".join(p for _, p in _ACCEPT_TOKENS), re.IGNORECASE)
BLOCKLIST_REGEX_COMPILED = [re.compile(p) for p in BLOCKLIST_REGEX]
# Throttle für update_today_incremental (Sekunden zwischen journalctl-Aufrufen)
INCREMENTAL_UPDATE_INTERVAL = int(os.getenv("INCREMENTAL_UPDATE_INTERVAL", 30))
# Log-Rate-Limit: maximal so viele Log-Zeilen pro Sekunde, danach gedroppt
LOG_RATE_LIMIT_PER_SEC = int(os.getenv("LOG_RATE_LIMIT_PER_SEC", 200))
# Erst-Import: nach so vielen verarbeiteten Tagen wird der Cache zwischengespeichert
INITIAL_IMPORT_FLUSH_EVERY_DAYS = int(os.getenv("INITIAL_IMPORT_FLUSH_EVERY_DAYS", 5))
# --- State ---
CACHE: dict = {}
STATS: dict = {}
SPF_CACHE: dict = {}
SPF_BLOCK_CACHE: dict[str, bool] = {}
HUNTER_CACHE: dict[str, bool] = {}
TXT_PREFIX_CACHE: dict[str, bool] = {}
_blocklist_cache: set[str] = set()
_blocklist_dirty: bool = True
_last_incremental_update: float = 0.0
# Log-Rate-Limit-State
_log_window_start: float = 0.0
_log_window_count: int = 0
_log_dropped_count: int = 0
# --- Helfer für Pre-Computation ---
def base_domain(dom: str) -> str:
parts = dom.split(".")
if len(parts) < 2:
return dom
tld = parts[-1]
sld = parts[-2]
if (tld in {"uk", "au", "nz", "za", "br", "jp"} and
sld in {"co", "com", "net", "org", "gov", "ac", "edu"} and
len(parts) >= 3):
return ".".join(parts[-3:])
return ".".join(parts[-2:])
def normalize_domain(dom: str) -> str:
return base_domain(dom.lower().rstrip("."))
# Vorab normalisierte Sets für O(1) Lookups
BLOCKLIST_BAN_NORMALIZED = {normalize_domain(d) for d in BLOCKLIST_BAN}
def domain_matches_list(domain: str, target_set: set[str]) -> bool:
"""O(1) Check für exakte Matches oder Subdomains statt O(n) String-Endswith."""
if domain in target_set:
return True
parts = domain.split('.')
for i in range(1, len(parts)):
if '.'.join(parts[i:]) in target_set:
return True
return False
def ensure_cursor_dir():
JOURNAL_CURSOR_FILE.parent.mkdir(parents=True, exist_ok=True)
def load_transport_domains() -> set[str]:
domains = set()
transport_path = Path(TRANSPORTMAP)
if not transport_path.is_file():
return domains
with open(transport_path, "r", errors="ignore") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
key = line.split()[0].strip("[]").lower().rstrip(".")
domains.add(normalize_domain(key))
return domains
# ---------------------------------------------------------------------------
# Print Stats (Optimized)
# ---------------------------------------------------------------------------
def stats(search_term: str = "", min_pct: int = 0, max_pct: int = 100, only_blocked: bool = False):
transport_excludes = load_transport_domains()
all_excludes = BLOCKLIST_BAN_NORMALIZED | transport_excludes
# defaultdict ist signifikant schneller
agg = defaultdict(lambda: {"total": 0, "blocked": 0, "accepted": 0})
for day_data in [*CACHE.values(), STATS]:
for dom, s in day_data.items():
if dom in all_excludes or dom.endswith(".de"):
continue
e = agg[dom]
e["total"] += s["total"]
e["blocked"] += s["blocked"]
e["accepted"] += s["accepted"]
filtered = []
for dom, s in agg.items():
if s["total"] == 0 or s["blocked"] == 0:
continue
if search_term and search_term not in dom:
continue
pct = int(s["blocked"] / s["total"] * 100)
if min_pct <= pct <= max_pct:
filtered.append((dom, s, pct))
if not filtered:
print("No stats available.")
sys.stdout.flush()
return
# Sortieren greift direkt auf den vorberechneten pct-Wert (x[2]) zu
filtered.sort(key=lambda x: (x[2], x[1]["total"]), reverse=True)
print("==== TOTAL DOMAIN STATS (all days + today) ====")
for dom, s, pct in filtered:
meets_block_criteria = (s["total"] >= MIN_COUNT) and (pct >= BLOCK_QUOTA)
if only_blocked and not meets_block_criteria:
continue
marker = f">>{pct}%<<" if meets_block_criteria else f" {pct}% "
print(f"{dom:<40} accept: {s['accepted']:<5} blocked: {s['blocked']:<5} total: {s['total']:<5} {marker:<10}")
print("==== END TOTAL DOMAIN STATS ====")
sys.stdout.flush()
# ---------------------------------------------------------------------------
# Logging & Extraction
# ---------------------------------------------------------------------------
FROM_REGEX = re.compile(r"from=<[^>]*@([^> ,]+)>")
def extract_domain(message: str) -> str:
m = FROM_REGEX.search(message)
if m:
return normalize_domain(m.group(1))
return ""
def log(msg: str):
"""Schreibt nach stderr mit Rate-Limit, um Pipe-Blocking zu vermeiden."""
global _log_window_start, _log_window_count, _log_dropped_count
now = time.monotonic()
if now - _log_window_start >= 1.0:
# Neue Sekunde: ggf. Dropped-Counter ausgeben, dann Reset
if _log_dropped_count > 0:
try:
sys.stderr.write(f"spf-policy: [rate-limit] {_log_dropped_count} log lines dropped\n")
sys.stderr.flush()
except Exception:
pass
_log_dropped_count = 0
_log_window_start = now
_log_window_count = 0
if _log_window_count >= LOG_RATE_LIMIT_PER_SEC:
_log_dropped_count += 1
return
_log_window_count += 1
try:
sys.stderr.write(f"spf-policy: {msg}\n")
sys.stderr.flush()
except Exception:
pass
# ---------------------------------------------------------------------------
# Cache persistence
# ---------------------------------------------------------------------------
def save_cache():
CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
today_str = date.today().isoformat()
cutoff = (date.today() - timedelta(days=DAYS)).isoformat()
pruned = {
day: data for day, data in CACHE.items()
if day != today_str and day >= cutoff
}
tmp = CACHE_FILE.with_suffix(".tmp")
try:
with open(tmp, "w") as f:
json.dump(pruned, f)
tmp.replace(CACHE_FILE)
except Exception as e:
log(f"save_cache error: {e}")
def load_cache():
global CACHE
if not CACHE_FILE.exists():
return
try:
with open(CACHE_FILE) as f:
data = json.load(f)
today_str = date.today().isoformat()
cutoff = (date.today() - timedelta(days=DAYS)).isoformat()
CACHE = {
day: stats for day, stats in data.items()
if day != today_str and day >= cutoff
}
log(f"load_cache: {len(CACHE)} days loaded from {CACHE_FILE}")
except Exception as e:
log(f"load_cache error: {e}")
# ---------------------------------------------------------------------------
# Log reading
# Pre-Filter-Tokens und --grep-Pattern werden zentral aus _BLOCK_TOKENS /
# _ACCEPT_TOKENS oben abgeleitet -> keine drei Stellen mehr synchron zu halten.
_PREFILTER_TOKENS: tuple[str, ...] = tuple(
t for t, _ in (*_BLOCK_TOKENS, *_ACCEPT_TOKENS)
)
# --grep-Pattern (regex, ERE) - läuft serverseitig in journalctl.
# Reduziert die transferierte Datenmenge massiv bei stark verrauschten Logs.
_JOURNAL_GREP: str = "|".join(_PREFILTER_TOKENS)
def _journalctl_cmd(extra_args: list[str]) -> list[str]:
"""Baut den journalctl-Aufruf. Setzt voraus, dass der User Mitglied der
Gruppe systemd-journal ist (siehe Hinweis am Dateianfang). Falls sudo
zwingend benötigt wird, kann man hier ['sudo', '-n', ...] voranstellen."""
return [
"sudo", "journalctl", "--no-pager", "-u", "postfix", "-o", "json",
"--output-fields=MESSAGE,__REALTIME_TIMESTAMP",
"--grep", _JOURNAL_GREP,
] + extra_args
def _iter_journal_events(extra_args: list[str]) -> Iterator[tuple[str, str, bool, bool]]:
"""Generator über (line_date, domain, is_blocked, is_accepted) Tupel.
Streamt journalctl zeilenweise und hält keinen großen Buffer im RAM.
Schluckt Fehler und loggt einmalig.
"""
# Lokale Bindings für Performance im Hot-Path
block_search = BLOCK_RE.search
accept_search = ACCEPT_RE.search
json_loads = json.loads
fromtimestamp = datetime.fromtimestamp
_extract_domain = extract_domain
prefilter_tokens = _PREFILTER_TOKENS
proc = None
try:
proc = subprocess.Popen(
_journalctl_cmd(extra_args),
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
bufsize=1 << 16, # 64 KiB Pipe-Buffer
)
for raw in proc.stdout:
# Schneller Pre-Filter VOR json.loads (Substring ist ~50x schneller).
# journalctl --grep filtert serverseitig, aber falls --grep mal
# entfernt wird, bleibt diese Zeile als Sicherheitsnetz.
if not any(tok in raw for tok in prefilter_tokens):
continue
line = raw.strip()
if not line:
continue
try:
entry = json_loads(line)
except json.JSONDecodeError:
continue
message = entry.get("MESSAGE", "")
if not isinstance(message, str):
continue
is_blocked = bool(block_search(message))
is_accepted = bool(accept_search(message)) and not is_blocked
if not is_blocked and not is_accepted:
continue
dom = _extract_domain(message)
if not dom:
continue
ts = entry.get("__REALTIME_TIMESTAMP")
if not ts:
continue
try:
# UTC-bewusst: Tagesgrenzen sind sonst zeitzonenabhängig.
dt = fromtimestamp(int(ts) / 1_000_000, tz=timezone.utc)
line_date = dt.date().isoformat()
except (ValueError, TypeError, OSError):
continue
yield line_date, dom, is_blocked, is_accepted
proc.stdout.close()
proc.wait(timeout=120)
except subprocess.TimeoutExpired:
if proc is not None:
proc.kill()
log("_iter_journal_events: journalctl timed out")
except Exception as e:
log(f"_iter_journal_events error: {e}")
finally:
if proc is not None and proc.poll() is None:
try:
proc.kill()
except Exception:
pass
def _iter_journal_events_after_cursor() -> Iterator[tuple[str, str, bool, bool]]:
"""Wie _iter_journal_events, aber mit Cursor-File. Wird für inkrementelle
Updates während des laufenden Betriebs genutzt — typischerweise sehr wenig
Output, daher kein Streaming-Overhead nötig."""
return _iter_journal_events([f"--cursor-file={JOURNAL_CURSOR_FILE}"])
def _bump(day_dict: dict, dom: str, is_blocked: bool, is_accepted: bool):
"""Inkrementiert total/blocked/accepted für eine Domain in einem Tagesdict."""
e = day_dict.get(dom)
if e is None:
e = {"total": 0, "blocked": 0, "accepted": 0}
day_dict[dom] = e
e["total"] += 1
if is_blocked:
e["blocked"] += 1
if is_accepted:
e["accepted"] += 1
# ---------------------------------------------------------------------------
# Cache initialization
# ---------------------------------------------------------------------------
def initialize_cache():
"""RAM-schonender Erst-Import:
- Ein einziger journalctl-Stream für alle fehlenden Tage gemeinsam.
- Tagesweises Flushen in CACHE: sobald der Datums-Wechsel erkannt wird,
ist der vorherige Tag fertig und kann persistiert werden — der RAM
hält immer nur einen aktiven Tag.
- Periodisches save_cache() nach N fertigen Tagen, damit ein Crash
während Erst-Import nicht alle Fortschritte verliert.
"""
global CACHE, STATS, _blocklist_dirty, _last_incremental_update
load_cache()
today = date.today()
today_str = today.isoformat()
needed_days = {
(today - timedelta(days=i)).isoformat()
for i in range(1, DAYS + 1)
} - set(CACHE.keys())
if needed_days:
oldest = min(needed_days)
log(f"initialize_cache: need {len(needed_days)} days, streaming from {oldest}")
# Streaming-Strategie: aktuellen Tag im RAM halten, beim Wechsel flushen.
current_day: Optional[str] = None
current_day_data: dict = {}
days_done_since_flush = 0
# Tag-Set für O(1) Lookup beim Filtern (wir streamen ggf. auch
# Tage, die schon im CACHE sind — die werden hier verworfen).
needed_days_set = needed_days
# Wir nehmen einen großen Stream "since oldest" — journalctl wird die
# Tage in chronologischer Reihenfolge liefern, daher reicht ein
# einfacher Tageswechsel-Detektor.
for line_date, dom, is_blocked, is_accepted in _iter_journal_events(["--since", oldest]):
# Tage außerhalb des needed-Fensters überspringen
if line_date == today_str:
# Heutige Daten kommen weiter unten via STATS rein
continue
if line_date not in needed_days_set:
continue
if current_day is None:
current_day = line_date
current_day_data = {}
elif line_date != current_day:
# Tag fertig — flushen
CACHE[current_day] = current_day_data
days_done_since_flush += 1
current_day_data = {}
current_day = line_date
if days_done_since_flush >= INITIAL_IMPORT_FLUSH_EVERY_DAYS:
save_cache()
days_done_since_flush = 0
_bump(current_day_data, dom, is_blocked, is_accepted)
# Letzten Tag auch flushen
if current_day is not None:
CACHE[current_day] = current_day_data
current_day_data = {} # RAM freigeben
# Tage ohne Events trotzdem als leer markieren, sonst werden sie beim
# nächsten Start wieder importiert
for d in needed_days_set:
if d not in CACHE:
CACHE[d] = {}
save_cache()
else:
log("initialize_cache: all historical days present in cache file")
# Heutige Stats separat einlesen (kleiner Datensatz)
log("initialize_cache: loading today's stats from journal")
today_data: dict = {}
for line_date, dom, is_blocked, is_accepted in _iter_journal_events(["--since", today_str]):
if line_date != today_str:
continue
_bump(today_data, dom, is_blocked, is_accepted)
STATS = today_data
ensure_cursor_dir()
# Cursor-File initialisieren, indem wir einmal "leer" durchlaufen
# (oder: alle bereits gelesenen Events konsumieren)
for _ in _iter_journal_events_after_cursor():
pass
_blocklist_dirty = True
_last_incremental_update = time.monotonic()
def update_today_incremental():
"""Inkrementelles Update — wird bei jedem Request via current_blocklist()
aufgerufen, ist aber per Throttle auf INCREMENTAL_UPDATE_INTERVAL
Sekunden begrenzt, um nicht bei jeder Mail einen subprocess zu spawnen.
"""
global STATS, CACHE, _blocklist_dirty, _last_incremental_update
now = time.monotonic()
if now - _last_incremental_update < INCREMENTAL_UPDATE_INTERVAL:
return
_last_incremental_update = now
ensure_cursor_dir()
today_str = date.today().isoformat()
cache_updated = False
new_stats_seen = False
for line_date, dom, is_blocked, is_accepted in _iter_journal_events_after_cursor():
if line_date == today_str:
_bump(STATS, dom, is_blocked, is_accepted)
new_stats_seen = True
else:
# Vergangener Tag — nur eintragen, wenn noch nicht im CACHE
# (verhindert Doppelzählung bei Tagesübergang).
if line_date not in CACHE:
CACHE[line_date] = {}
cache_updated = True
_bump(CACHE[line_date], dom, is_blocked, is_accepted)
elif not CACHE[line_date]:
_bump(CACHE[line_date], dom, is_blocked, is_accepted)
cache_updated = True
if cache_updated:
save_cache()
if new_stats_seen or cache_updated:
_blocklist_dirty = True
# ---------------------------------------------------------------------------
# Candidate detection (Optimized)
# ---------------------------------------------------------------------------
def find_candidates(cache: dict, stats: dict) -> list[tuple[str, int]]:
total = defaultdict(int)
blocked = defaultdict(int)
for day_data in [*cache.values(), stats]:
for dom, s in day_data.items():
total[dom] += s["total"]
blocked[dom] += s["blocked"]
candidates = [
(dom, tot)
for dom, tot in total.items()
if tot >= MIN_COUNT and (blocked[dom] / tot) > BLOCK_QUOTA_FLOAT
]
return sorted(candidates, key=lambda x: x[1], reverse=True)
# ---------------------------------------------------------------------------
# Autoblocklist
# ---------------------------------------------------------------------------
def current_blocklist() -> set[str]:
global _blocklist_cache, _blocklist_dirty
update_today_incremental()
if not _blocklist_dirty:
return _blocklist_cache
transport_excludes = load_transport_domains()
all_excludes = BLOCKLIST_BAN_NORMALIZED | transport_excludes
candidates = find_candidates(CACHE, STATS)
# Als O(1) Set statt Liste speichern
_blocklist_cache = {
dom for dom, _ in candidates
if not dom.endswith(".de") and dom not in all_excludes
}
_blocklist_dirty = False
return _blocklist_cache
# ---------------------------------------------------------------------------
# Hunter IO (mit RAM Cache)
# ---------------------------------------------------------------------------
def check_for_hunter_io(url: str) -> bool:
global HUNTER_CACHE
if url in HUNTER_CACHE:
return HUNTER_CACHE[url]
headers = {"User-Agent": "Mozilla/5.0 (compatible; Python-Checker/1.0)"}
abusestring = "Diese E-Mail-Adresse ist vor Spambots geschützt! Zur Anzeige muss JavaScript eingeschaltet sein. "
result = False
try:
response = requests.get(f'https://{url}', headers=headers, timeout=3, allow_redirects=False)
response.raise_for_status()
result = abusestring in response.text
except Exception:
pass
if len(HUNTER_CACHE) < GENERIC_CACHE_SIZE:
HUNTER_CACHE[url] = result
return result
# ---------------------------------------------------------------------------
# SPF & TXT Check (mit RAM Cache)
# ---------------------------------------------------------------------------
def txt_contains_prefix(domain: str) -> bool:
global TXT_PREFIX_CACHE
if domain in TXT_PREFIX_CACHE:
return TXT_PREFIX_CACHE[domain]
result = False
try:
answers = resolver.resolve(domain, "TXT")
for r in answers:
txt = "".join([s.decode() for s in r.strings])
if any(txt.startswith(prefix) for prefix in BLOCKLIST_TXT_PREFIX):
result = True
break
except Exception:
pass
if len(TXT_PREFIX_CACHE) < GENERIC_CACHE_SIZE:
TXT_PREFIX_CACHE[domain] = result
return result
def get_spf(domain: str) -> str | None:
global SPF_CACHE
if domain in SPF_CACHE:
return SPF_CACHE[domain]
result = None
try:
answers = resolver.resolve(domain, "TXT")
for r in answers:
txt = "".join([s.decode() for s in r.strings])
if txt.startswith("v=spf1"):
result = txt
break
except Exception:
pass
if len(SPF_CACHE) < SPF_CACHE_SIZE:
SPF_CACHE[domain] = result
return result
def get_all_spf_includes(domain: str, depth: int = 0, visited: set | None = None) -> set:
if visited is None:
visited = set()
includes = set()
if depth > MAX_SPF_DEPTH or domain in visited:
return includes
visited.add(domain)
spf_record = get_spf(domain)
if not spf_record:
return includes
for part in spf_record.split():
if part.startswith("include:"):
include = part.split(":", 1)[1].lower()
includes.add(include)
includes.update(get_all_spf_includes(include, depth + 1, visited))
return includes
def spf_contains_block(domain: str) -> bool:
"""Cached: das Set-Building ist teuer, aber Domain-stabil."""
cached = SPF_BLOCK_CACHE.get(domain)
if cached is not None:
return cached
includes = get_all_spf_includes(domain)
blocklist_lower = {b.lower() for b in BLOCKLIST_SPF}
result = bool(includes & blocklist_lower)
if len(SPF_BLOCK_CACHE) < GENERIC_CACHE_SIZE:
SPF_BLOCK_CACHE[domain] = result
return result
def sender_matches_regex(sender: str) -> tuple[bool, str | None]:
for regex in BLOCKLIST_REGEX_COMPILED:
if regex.match(sender):
return True, regex.pattern
return False, None
# ---------------------------------------------------------------------------
# Request handling (Optimized O(1) lookups)
# ---------------------------------------------------------------------------
def parse_search_args(arg: str) -> tuple[str, int, int, bool]:
"""Parsed `stats`-Argumente.
Konvention:
- 0 Zahlen -> min=0, max=100
- 1 Zahl N -> min=N, max=100 (»mindestens N%«)
- 2+ Zahlen A B -> min=min(A,B), max=max(A,B)
- 'b' / 'blocked' -> only_blocked = True
"""
parts = arg.strip().split()
search_term = ""
numbers: list[int] = []
only_blocked = False
for p in parts:
p_lower = p.lower()
if p_lower in ("-b", "blocked"):
only_blocked = True
elif p.isdigit() and 0 <= int(p) <= 100:
numbers.append(int(p))
else:
search_term = p_lower
if len(numbers) == 0:
min_pct, max_pct = 0, 100
elif len(numbers) == 1:
min_pct, max_pct = numbers[0], 100
else:
min_pct, max_pct = min(numbers[0], numbers[1]), max(numbers[0], numbers[1])
return search_term, min_pct, max_pct, only_blocked
def handle_request(attrs: dict) -> str:
sender = attrs.get("sender", "")
recipient = attrs.get("recipient", "").lower() # NEU
client_name = attrs.get("client_name", "").lower()
helo_name = attrs.get("helo_name", "").lower()
client_address = attrs.get("client_address", "")
# HELO Whitelist
if helo_name == WHITELIST_HELO or helo_name.endswith("." + WHITELIST_HELO):
log(f"whitelist sender by whitelist")
return "dunno"
if recipient:
# BLOCKLIST_RECIPIENT kann Volladdressen ("Diese E-Mail-Adresse ist vor Spambots geschützt! Zur Anzeige muss JavaScript eingeschaltet sein. ") oder
# Domains ("@example.com" oder "example.com") enthalten. Wir matchen:
# - exakte Adresse
# - "@domain"-Form (Legacy)
# - Subdomain-Match auf den reinen Domainteil (konsistent mit Sender-Logik)
if recipient in BLOCKLIST_RECIPIENT:
log(f"blocked recipient: {recipient}")
return "reject reject for policy reasons [policyguard-400] [non-existent-recipient]"
if "@" in recipient:
rcpt_domain_only = recipient.split("@", 1)[1]
if ("@" + rcpt_domain_only) in BLOCKLIST_RECIPIENT or domain_matches_list(rcpt_domain_only, BLOCKLIST_RECIPIENT):
log(f"blocked recipient by domain: {recipient}")
return "reject reject for policy reasons [policyguard-400] [non-existent-recipient]"
if "@" not in sender:
return "dunno"
domain = sender.split("@", 1)[1].lower()
try:
# Block by Clientname (O(1) Set Lookup)
if domain_matches_list(client_name, BLOCKLIST_CLIENTNAME):
log(f"blocked client hostname: {client_name}")
return "reject reject for policy reasons [policyguard-400] [blacklist-client]"
# Block Google Groups
if helo_name == UNTRUSTED_HELO or helo_name.endswith("." + UNTRUSTED_HELO):
matched, pattern = sender_matches_regex(sender)
if matched:
log(f"blocked sender by regex ({pattern}): {sender}")
return "reject reject for policy reasons [policyguard-400] [gg]"
# Block history bad domains (O(1) Set Lookup instead of loop)
bl_cache = current_blocklist()
if domain_matches_list(domain, bl_cache):
log(f"blocked sender domain via autoblocklist {domain}")
return "reject reject for policy reasons [policyguard-500] [abl]"
# Block Firebase by Domain Ending (O(1) Set Lookup)
if domain_matches_list(domain, BLOCKLIST_SENDER):
log(f"blocked sender domain via blocklist {domain}")
return "reject reject for policy reasons [policyguard-500] [blacklist-sender]"
# Echte SPF-Auswertung via pyspf
if client_address:
spf_result, spf_explanation = spf.check2(i=client_address, s=sender, h=helo_name)
if not domain_matches_list(client_name, WHITELIST_CLIENT_SPF):
if spf_result == 'fail':
log(f"blocked sender {sender} due to SPF Fail ({spf_explanation})")
return "reject reject for policy reasons [policyguard-400] [spf-fail]"
if (helo_name == UNTRUSTED_HELO or helo_name.endswith("." + UNTRUSTED_HELO)):
if spf_result == 'permerror':
log(f"blocked sender {sender} due to SPF PermError ({spf_explanation})")
return "reject reject for policy reasons [policyguard-400] [spf-permerror]"
if spf_result == 'softfail':
log(f"blocked Google HELO {helo_name} for sender {sender} due to SPF SoftFail ({spf_explanation})")
return f"reject reject for policy reasons [policyguard-400] [g-spf-sfail] SPF-Softfail: E-Mail Server {helo_name} ist kein authorisierter Sender für Domain {domain}. Bitte passen Sie ihren SPF Eintrag an."
#if spf_result == 'none':
# log(f"blocked HELO {helo_name} without SPF for sender {domain}")
# return f"reject reject for policy reasons [policyguard-500] [g-spf-none] Kein SPF-Eintrag für Domain {domain} gesetzt. Zusendungen über Googles Spamschleuder-Mailserver benötigen zwingend einen SPF-Record."
# Block Firebase by SPF
if spf_contains_block(domain):
log(f"blocked sender domain via spf {domain}")
return "reject reject for policy reasons [policyguard-500] [fb-txt-spf-badentry]"
# Block by TXT prefix (Cached)
if txt_contains_prefix(domain):
log(f"blocked sender domain via TXT prefix {domain}")
return "reject reject for policy reasons [policyguard-500] [fb-txt-prefix]"
# Block hunter.io (Cached)
if "check" in sender and check_for_hunter_io(domain):
log(f"blocked sender domain via hunter.io {domain}")
return "reject reject for policy reasons [policyguard-500] [hunter]"
if client_name == "unknown":
log(f"deferred unknown client: {client_address}")
return "defer 450 4.7.1 Client host rejected: temporary defer for unknown hostname [policyguard-500]"
except Exception as e:
log(f"error checking {domain}: {e}")
return "dunno"
def main():
initialize_cache()
while True:
attrs = {}
while True:
line = sys.stdin.readline()
if line == "":
return
line = line.strip()
if not line:
break
if "=" in line:
k, v = line.split("=", 1)
attrs[k] = v
# Interaktive Befehle direkt während des Einlesens verarbeiten (Original-Verhalten)
if line.startswith("blocklist"):
parts = line.split(None, 1)
search_term = parts[1].strip().lower() if len(parts) > 1 else ""
bl = sorted(current_blocklist())
if search_term:
bl = [d for d in bl if search_term in d]
print("==== BEGIN CURRENT BLOCKLIST ====")
print("\n".join(bl))
print("==== END CURRENT BLOCKLIST ====")
sys.stdout.flush()
if line == "spfcache":
print("==== SPF CACHE ENTRY ====")
print("\n".join(sorted(SPF_CACHE)))
print("==== END SPF CACHE ENTRY ====")
sys.stdout.flush()
if line.startswith("stats"):
parts = line.split(None, 1)
search_term, min_pct, max_pct, only_blocked = parse_search_args(parts[1]) if len(parts) > 1 else ("", 0, 100, False)
stats(search_term, min_pct, max_pct, only_blocked)
if not attrs:
continue
action = handle_request(attrs)
print(f"action={action}\n")
sys.stdout.flush()
if __name__ == "__main__":
main()