"""
PhilVerify — Scoring Engine (Orchestrator)
Ties together all NLP modules, Layer 1, and Layer 2 into a final VerificationResponse.
Final Score = (ML Confidence × 0.40) + (Evidence Score × 0.60)

The actual weights are read from ``settings.ml_weight`` / ``settings.evidence_weight``,
and a domain-credibility adjustment is added when the source domain is known.
"""
import asyncio
import importlib
import json
import logging
import uuid
from datetime import datetime, timezone
from pathlib import Path

from config import get_settings
from api.schemas import (
    VerificationResponse,
    Verdict,
    Language,
    DomainTier,
    Layer1Result,
    Layer2Result,
    EntitiesResult,
    EvidenceSource,
    Stance,
    ClassifierComparisonEntry,
    LDATopicResult,
)

logger = logging.getLogger(__name__)
settings = get_settings()

# ── Module-level NLP singleton cache ─────────────────────────────────────────
# These are created once per process and reused across all requests.
# Creating fresh instances on every request causes unnecessary model reloads
# from disk (300–500 ms each) which compounds into multi-second latency.
_nlp_cache: dict = {}


def _get_nlp(key: str, factory):
    """Return the cached instance stored under *key*, creating it on first use.

    Args:
        key: Cache key identifying the instance.
        factory: Zero-argument callable invoked once to build the instance
            when *key* is not cached yet. If it raises, nothing is cached.

    Returns:
        The cached (or freshly created) instance.
    """
    if key not in _nlp_cache:
        _nlp_cache[key] = factory()
    return _nlp_cache[key]


# ── Classical classifier comparison ──────────────────────────────────────────
# Runs all four classical ML classifiers on every request for the demo panel.
# Each classifier instance is created on first call and cached via _get_nlp().
def _lazy_classifier(module_name: str, class_name: str):
    """Return a zero-arg factory that imports *module_name* only when called.

    Deferring the import keeps the app importable without the ML modules.
    """
    def _factory():
        return getattr(importlib.import_module(module_name), class_name)()

    return _factory


# (display name, cache key, lazy factory) — built once instead of per request.
_COMPARISON_CLASSIFIERS = (
    ("BoW", "cmp_bow", _lazy_classifier("ml.bow_classifier", "BoWClassifier")),
    ("TF-IDF", "cmp_tfidf", _lazy_classifier("ml.tfidf_classifier", "TFIDFClassifier")),
    ("Naive Bayes", "cmp_nb", _lazy_classifier("ml.naive_bayes_classifier", "NaiveBayesClassifier")),
    ("LDA", "cmp_lda", _lazy_classifier("ml.lda_analysis", "LDAFeatureClassifier")),
)


async def _run_comparison(text: str) -> tuple[list[ClassifierComparisonEntry], LDATopicResult | None]:
    """Run BoW, TF-IDF, Naive Bayes, and LDA classifiers. Also infer LDA topic.

    The blocking predictions run in the default executor so the event loop is
    not held up. A classifier that raises is logged and skipped.

    Args:
        text: Text to classify.

    Returns:
        A ``(entries, lda_topic)`` tuple; ``lda_topic`` is None when the LDA
        classifier failed or does not expose ``get_topic_info``.
    """
    def _predict_all():
        results = []
        lda_topic_result = None
        for name, key, factory in _COMPARISON_CLASSIFIERS:
            try:
                clf = _get_nlp(key, factory)
                r = clf.predict(text)
                results.append(ClassifierComparisonEntry(
                    name=name,
                    verdict=Verdict(r.verdict),
                    confidence=r.confidence,
                    top_features=r.triggered_features[:3],
                ))
                if name == "LDA" and hasattr(clf, "get_topic_info"):
                    info = clf.get_topic_info(text)
                    lda_topic_result = LDATopicResult(**info)
            except Exception as exc:
                logger.warning("Comparison classifier %s failed: %s", name, exc)
        return results, lda_topic_result

    # We are inside a coroutine, so a running loop is guaranteed to exist.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _predict_all)


# ── Domain credibility lookup ─────────────────────────────────────────────────
_DOMAIN_DB_PATH = Path(__file__).parent.parent / "domain_credibility.json"
_DOMAIN_DB: dict = {}


def _load_domain_db() -> dict:
    """Return the domain credibility database, loading it from disk if empty.

    A load failure is logged and leaves the database empty, so the next call
    tries again.
    """
    global _DOMAIN_DB
    if not _DOMAIN_DB:
        try:
            # Explicit encoding: do not depend on the platform default.
            _DOMAIN_DB = json.loads(_DOMAIN_DB_PATH.read_text(encoding="utf-8"))
        except Exception as e:
            logger.warning("Could not load domain_credibility.json: %s", e)
    return _DOMAIN_DB


def get_domain_tier(domain: str | None) -> DomainTier | None:
    """Map a domain name to its credibility tier.

    Args:
        domain: Host name to look up; case-insensitive, a leading ``www.`` is
            ignored.

    Returns:
        None when *domain* is empty; the tier whose ``domains`` list contains
        the domain; otherwise ``DomainTier.SUSPICIOUS``.
    """
    if not domain:
        return None
    db = _load_domain_db()
    # Strip only a leading "www." — a blanket replace would also mangle hosts
    # that merely contain that substring.
    domain = domain.lower().removeprefix("www.")
    for tier_key, tier_data in db.items():
        if domain in tier_data.get("domains", []):
            # The tier number is the last character of the JSON key.
            return DomainTier(int(tier_key[-1]))
    return DomainTier.SUSPICIOUS  # Unknown domains default to Tier 3


def _map_verdict(final_score: float) -> Verdict:
    """Translate a final score into a verdict using the configured thresholds."""
    if final_score >= settings.credible_threshold:
        return Verdict.CREDIBLE
    elif final_score >= settings.fake_threshold:
        return Verdict.UNVERIFIED
    else:
        return Verdict.LIKELY_FAKE


# ── Scoring tables ────────────────────────────────────────────────────────────
# Default evidence score by source domain tier, used when no evidence is scored:
#   Tier 1 (Inquirer, GMA, Rappler…) → 65  – known credible, not neutral
#   Tier 2 (satire/opinion)           → 45  – slight skepticism
#   Tier 3 (unknown)                  → 50  – neutral
#   Tier 4 (blacklisted)              → 25  – heavy prior against
_EVIDENCE_DEFAULTS: dict = {
    DomainTier.CREDIBLE: 65.0,
    DomainTier.SATIRE_OPINION: 45.0,
    DomainTier.SUSPICIOUS: 50.0,
    DomainTier.KNOWN_FAKE: 25.0,
}

# Base score adjustment (in points) applied per source domain tier.
_BASE_ADJ: dict = {
    DomainTier.CREDIBLE: +20.0,        # Tier 1 — established PH news orgs
    DomainTier.SATIRE_OPINION: -5.0,   # Tier 2 — satire / opinion blogs
    DomainTier.SUSPICIOUS: -10.0,      # Tier 3 — unknown / unverified
    DomainTier.KNOWN_FAKE: -35.0,      # Tier 4 — blacklisted
}

# Credibility score each tier implies:
# Tier 1 implies credible (75), Tier 4 implies fake (25); others neutral (50)
_TIER_IMPLIED_SCORE: dict = {
    DomainTier.CREDIBLE: 75.0,
    DomainTier.SATIRE_OPINION: 50.0,
    DomainTier.SUSPICIOUS: 50.0,
    DomainTier.KNOWN_FAKE: 25.0,
}


def _tfidf_baseline():
    """Return the cached TF-IDF classifier, training it on first use."""
    from ml.tfidf_classifier import TFIDFClassifier

    def _make_tfidf():
        clf = TFIDFClassifier()
        clf.train()
        return clf

    return _get_nlp("tfidf_classifier", _make_tfidf)


def _select_layer1_classifier():
    """Pick the Layer 1 classifier and report which tier was actually chosen.

    Priority: Ensemble (XLM-R + Tagalog-RoBERTa) → XLM-R alone → TF-IDF.

    Returns:
        A ``(classifier, model_tier)`` tuple where ``model_tier`` is one of
        ``"ensemble"``, ``"xlmr"`` or ``"tfidf"`` and always matches the
        returned classifier.
    """
    # Import separately from loading: if these imports fail,
    # ModelNotFoundError is never bound, so it must not appear in an
    # ``except`` clause guarding them.
    try:
        from ml.xlm_roberta_classifier import XLMRobertaClassifier, ModelNotFoundError
        from ml.tagalog_roberta_classifier import TagalogRobertaClassifier
        from ml.ensemble_classifier import EnsembleClassifier
    except Exception as exc:
        logger.warning("XLM-RoBERTa load failed (%s) — falling back to TF-IDF", exc)
        return _tfidf_baseline(), "tfidf"

    try:
        members = [_get_nlp("xlmr_classifier", XLMRobertaClassifier)]
        model_tier = "xlmr"
        # A Tagalog-RoBERTa failure only drops that member; XLM-R is kept.
        try:
            members.append(_get_nlp("tagalog_classifier", TagalogRobertaClassifier))
            model_tier = "ensemble"
        except ModelNotFoundError:
            logger.info("Tagalog-RoBERTa checkpoint not found — using XLM-R only")
        except Exception as exc:
            logger.warning("Tagalog-RoBERTa load failed (%s) — using XLM-R only", exc)
        return EnsembleClassifier(members), model_tier
    except ModelNotFoundError:
        logger.info("XLM-RoBERTa checkpoint not found — falling back to TF-IDF baseline")
    except Exception as exc:
        logger.warning("XLM-RoBERTa load failed (%s) — falling back to TF-IDF", exc)
    return _tfidf_baseline(), "tfidf"


def _score_evidence(sources: list[EvidenceSource]) -> tuple[float, Verdict]:
    """Score a non-empty list of evidence sources.

    The score is the average similarity × 100, minus 15 points per refuting
    source, clamped to 0–100.

    Returns:
        ``(evidence_score, verdict)``: LIKELY_FAKE when refuting sources
        outnumber supporting ones, CREDIBLE with at least two supporting
        sources, otherwise UNVERIFIED.
    """
    supporting = sum(1 for s in sources if s.stance == Stance.SUPPORTS)
    refuting = sum(1 for s in sources if s.stance == Stance.REFUTES)
    avg_sim = sum(s.similarity for s in sources) / len(sources)
    score = max(0.0, min(100.0, avg_sim * 100 - refuting * 15))

    if refuting > supporting:
        verdict = Verdict.LIKELY_FAKE
    elif supporting >= 2:
        verdict = Verdict.CREDIBLE
    else:
        verdict = Verdict.UNVERIFIED
    return score, verdict


def _domain_adjustment(
    domain_tier: DomainTier | None,
    ml_credibility: float,
    source_domain: str | None,
) -> float:
    """Return the score adjustment (in points) for the source domain tier.

    The base adjustment for the tier is scaled by a "disagreement multiplier"
    (1.0–1.5) that grows with the distance between the ML credibility and the
    score the tier implies.

    Args:
        domain_tier: Tier of the source domain, or None when unknown.
        ml_credibility: Layer 1 credibility on a 0–100 scale.
        source_domain: Domain name; used only for logging.

    Returns:
        0.0 when *domain_tier* is None, otherwise ``base × multiplier``.
    """
    if domain_tier is None:
        return 0.0

    base_adj = _BASE_ADJ.get(domain_tier, 0.0)
    implied = _TIER_IMPLIED_SCORE.get(domain_tier, 50.0)
    disagreement = abs(ml_credibility - implied) / 50.0   # 0.0 – 1.5
    multiplier = min(1.5, 1.0 + disagreement * 0.5)        # capped at 1.5
    adjustment = base_adj * multiplier
    logger.info(
        "Domain credibility: %s (Tier %s) base=%+.0f × multiplier=%.2f → %+.1f pts "
        "(ml_credibility=%.1f, implied=%.0f)",
        source_domain, domain_tier.value, base_adj, multiplier, adjustment,
        ml_credibility, implied,
    )
    return adjustment


async def run_verification(
    text: str,
    input_type: str = "text",
    source_domain: str | None = None,
) -> VerificationResponse:
    """
    Full verification pipeline orchestrator.
    Runs NLP analysis and ML classifier synchronously, evidence retrieval async.

    Args:
        text: Raw input text to verify.
        input_type: Label echoed into the response and the history entry.
        source_domain: Domain the text came from, if known; drives the default
            evidence score and the domain-credibility adjustment.

    Returns:
        The assembled VerificationResponse. Evidence retrieval and history
        recording failures are logged, not raised.
    """
    # ── Lazy imports so app starts without heavy deps ─────────────────────────
    from nlp.preprocessor import TextPreprocessor
    from nlp.language_detector import LanguageDetector
    from nlp.ner import EntityExtractor
    from nlp.sentiment import SentimentAnalyzer
    from nlp.clickbait import ClickbaitDetector
    from nlp.claim_extractor import ClaimExtractor
    from evidence.news_fetcher import fetch_evidence, compute_similarity
    from evidence.stance_detector import detect_stance as _detect_stance

    # ── Step 1: Preprocess ────────────────────────────────────────────────────
    proc = _get_nlp("preprocessor", TextPreprocessor).preprocess(text)

    # ── Step 2: Language detection ────────────────────────────────────────────
    lang_result = _get_nlp("lang_detector", LanguageDetector).detect(text)
    try:
        language = Language(lang_result.language)
    except ValueError:
        # Unrecognised language codes fall back to Taglish.
        language = Language.TAGLISH

    # ── Steps 3–6: NLP analysis (run sequentially) ───────────────────────────
    ner_result = _get_nlp("ner_extractor", EntityExtractor).extract(text)
    sentiment_result = _get_nlp("sentiment", SentimentAnalyzer).analyze(proc.cleaned)
    clickbait_result = _get_nlp("clickbait", ClickbaitDetector).detect(text)
    claim_result = _get_nlp("claim_extractor", ClaimExtractor).extract(proc.cleaned)

    # ── Step 7: Layer 1 — ML Classifier ──────────────────────────────────────
    classifier, model_tier = _select_layer1_classifier()
    l1 = classifier.predict(proc.cleaned)
    logger.debug("Layer-1 (%s): %s %.1f%%", model_tier, l1.verdict, l1.confidence)

    # Enrich triggered features with NLP signals. Work on a copy so the
    # classifier's own result object is not mutated.
    triggered_features = list(l1.triggered_features)
    if clickbait_result.is_clickbait:
        triggered_features.extend(clickbait_result.triggered_patterns[:3])
    if sentiment_result.sentiment == "high negative":
        triggered_features.append("high emotional language")

    layer1 = Layer1Result(
        verdict=Verdict(l1.verdict),
        confidence=l1.confidence,
        triggered_features=triggered_features,
        model_tier=model_tier,
    )

    # ── Step 8: Layer 2 — Evidence Retrieval ──────────────────────────────────
    # The tier is resolved once and reused for the default evidence score, the
    # score adjustment and the response.
    domain_tier = get_domain_tier(source_domain) if source_domain else None
    evidence_score = _EVIDENCE_DEFAULTS.get(domain_tier, 50.0)
    evidence_sources: list[EvidenceSource] = []
    l2_verdict = Verdict.UNVERIFIED

    # Run classifier comparison concurrently with evidence fetch
    comparison_task = asyncio.create_task(_run_comparison(proc.cleaned))

    if settings.news_api_key:
        try:
            query_entities = ner_result.persons + ner_result.organizations + ner_result.locations
            articles = await fetch_evidence(
                claim_result.claim, settings.news_api_key, entities=query_entities
            )
            for art in articles[:5]:
                # Article fields may be present but null; normalise them once
                # so "None" never leaks into the text or crashes a lookup.
                title = art.get("title") or ""
                description = art.get("description") or ""
                url = art.get("url") or ""
                source_name = (art.get("source") or {}).get("name")

                sim = compute_similarity(claim_result.claim, f"{title} {description}")
                tier = get_domain_tier((source_name or "unknown").lower())
                stance_result = _detect_stance(
                    claim=claim_result.claim,
                    article_title=title,
                    article_description=description,
                    article_url=url,
                    similarity=sim,
                )
                evidence_sources.append(EvidenceSource(
                    title=title,
                    url=url,
                    similarity=sim,
                    stance=Stance(stance_result.stance.value),
                    stance_reason=stance_result.reason,
                    domain_tier=tier or DomainTier.SUSPICIOUS,
                    published_at=art.get("publishedAt"),
                    source_name=source_name,
                ))

            # Evidence score: average similarity × 100, penalized for refuting sources
            if evidence_sources:
                evidence_score, l2_verdict = _score_evidence(evidence_sources)
        except Exception as e:
            logger.warning("Evidence retrieval failed: %s — using neutral score", e)

    layer2 = Layer2Result(
        verdict=l2_verdict,
        evidence_score=round(evidence_score, 1),
        sources=evidence_sources,
        claim_used=claim_result.claim,
        claim_method=claim_result.method,
    )

    # ── Step 9: Final Score ───────────────────────────────────────────────────
    # ML confidence is 0-100 where high = more credible for the predicted class.
    # Adjust: if ML says Fake, its confidence works against credibility.
    ml_credibility = l1.confidence if l1.verdict == "Credible" else (100 - l1.confidence)
    base_score = (ml_credibility * settings.ml_weight) + (evidence_score * settings.evidence_weight)

    # Domain credibility adjustment — applied when we know the source URL.
    domain_adjustment = _domain_adjustment(domain_tier, ml_credibility, source_domain)

    final_score = round(min(100.0, max(0.0, base_score + domain_adjustment)), 1)
    verdict = _map_verdict(final_score)

    # ── Step 10: Assemble response ────────────────────────────────────────────
    comparison, lda_topic = await comparison_task

    result = VerificationResponse(
        verdict=verdict,
        confidence=round(max(l1.confidence, evidence_score), 1),
        final_score=final_score,
        layer1=layer1,
        layer2=layer2,
        entities=EntitiesResult(
            persons=ner_result.persons,
            organizations=ner_result.organizations,
            locations=ner_result.locations,
            dates=ner_result.dates,
        ),
        sentiment=sentiment_result.sentiment,
        emotion=sentiment_result.emotion,
        language=language,
        domain_credibility=domain_tier,
        input_type=input_type,
        classifier_comparison=comparison,
        lda_topic=lda_topic,
    )

    # ── Record to Firestore (falls back to in-memory if Firebase not configured) ─
    history_entry = {
        "id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "input_type": input_type,
        "text_preview": text[:120],
        "verdict": verdict.value,
        "confidence": result.confidence,
        "final_score": final_score,
        "entities": ner_result.to_dict(),
        "claim_used": claim_result.claim,
        "layer1": {
            "verdict": layer1.verdict.value,
            "confidence": layer1.confidence,
            "triggered_features": layer1.triggered_features,
        },
        "layer2": {
            "verdict": layer2.verdict.value,
            "evidence_score": layer2.evidence_score,
            "claim_used": layer2.claim_used,
        },
        "sentiment": sentiment_result.sentiment,
        "emotion": sentiment_result.emotion,
        "language": language.value,
    }
    # History is best-effort: a failure must not fail the verification.
    try:
        from api.routes.history import record_verification
        record_verification(history_entry)
    except Exception as e:
        logger.warning("Failed to record history: %s", e)

    return result