# Ryan Christian D. Deniega
# Add LDA topic inference — show detected topic label + defining words in UI
# affe2db
"""
PhilVerify β€” Scoring Engine (Orchestrator)
Ties together all NLP modules, Layer 1, and Layer 2 into a final VerificationResponse.
Final Score = (ML Confidence Γ— 0.40) + (Evidence Score Γ— 0.60)
"""
import asyncio
import json
import logging
import uuid
from datetime import datetime, timezone
from pathlib import Path
from config import get_settings
from api.schemas import (
VerificationResponse, Verdict, Language, DomainTier,
Layer1Result, Layer2Result, EntitiesResult, EvidenceSource, Stance,
ClassifierComparisonEntry, LDATopicResult,
)
logger = logging.getLogger(__name__)
settings = get_settings()
# ── Module-level NLP singleton cache ─────────────────────────────────────────
# These are created once per process and reused across all requests.
# Creating fresh instances on every request causes unnecessary model reloads
# from disk (300–500 ms each) which compounds into multi-second latency.
_nlp_cache: dict = {}
def _get_nlp(key: str, factory):
"""Return cached NLP instance, creating via factory() on first call."""
if key not in _nlp_cache:
_nlp_cache[key] = factory()
return _nlp_cache[key]
# ── Classical classifier comparison ──────────────────────────────────────────
# Runs all four classical ML classifiers on every request for the demo panel.
# Each classifier trains once on first call and is cached via _get_nlp().
async def _run_comparison(text: str) -> tuple[list[ClassifierComparisonEntry], LDATopicResult | None]:
    """Run BoW, TF-IDF, Naive Bayes, and LDA classifiers off the event loop.

    Each classifier module is imported lazily (so the app starts without the
    heavy ml.* deps), constructed once per process via _get_nlp(), and run on
    *text*. A classifier that raises is logged and skipped — one broken model
    never aborts the whole comparison. The LDA classifier additionally reports
    its inferred topic when it exposes get_topic_info().

    Args:
        text: Preprocessed/cleaned article text to classify.

    Returns:
        (entries, lda_topic): one ClassifierComparisonEntry per classifier
        that succeeded, plus an LDATopicResult or None when unavailable.
    """
    _COMPARISON_CLASSIFIERS = [
        ("BoW", "cmp_bow", lambda: __import__("ml.bow_classifier", fromlist=["BoWClassifier"]).BoWClassifier()),
        ("TF-IDF", "cmp_tfidf", lambda: __import__("ml.tfidf_classifier", fromlist=["TFIDFClassifier"]).TFIDFClassifier()),
        ("Naive Bayes", "cmp_nb", lambda: __import__("ml.naive_bayes_classifier", fromlist=["NaiveBayesClassifier"]).NaiveBayesClassifier()),
        ("LDA", "cmp_lda", lambda: __import__("ml.lda_analysis", fromlist=["LDAFeatureClassifier"]).LDAFeatureClassifier()),
    ]
    def _predict_all():
        # Runs in a thread-pool worker: synchronous, touching only the shared
        # _nlp_cache (populated at most once per classifier key).
        results = []
        lda_topic_result = None
        for name, key, factory in _COMPARISON_CLASSIFIERS:
            try:
                clf = _get_nlp(key, factory)
                r = clf.predict(text)
                results.append(ClassifierComparisonEntry(
                    name=name,
                    verdict=Verdict(r.verdict),
                    confidence=r.confidence,
                    top_features=r.triggered_features[:3],
                ))
                # Only the LDA entry carries topic info for the UI panel.
                if name == "LDA" and hasattr(clf, "get_topic_info"):
                    info = clf.get_topic_info(text)
                    lda_topic_result = LDATopicResult(**info)
            except Exception as exc:
                logger.warning("Comparison classifier %s failed: %s", name, exc)
        return results, lda_topic_result
    # get_running_loop(): asyncio.get_event_loop() is deprecated inside
    # coroutines since Python 3.10; we are always called from a running loop.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _predict_all)
# ── Domain credibility lookup ─────────────────────────────────────────────────
_DOMAIN_DB_PATH = Path(__file__).parent.parent / "domain_credibility.json"
_DOMAIN_DB: dict = {}
def _load_domain_db() -> dict:
    """Load and memoize domain_credibility.json; yields {} while unreadable.

    The cache is module-global: the file is parsed at most once per process.
    A failed load is logged and retried on the next call (cache stays empty).
    """
    global _DOMAIN_DB
    if _DOMAIN_DB:
        return _DOMAIN_DB
    try:
        raw = _DOMAIN_DB_PATH.read_text()
        _DOMAIN_DB = json.loads(raw)
    except Exception as e:
        logger.warning("Could not load domain_credibility.json: %s", e)
    return _DOMAIN_DB
def get_domain_tier(domain: str) -> DomainTier | None:
    """Map a source hostname to its credibility tier.

    Args:
        domain: Hostname (case-insensitive); a leading "www." is ignored.

    Returns:
        None for empty input; the matching DomainTier when the domain is in
        the credibility database; otherwise DomainTier.SUSPICIOUS (Tier 3).
    """
    if not domain:
        return None
    db = _load_domain_db()
    # removeprefix: only strip a LEADING "www." — the previous replace()
    # removed "www." anywhere in the hostname (e.g. "newswww.example.com").
    domain = domain.lower().removeprefix("www.")
    for tier_key, tier_data in db.items():
        if domain in tier_data.get("domains", []):
            # Assumes DB keys end in their numeric tier (e.g. "tier_1") — matches
            # the original parsing; TODO confirm against domain_credibility.json.
            return DomainTier(int(tier_key[-1]))
    return DomainTier.SUSPICIOUS  # Unknown domains default to Tier 3
def _map_verdict(final_score: float) -> Verdict:
    """Translate a 0–100 final score into the three-way verdict.

    Scores at or above the credible threshold are CREDIBLE, at or above the
    fake threshold UNVERIFIED, and anything lower LIKELY_FAKE.
    """
    if final_score >= settings.credible_threshold:
        return Verdict.CREDIBLE
    if final_score >= settings.fake_threshold:
        return Verdict.UNVERIFIED
    return Verdict.LIKELY_FAKE
async def run_verification(
    text: str,
    input_type: str = "text",
    source_domain: str | None = None,
) -> VerificationResponse:
    """
    Full verification pipeline orchestrator.

    Runs preprocessing, language detection, and NLP analysis, classifies the
    text with the best available Layer-1 model (Ensemble → XLM-R → TF-IDF),
    fetches evidence (Layer 2) while the classical-classifier comparison runs
    in a worker thread, then combines everything into the final score:
    (ML credibility * ml_weight) + (evidence score * evidence_weight),
    plus a domain-credibility adjustment when the source domain is known.

    Args:
        text: Raw submitted article/claim text.
        input_type: Origin tag ("text", url, ocr, ...) — recorded in the
            response/history, not used for branching here.
        source_domain: Hostname the text came from, when known.

    Returns:
        A fully populated VerificationResponse. The verification is also
        recorded to history on a best-effort basis.
    """
    # ── Lazy imports so app starts without heavy deps ─────────────────────────
    from nlp.preprocessor import TextPreprocessor
    from nlp.language_detector import LanguageDetector
    from nlp.ner import EntityExtractor
    from nlp.sentiment import SentimentAnalyzer
    from nlp.clickbait import ClickbaitDetector
    from nlp.claim_extractor import ClaimExtractor
    from evidence.news_fetcher import fetch_evidence, compute_similarity
    from evidence.stance_detector import detect_stance as _detect_stance
    # Source-domain tier is needed three times (evidence prior, final-score
    # adjustment, response field) — look it up once instead of per use.
    domain_tier = get_domain_tier(source_domain) if source_domain else None
    # ── Step 1: Preprocess ────────────────────────────────────────────────────
    preprocessor = _get_nlp("preprocessor", TextPreprocessor)
    proc = preprocessor.preprocess(text)
    # ── Step 2: Language detection ────────────────────────────────────────────
    lang_detector = _get_nlp("lang_detector", LanguageDetector)
    lang_result = lang_detector.detect(text)
    try:
        language = Language(lang_result.language)
    except ValueError:
        # Codes outside the Language enum default to Taglish (mixed content).
        language = Language.TAGLISH
    # ── Steps 3–6: NLP analysis (sequential) ─────────────────────────────────
    ner_extractor = _get_nlp("ner_extractor", EntityExtractor)
    sentiment_analyzer = _get_nlp("sentiment", SentimentAnalyzer)
    clickbait_detector = _get_nlp("clickbait", ClickbaitDetector)
    claim_extractor = _get_nlp("claim_extractor", ClaimExtractor)
    ner_result = ner_extractor.extract(text)
    sentiment_result = sentiment_analyzer.analyze(proc.cleaned)
    clickbait_result = clickbait_detector.detect(text)
    claim_result = claim_extractor.extract(proc.cleaned)
    # ── Step 7: Layer 1 — ML Classifier ──────────────────────────────────────
    # Priority: Ensemble (XLM-R + Tagalog-RoBERTa) → XLM-R alone → TF-IDF.
    # The imports get their own try so that a missing ml.* module degrades to
    # TF-IDF instead of raising NameError when the `except ModelNotFoundError`
    # clause below is evaluated with the name never bound.
    model_tier = "tfidf"
    classifier = None
    try:
        from ml.xlm_roberta_classifier import XLMRobertaClassifier, ModelNotFoundError
        from ml.tagalog_roberta_classifier import TagalogRobertaClassifier
        from ml.ensemble_classifier import EnsembleClassifier
    except Exception as exc:
        logger.warning("XLM-RoBERTa load failed (%s) β€” falling back to TF-IDF", exc)
    else:
        try:
            xlmr = _get_nlp("xlmr_classifier", XLMRobertaClassifier)
            members = [xlmr]
            model_tier = "xlmr"
            try:
                tl = _get_nlp("tagalog_classifier", TagalogRobertaClassifier)
                members.append(tl)
                model_tier = "ensemble"
            except ModelNotFoundError:
                logger.info("Tagalog-RoBERTa checkpoint not found β€” using XLM-R only")
            except Exception as exc:
                logger.warning("Tagalog-RoBERTa load failed (%s) β€” using XLM-R only", exc)
            classifier = EnsembleClassifier(members)
        except ModelNotFoundError:
            model_tier = "tfidf"  # reset so the reported tier matches the fallback
            logger.info("XLM-RoBERTa checkpoint not found β€” falling back to TF-IDF baseline")
        except Exception as exc:
            model_tier = "tfidf"  # reset so the reported tier matches the fallback
            logger.warning("XLM-RoBERTa load failed (%s) β€” falling back to TF-IDF", exc)
    if classifier is None:
        from ml.tfidf_classifier import TFIDFClassifier
        def _make_tfidf():
            # The TF-IDF baseline trains in-process; cached so it trains once.
            c = TFIDFClassifier(); c.train(); return c
        classifier = _get_nlp("tfidf_classifier", _make_tfidf)
    l1 = classifier.predict(proc.cleaned)
    logger.debug("Layer-1 (%s): %s %.1f%%", model_tier, l1.verdict, l1.confidence)
    # Enrich triggered features with NLP signals
    if clickbait_result.is_clickbait:
        l1.triggered_features.extend(clickbait_result.triggered_patterns[:3])
    if sentiment_result.sentiment in ("high negative",):
        l1.triggered_features.append("high emotional language")
    layer1 = Layer1Result(
        verdict=Verdict(l1.verdict),
        confidence=l1.confidence,
        triggered_features=l1.triggered_features,
        model_tier=model_tier,
    )
    # ── Step 8: Layer 2 — Evidence Retrieval ──────────────────────────────────
    # Default evidence score depends on source domain tier when no API key is set:
    #   Tier 1 (Inquirer, GMA, Rappler, ...) → 65 – known credible, not neutral
    #   Tier 2 (satire/opinion)              → 45 – slight skepticism
    #   Tier 3 (unknown)                     → 50 – neutral
    #   Tier 4 (blacklisted)                 → 25 – heavy prior against
    _EVIDENCE_DEFAULTS: dict = {
        DomainTier.CREDIBLE: 65.0,
        DomainTier.SATIRE_OPINION: 45.0,
        DomainTier.SUSPICIOUS: 50.0,
        DomainTier.KNOWN_FAKE: 25.0,
    }
    # .get(None, 50.0) → 50.0, so the unknown-domain case needs no extra branch.
    evidence_score = _EVIDENCE_DEFAULTS.get(domain_tier, 50.0)
    evidence_sources: list[EvidenceSource] = []
    l2_verdict = Verdict.UNVERIFIED
    # Run classifier comparison concurrently with evidence fetch
    comparison_task = asyncio.create_task(_run_comparison(proc.cleaned))
    if settings.news_api_key:
        try:
            query_entities = ner_result.persons + ner_result.organizations + ner_result.locations
            articles = await fetch_evidence(
                claim_result.claim,
                settings.news_api_key,
                entities=query_entities
            )
            for art in articles[:5]:
                article_text = f"{art.get('title', '')} {art.get('description', '')}"
                sim = compute_similarity(claim_result.claim, article_text)
                # "source" may be present-but-None in API payloads; guard both.
                source_name = ((art.get("source", {}) or {}).get("name"))
                domain = (source_name or "unknown").lower()
                tier = get_domain_tier(domain)
                stance_result = _detect_stance(
                    claim=claim_result.claim,
                    article_title=art.get("title", ""),
                    article_description=art.get("description", "") or "",
                    article_url=art.get("url", ""),
                    similarity=sim,
                )
                stance = Stance(stance_result.stance.value)
                evidence_sources.append(EvidenceSource(
                    title=art.get("title", ""),
                    url=art.get("url", ""),
                    similarity=sim,
                    stance=stance,
                    stance_reason=stance_result.reason,
                    domain_tier=tier or DomainTier.SUSPICIOUS,
                    published_at=art.get("publishedAt"),
                    source_name=source_name,
                ))
            # Evidence score: average similarity × 100, penalized for refuting sources
            if evidence_sources:
                supporting = [s for s in evidence_sources if s.stance == Stance.SUPPORTS]
                refuting = [s for s in evidence_sources if s.stance == Stance.REFUTES]
                avg_sim = sum(s.similarity for s in evidence_sources) / len(evidence_sources)
                refute_penalty = len(refuting) * 15
                evidence_score = max(0.0, min(100.0, avg_sim * 100 - refute_penalty))
                if len(refuting) > len(supporting):
                    l2_verdict = Verdict.LIKELY_FAKE
                elif len(supporting) >= 2:
                    l2_verdict = Verdict.CREDIBLE
        except Exception as e:
            logger.warning("Evidence retrieval failed: %s β€” using neutral score", e)
    layer2 = Layer2Result(
        verdict=l2_verdict,
        evidence_score=round(evidence_score, 1),
        sources=evidence_sources,
        claim_used=claim_result.claim,
        claim_method=claim_result.method,
    )
    # ── Step 9: Final Score ───────────────────────────────────────────────────
    # ML confidence is 0-100 where high = more credible for the predicted class.
    # Adjust: if ML says Fake, its confidence works against credibility.
    ml_credibility = l1.confidence if l1.verdict == "Credible" else (100 - l1.confidence)
    base_score = (ml_credibility * settings.ml_weight) + (evidence_score * settings.evidence_weight)
    # Domain credibility adjustment — applied when we know the source URL.
    # The adjustment scales with how much ML disagrees with the domain tier:
    #   - Tier 1 source but ML says Fake at high confidence → bigger boost needed
    #   - Tier 4 source but ML says Credible at high confidence → bigger penalty
    # Base adjustments are scaled by a "disagreement multiplier" (1.0–1.5)
    # so that a 95%-confident ML prediction on a Tier 1 source still respects
    # the fact that the article came from a verified outlet.
    domain_adjustment = 0.0
    if domain_tier is not None:
        _BASE_ADJ = {
            DomainTier.CREDIBLE: +20.0,        # Tier 1 — established PH news orgs
            DomainTier.SATIRE_OPINION: -5.0,   # Tier 2 — satire / opinion blogs
            DomainTier.SUSPICIOUS: -10.0,      # Tier 3 — unknown / unverified
            DomainTier.KNOWN_FAKE: -35.0,      # Tier 4 — blacklisted
        }
        base_adj = _BASE_ADJ.get(domain_tier, 0.0)
        # Disagreement multiplier: how much does ML diverge from what the domain implies?
        # Tier 1 implies credible (75), Tier 4 implies fake (25); others neutral (50)
        _TIER_IMPLIED_SCORE = {
            DomainTier.CREDIBLE: 75.0,
            DomainTier.SATIRE_OPINION: 50.0,
            DomainTier.SUSPICIOUS: 50.0,
            DomainTier.KNOWN_FAKE: 25.0,
        }
        implied = _TIER_IMPLIED_SCORE.get(domain_tier, 50.0)
        disagreement = abs(ml_credibility - implied) / 50.0  # 0.0 – 1.0+, capped below
        multiplier = min(1.5, 1.0 + disagreement * 0.5)      # 1.0 (agree) → 1.5 (hard disagree)
        domain_adjustment = base_adj * multiplier
        logger.info(
            "Domain credibility: %s (Tier %s) base=%+.0f Γ— multiplier=%.2f β†’ %+.1f pts "
            "(ml_credibility=%.1f, implied=%.0f)",
            source_domain, domain_tier.value, base_adj, multiplier, domain_adjustment,
            ml_credibility, implied,
        )
    final_score = round(min(100.0, max(0.0, base_score + domain_adjustment)), 1)
    verdict = _map_verdict(final_score)
    # ── Step 10: Assemble response ────────────────────────────────────────────
    comparison, lda_topic = await comparison_task
    result = VerificationResponse(
        verdict=verdict,
        # Headline confidence = the stronger of ML confidence and evidence
        # score (both already on a 0–100 scale; the old "/ 100 * 100" was a no-op).
        confidence=round(max(l1.confidence, evidence_score), 1),
        final_score=final_score,
        layer1=layer1,
        layer2=layer2,
        entities=EntitiesResult(
            persons=ner_result.persons,
            organizations=ner_result.organizations,
            locations=ner_result.locations,
            dates=ner_result.dates,
        ),
        sentiment=sentiment_result.sentiment,
        emotion=sentiment_result.emotion,
        language=language,
        domain_credibility=domain_tier,  # reuse the tier computed above
        input_type=input_type,
        classifier_comparison=comparison,
        lda_topic=lda_topic,
    )
    # ── Record to Firestore (falls back to in-memory if Firebase not configured) ─
    history_entry = {
        "id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "input_type": input_type,
        "text_preview": text[:120],
        "verdict": verdict.value,
        "confidence": result.confidence,
        "final_score": final_score,
        "entities": ner_result.to_dict(),
        "claim_used": claim_result.claim,
        "layer1": {
            "verdict": layer1.verdict.value,
            "confidence": layer1.confidence,
            "triggered_features": layer1.triggered_features,
        },
        "layer2": {
            "verdict": layer2.verdict.value,
            "evidence_score": layer2.evidence_score,
            "claim_used": layer2.claim_used,
        },
        "sentiment": sentiment_result.sentiment,
        "emotion": sentiment_result.emotion,
        "language": language.value,
    }
    try:
        from api.routes.history import record_verification
        record_verification(history_entry)
    except Exception as e:
        # Best-effort: a history failure must never fail the verification itself.
        logger.warning("Failed to record history: %s", e)
    return result