# header_analyzer.py
"""Heuristic phishing / BEC scoring based on e-mail headers and body text.

The public entry point is :func:`analyze_headers`, which returns a list of
human-readable findings, a risk score capped at 100, and a summary of the
SPF/DKIM/DMARC results found in the ``Authentication-Results`` header.
"""

import difflib
import re
from datetime import datetime, timezone
from email.utils import parseaddr
from typing import Optional

try:
    import whois
except ImportError:
    # WHOIS is only needed for the best-effort domain-age check; without the
    # package that single check is skipped instead of breaking the analyzer.
    whois = None

# Brand keyword -> domains that legitimately belong to that brand.
BRAND_OFFICIAL = {
    "paypal": ["paypal.com"],
    "amazon": ["amazon.com"],
    "google": ["google.com", "gmail.com"],
    "microsoft": ["microsoft.com", "outlook.com", "live.com"],
    "apple": ["apple.com"],
}

SUSPICIOUS_TLDS = {"xyz", "top", "click", "work", "loan", "tk", "zip", "mov"}

FREE_EMAIL_PROVIDERS = {
    "gmail.com", "yahoo.com", "outlook.com", "hotmail.com", "icloud.com"
}

_DOMAIN_RE = re.compile(r"@([a-zA-Z0-9.-]+)")

# Sender domains registered more recently than this are reported as "very new".
_NEW_DOMAIN_DAYS = 90

# difflib similarity ratio above which a domain counts as a look-alike.
_LOOKALIKE_RATIO = 0.75

# (result key, display label, failure tokens, score weight) per mechanism.
_AUTH_CHECKS = (
    ("spf", "SPF", ("spf=fail",), 25),
    ("dkim", "DKIM", ("dkim=fail", "dkim=permerror"), 25),
    ("dmarc", "DMARC", ("dmarc=fail",), 30),
)

_FINANCIAL_PHRASES = ("wire transfer", "urgent payment", "bank details")


def _extract_domain(addr: str) -> str:
    """Return the lower-cased domain of an address header value, or ``""``."""
    _, email_addr = parseaddr(addr or "")
    match = _DOMAIN_RE.search(email_addr)
    return match.group(1).lower() if match else ""


def _domain_age_days(domain: str) -> Optional[int]:
    """Return the age of *domain* in days according to WHOIS, or ``None``.

    ``None`` is returned when the ``whois`` package is unavailable, the lookup
    fails, or no usable creation date is reported.
    """
    if whois is None:
        return None
    try:
        created = whois.whois(domain).creation_date
    except Exception:
        # Deliberately best-effort: the lookup can fail in many ways (network,
        # parsing, unknown TLD) and must never abort the header analysis.
        return None

    # Some registries report several creation dates; use the first usable one.
    if isinstance(created, list):
        created = next((c for c in created if isinstance(c, datetime)), None)
    if not isinstance(created, datetime):
        return None

    # WHOIS parsers return naive or aware datetimes depending on the registry.
    # Subtracting one from the other raises TypeError, so normalise to aware
    # UTC, treating naive values as UTC.
    if created.tzinfo is None:
        created = created.replace(tzinfo=timezone.utc)
    return (datetime.now(timezone.utc) - created).days


def _get_header(headers, name: str):
    """Return the value of header *name*, matched case-insensitively.

    An exact-case key wins; otherwise the first non-empty value whose key
    matches ignoring case is used. Returns ``""`` when nothing is found.
    """
    value = headers.get(name)
    if value:
        return value
    wanted = name.lower()
    for key, candidate in headers.items():
        if candidate and isinstance(key, str) and key.lower() == wanted:
            return candidate
    return ""


def _has_header(headers, name: str) -> bool:
    """Return True if a header called *name* is present (any letter case)."""
    wanted = name.lower()
    return any(isinstance(key, str) and key.lower() == wanted for key in headers)


def _is_official(domain: str, legit_domains) -> bool:
    """Return True if *domain* equals, or is a subdomain of, a legit domain."""
    # Matching on a "." boundary keeps "evilpaypal.com" from passing as
    # "paypal.com", which a bare endswith() would allow.
    return any(
        domain == legit or domain.endswith("." + legit)
        for legit in legit_domains
    )


def analyze_headers(headers: dict, body: str = ""):
    """Score an e-mail for phishing / BEC indicators.

    Args:
        headers: Mapping of header name to value. Names are matched
            case-insensitively. ``None`` is treated as no headers.
        body: Message body text, scanned for financial-request phrases.

    Returns:
        A tuple ``(findings, score, auth)`` where ``findings`` is a list of
        human-readable strings, ``score`` is an int capped at 100, and
        ``auth`` is ``{"summary": str, "results": {"spf", "dkim", "dmarc"}}``
        with each result being ``"pass"``, ``"fail"`` or ``"unknown"``.
    """
    findings = []
    score = 0
    headers = headers or {}
    body_l = (body or "").lower()

    # --- SPF / DKIM / DMARC ------------------------------------------------
    auth_header = _get_header(headers, "Authentication-Results").lower()
    auth_results = {"spf": "unknown", "dkim": "unknown", "dmarc": "unknown"}
    auth_summary = []

    for key, label, fail_tokens, weight in _AUTH_CHECKS:
        if any(token in auth_header for token in fail_tokens):
            findings.append(f"Header: {label} authentication failed")
            auth_results[key] = "fail"
            auth_summary.append(f"{label} failed")
            score += weight
        elif f"{key}=pass" in auth_header:
            auth_results[key] = "pass"

    if not auth_summary:
        auth_summary.append("No strong authentication failures detected")

    # --- Sender / reply address checks -------------------------------------
    from_domain = _extract_domain(_get_header(headers, "From"))
    reply_domain = _extract_domain(_get_header(headers, "Reply-To"))

    if reply_domain and from_domain and reply_domain != from_domain:
        findings.append(
            f"Header: Reply-To domain mismatch (From={from_domain}, Reply-To={reply_domain})"
        )
        score += 35

    if from_domain in FREE_EMAIL_PROVIDERS:
        findings.append(f"Header: Free email provider used ({from_domain})")
        score += 15

    # --- BEC indicators ----------------------------------------------------
    # Header names normally arrive as "Cc"/"Bcc", so match them ignoring case.
    has_copy_header = any(_has_header(headers, name) for name in ("Bcc", "Cc"))
    if has_copy_header and not _get_header(headers, "To"):
        findings.append("Header: Possible BEC — CC/BCC without To header")
        score += 20

    if any(phrase in body_l for phrase in _FINANCIAL_PHRASES):
        findings.append("Header/Body: Financial request pattern (BEC)")
        score += 35

    # --- Sender domain reputation -------------------------------------------
    if from_domain:
        tld = from_domain.split(".")[-1]
        if tld in SUSPICIOUS_TLDS:
            findings.append(f"Header: Suspicious TLD used ({tld})")
            score += 20

        age = _domain_age_days(from_domain)
        if age is not None and age < _NEW_DOMAIN_DAYS:
            findings.append(f"Header: Sender domain very new ({age} days)")
            score += 30

        # A genuine brand domain (or a subdomain of one) is never a look-alike,
        # however similar the strings are ("mail.google.com" vs "google.com").
        is_known_official = any(
            _is_official(from_domain, legit_domains)
            for legit_domains in BRAND_OFFICIAL.values()
        )

        for brand, legit_domains in BRAND_OFFICIAL.items():
            if brand in from_domain and not _is_official(from_domain, legit_domains):
                findings.append(
                    f"Header: Brand impersonation detected ({brand} in {from_domain})"
                )
                score += 40

            if is_known_official:
                continue
            for legit in legit_domains:
                ratio = difflib.SequenceMatcher(None, from_domain, legit).ratio()
                if ratio > _LOOKALIKE_RATIO:
                    findings.append(
                        f"Header: Look-alike domain detected ({from_domain} vs {legit})"
                    )
                    score += 40

    score = min(score, 100)

    return findings, score, {
        "summary": ", ".join(auth_summary),
        "results": auth_results,
    }