""" app.py — NEPA AI Assistant Gradio Space with ZeroGPU running Qwen3-8B for RAG-powered Q&A across 60,000+ federal NEPA environmental review projects. Architecture: - Metadata loaded at startup on CPU (~200MB RAM) - Hybrid retrieval (semantic + keyword) runs on CPU - LLM inference uses @spaces.GPU (H200, on-demand) - Streaming responses with TextIteratorStreamer """ import spaces import os import re import numpy as np import torch import logging import gradio as gr import pandas as pd from threading import Thread from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer from dataset_loader import ( empty_metadata_df, get_dataset_stats, load_nepatec_metadata, ) try: from sentence_transformers import CrossEncoder, SentenceTransformer except Exception: CrossEncoder = None SentenceTransformer = None # ── Logging ─────────────────────────────────────────────────── logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ── Configuration ───────────────────────────────────────────── MODEL_ID = "Qwen/Qwen3-8B" MAX_NEW_TOKENS = 1024 MAX_CONTEXT_PROJECTS = 10 # How many projects to inject as RAG context GPU_DURATION = 120 # Max seconds per inference call EMBEDDING_MODEL_ID = os.getenv("EMBEDDING_MODEL_ID", "sentence-transformers/all-MiniLM-L6-v2") EMBEDDING_CACHE_PATH = os.getenv("EMBEDDING_CACHE_PATH", "nepatec_semantic_index.npz") ENABLE_SEMANTIC_SEARCH = os.getenv("ENABLE_SEMANTIC_SEARCH", "1") != "0" SEMANTIC_WEIGHT = float(os.getenv("SEMANTIC_WEIGHT", "0.75")) SEMANTIC_CANDIDATE_MULTIPLIER = int(os.getenv("SEMANTIC_CANDIDATE_MULTIPLIER", "8")) ENABLE_RERANKER = os.getenv("ENABLE_RERANKER", "0") == "1" RERANKER_MODEL_ID = os.getenv("RERANKER_MODEL_ID", "cross-encoder/ms-marco-MiniLM-L-6-v2") RERANK_TOP_K = int(os.getenv("RERANK_TOP_K", "40")) # ── Load Model Globally (CPU until @spaces.GPU activates) ───── tokenizer = None model = None model_load_error = None try: logger.info(f"Loading tokenizer: {MODEL_ID}") tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) logger.info(f"Loading model: {MODEL_ID} (bfloat16)") model = AutoModelForCausalLM.from_pretrained( MODEL_ID, torch_dtype=torch.bfloat16, device_map="auto", ) model.eval() logger.info("Model loaded successfully") except Exception as e: model_load_error = str(e) logger.exception(f"Model initialization failed: {e}") # ── Load NEPATEC Metadata (CPU, cached to Parquet) ──────────── logger.info("Loading NEPATEC 2.0 metadata...") try: metadata_df = load_nepatec_metadata() if metadata_df.empty: logger.warning("Metadata loaded empty; search features will be limited") except Exception as e: logger.exception(f"Metadata initialization failed: {e}") metadata_df = empty_metadata_df() stats = get_dataset_stats(metadata_df) if not metadata_df.empty else {} logger.info(f"Metadata ready: {len(metadata_df):,} projects") # ── Dropdown values for filters ─────────────────────────────── ALL_AGENCIES = sorted( metadata_df.get("lead_agency", pd.Series(dtype=str)) .replace("", pd.NA) .dropna() .astype(str) .unique() .tolist() ) ALL_SECTORS = sorted( metadata_df.get("project_sector", pd.Series(dtype=str)) .replace("", pd.NA) .dropna() .astype(str) .unique() .tolist() ) NEPA_CATEGORIES = ["CE", "EA", "EIS"] # Common conversational words that should not act as hard filters. 
# Common conversational words that should not act as hard filters.
QUERY_STOPWORDS = {
    "a", "an", "and", "are", "as", "at", "be", "by", "for", "from",
    "in", "into", "is", "it", "latest", "list", "me", "most", "new",
    "newest", "of", "on", "or", "please", "project", "projects",
    "recent", "show", "the", "to", "what", "which", "with",
    "about", "any", "find", "give", "tell",
}

hybrid_retriever = None
retrieval_backend_status = "Keyword-only retrieval"

# ══════════════════════════════════════════════════════════════
# SEARCH (CPU only — no GPU cost)
# ══════════════════════════════════════════════════════════════

def build_context(results_df):
    """Format matched projects as a context string for the LLM."""
    if results_df.empty:
        return "No matching projects found in the NEPATEC database."

    chunks = []
    for i, (_, r) in enumerate(results_df.iterrows(), 1):
        title = str(r.get("project_title", "") or "Untitled project")
        agency = str(r.get("lead_agency", "") or "Unknown agency")
        location = str(r.get("location", "") or "Unknown location")
        category = str(r.get("nepa_category", "") or "Unknown")
        sector = str(r.get("project_sector", "") or "Unknown")
        num_documents = r.get("num_documents", 0)
        doc_types = str(r.get("document_types", "") or "Unknown")

        chunk = (
            f"{i}. {title}\n"
            f"   Agency: {agency}\n"
            f"   Location: {location}\n"
            f"   Category: {category} | Sector: {sector}\n"
            f"   Documents: {num_documents} files | Types: {doc_types}"
        )
        desc = r.get("project_description", "")
        if desc and len(str(desc)) > 10:
            # Truncate long descriptions to save context tokens
            chunk += f"\n   Description: {str(desc)[:300]}..."
        chunks.append(chunk)

    return "\n\n".join(chunks)
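
# Example (illustrative): a single chunk produced by build_context() looks
# roughly like this (hypothetical project, not real dataset content):
#   1. Example Transmission Line Upgrade
#      Agency: Bureau of Land Management
#      Location: Clark County, NV
#      Category: EA | Sector: Energy
#      Documents: 4 files | Types: EA, FONSI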
def _extract_query_terms(query: str):
    """Normalize user query to useful literal keywords."""
    raw_terms = re.findall(r"[a-z0-9]+", str(query).lower())
    if not raw_terms:
        return []
    cleaned = [t for t in raw_terms if t not in QUERY_STOPWORDS and len(t) >= 2]
    terms = cleaned if cleaned else raw_terms
    # Preserve order while de-duplicating.
    return list(dict.fromkeys(terms))


def _search_series(df: pd.DataFrame):
    return df.get("search_text", pd.Series("", index=df.index)).fillna("").astype(str)


def _keyword_scores(df: pd.DataFrame, terms):
    """Return normalized lexical match scores in [0, 1]."""
    if df.empty:
        return pd.Series(dtype="float32")
    if not terms:
        return pd.Series(0.0, index=df.index, dtype="float32")

    search_series = _search_series(df)
    score = pd.Series(0.0, index=df.index, dtype="float32")
    for term in terms:
        score += search_series.str.contains(term, na=False, regex=False).astype("float32")
    denom = float(max(len(terms), 1))
    return (score / denom).clip(0, 1)


def _keyword_filter(df: pd.DataFrame, terms):
    """
    Search strategy:
      1) strict all-term match
      2) fallback relaxed ranked match (any term, sorted by #matches)
    """
    if df.empty:
        return df
    if not terms:
        return df

    search_series = _search_series(df)
    strict_mask = pd.Series(True, index=df.index)
    for term in terms:
        strict_mask &= search_series.str.contains(term, na=False, regex=False)
    strict = df[strict_mask]
    if not strict.empty:
        return strict

    relaxed = df.assign(_match_score=_keyword_scores(df, terms))
    relaxed = relaxed[relaxed["_match_score"] > 0]
    if relaxed.empty:
        return relaxed.drop(columns=["_match_score"], errors="ignore")

    sort_cols = ["_match_score"]
    sort_asc = [False]
    if "num_documents" in relaxed.columns:
        sort_cols.append("num_documents")
        sort_asc.append(False)
    relaxed = relaxed.sort_values(sort_cols, ascending=sort_asc)
    return relaxed.drop(columns=["_match_score"], errors="ignore")
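
# Example (illustrative): for the query "geothermal lease nevada",
# _extract_query_terms() yields ["geothermal", "lease", "nevada"]; a project
# whose search_text contains "geothermal" and "nevada" but not "lease" then
# gets a lexical score of 2/3 ≈ 0.67 from _keyword_scores().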
def _row_to_semantic_text(row: pd.Series):
    """Compact metadata text used by semantic retrieval / reranking."""
    return (
        f"Title: {row.get('project_title', '')}. "
        f"Agency: {row.get('lead_agency', '')}. "
        f"Location: {row.get('location', '')}. "
        f"Sector: {row.get('project_sector', '')}. "
        f"Category: {row.get('nepa_category', '')}. "
        f"Type: {row.get('project_type', '')}. "
        f"Description: {row.get('project_description', '')}. "
        f"Documents: {row.get('document_types', '')}."
    )


class HybridRetriever:
    """
    Hybrid retriever:
    - Dense semantic embeddings for intent/synonym matching
    - Lexical match score for exact-term precision
    - Optional reranker over top candidates
    """

    def __init__(self, df: pd.DataFrame):
        self.enabled = False
        self.status = "Keyword-only retrieval"
        self.embedding_model = None
        self.reranker = None
        self.embeddings = None
        self.row_ids = None
        self.row_to_pos = {}

        if df is None or df.empty:
            self.status = "Keyword-only retrieval (no metadata)"
            return
        if not ENABLE_SEMANTIC_SEARCH:
            self.status = "Keyword-only retrieval (semantic disabled)"
            return
        if SentenceTransformer is None:
            self.status = "Keyword-only retrieval (sentence-transformers unavailable)"
            return

        try:
            self.embedding_model = SentenceTransformer(EMBEDDING_MODEL_ID)
            self._load_or_build_index(df)
            self._maybe_load_reranker()
            self.enabled = True
            self.status = "Hybrid semantic retrieval enabled"
            if self.reranker is not None:
                self.status += " (+ reranker)"
        except Exception as e:
            logger.exception(f"Semantic retriever initialization failed: {e}")
            self.status = f"Keyword-only retrieval (semantic init failed: {e})"

    def _cache_signature(self, df: pd.DataFrame):
        project_ids = df.get("project_id", pd.Series(dtype=str)).fillna("").astype(str)
        head = "|".join(project_ids.head(5).tolist())
        tail = "|".join(project_ids.tail(5).tolist())
        return f"{len(df)}::{head}::{tail}::{EMBEDDING_MODEL_ID}"

    def _load_cached_index(self, signature: str):
        if not os.path.exists(EMBEDDING_CACHE_PATH):
            return None, None
        try:
            blob = np.load(EMBEDDING_CACHE_PATH, allow_pickle=False)
            cached_signature = str(blob["signature"][0])
            if cached_signature != signature:
                logger.info("Semantic index cache signature mismatch; rebuilding.")
                return None, None
            embeddings = blob["embeddings"].astype("float32")
            row_ids = blob["row_ids"]
            if embeddings.ndim != 2 or len(embeddings) != len(row_ids):
                logger.warning("Semantic cache shape mismatch; rebuilding.")
                return None, None
            return embeddings, row_ids
        except Exception as e:
            logger.warning(f"Failed to load semantic cache: {e}")
            return None, None

    def _save_cached_index(self, signature: str, embeddings: np.ndarray, row_ids: np.ndarray):
        try:
            np.savez_compressed(
                EMBEDDING_CACHE_PATH,
                signature=np.array([signature], dtype="U"),
                embeddings=embeddings.astype("float16"),
                row_ids=row_ids,
            )
            logger.info(f"Saved semantic index cache to {EMBEDDING_CACHE_PATH}")
        except Exception as e:
            logger.warning(f"Failed to save semantic cache: {e}")

    def _load_or_build_index(self, df: pd.DataFrame):
        signature = self._cache_signature(df)
        cached_embeddings, cached_row_ids = self._load_cached_index(signature)
        if cached_embeddings is not None and cached_row_ids is not None:
            self.embeddings = cached_embeddings
            self.row_ids = cached_row_ids
            self.row_to_pos = {rid: i for i, rid in enumerate(self.row_ids.tolist())}
            logger.info(f"Loaded semantic index from cache ({len(self.row_ids):,} vectors)")
            return

        logger.info("Building semantic vector index from metadata...")
        corpus = [_row_to_semantic_text(row) for _, row in df.iterrows()]
        vectors = self.embedding_model.encode(
            corpus,
            batch_size=128,
            show_progress_bar=False,
            convert_to_numpy=True,
            normalize_embeddings=True,
        ).astype("float32")
        self.embeddings = vectors
        self.row_ids = df.index.to_numpy()
        self.row_to_pos = {rid: i for i, rid in enumerate(self.row_ids.tolist())}
        self._save_cached_index(signature, vectors, self.row_ids)
        logger.info(f"Semantic vector index ready ({len(self.row_ids):,} vectors)")
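
    # Example (illustrative): the cache written above can be inspected offline
    # to sanity-check the index:
    #   blob = np.load("nepatec_semantic_index.npz", allow_pickle=False)
    #   blob["embeddings"].shape   # (n_projects, embedding_dim), stored float16
    #   blob["signature"][0]       # row-count + project-ID + model fingerprint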
logger.warning("Reranker requested but sentence-transformers CrossEncoder unavailable.") return try: self.reranker = CrossEncoder(RERANKER_MODEL_ID) logger.info(f"Reranker loaded: {RERANKER_MODEL_ID}") except Exception as e: logger.warning(f"Reranker failed to load ({RERANKER_MODEL_ID}): {e}") self.reranker = None def _dense_scores_for_scope(self, query: str, df_scope: pd.DataFrame): query_vec = self.embedding_model.encode( [query], convert_to_numpy=True, normalize_embeddings=True, )[0].astype("float32") scope_row_ids = df_scope.index.tolist() scope_pos = [self.row_to_pos.get(rid, -1) for rid in scope_row_ids] dense_scores = np.zeros(len(scope_pos), dtype="float32") valid_idx = [i for i, pos in enumerate(scope_pos) if pos >= 0] if not valid_idx: return dense_scores valid_pos = np.array([scope_pos[i] for i in valid_idx], dtype=np.int64) dense_values = self.embeddings[valid_pos] @ query_vec dense_scores[np.array(valid_idx, dtype=np.int64)] = dense_values return dense_scores def search(self, query: str, df_scope: pd.DataFrame, terms, top_k: int, use_reranker: bool = False): if df_scope.empty: return df_scope.head(0) if not query or not query.strip(): return df_scope.head(top_k) if not self.enabled: return _keyword_filter(df_scope, terms).head(top_k) dense_scores = self._dense_scores_for_scope(query, df_scope) dense_norm = np.clip((dense_scores + 1.0) / 2.0, 0.0, 1.0) lexical_norm = _keyword_scores(df_scope, terms).to_numpy(dtype="float32") fused = (SEMANTIC_WEIGHT * dense_norm) + ((1.0 - SEMANTIC_WEIGHT) * lexical_norm) if len(fused) == 0: return df_scope.head(0) candidate_n = min( len(df_scope), max(top_k * SEMANTIC_CANDIDATE_MULTIPLIER, top_k), ) if candidate_n >= len(fused): top_idx = np.argsort(fused)[::-1] else: top_idx = np.argpartition(fused, -candidate_n)[-candidate_n:] top_idx = top_idx[np.argsort(fused[top_idx])[::-1]] ranked = df_scope.iloc[top_idx].copy() ranked["_fused_score"] = fused[top_idx] ranked = ranked.sort_values("_fused_score", ascending=False) if use_reranker and self.reranker is not None and len(ranked) > 1: rerank_n = min(len(ranked), RERANK_TOP_K) rerank_head = ranked.head(rerank_n).copy() pairs = [[query, _row_to_semantic_text(row)] for _, row in rerank_head.iterrows()] try: rerank_head["_rerank_score"] = np.array( self.reranker.predict(pairs), dtype="float32", ) rerank_head = rerank_head.sort_values("_rerank_score", ascending=False) ranked = pd.concat([rerank_head, ranked.iloc[rerank_n:]], axis=0) except Exception as e: logger.warning(f"Reranker scoring failed; using fused ranking only: {e}") return ranked.head(top_k).drop(columns=["_fused_score", "_rerank_score"], errors="ignore") def search_projects(query, df, top_k=MAX_CONTEXT_PROJECTS): """Hybrid semantic + keyword retrieval on CPU.""" if df.empty: return df.head(0) if not query or not query.strip(): return df.head(top_k) terms = _extract_query_terms(query) if hybrid_retriever is not None: return hybrid_retriever.search( query=query, df_scope=df, terms=terms, top_k=top_k, use_reranker=True, ) return _keyword_filter(df, terms).head(top_k) if not metadata_df.empty: hybrid_retriever = HybridRetriever(metadata_df) retrieval_backend_status = hybrid_retriever.status logger.info(f"Retrieval backend: {retrieval_backend_status}") # ══════════════════════════════════════════════════════════════ # LLM INFERENCE (GPU — @spaces.GPU decorated) # ══════════════════════════════════════════════════════════════ SYSTEM_PROMPT = """You are a knowledgeable assistant specializing in NEPA (National Environmental Policy Act) 
# ══════════════════════════════════════════════════════════════
# LLM INFERENCE (GPU — @spaces.GPU decorated)
# ══════════════════════════════════════════════════════════════

SYSTEM_PROMPT = """You are a knowledgeable assistant specializing in NEPA (National Environmental Policy Act)
environmental review documents. You have access to metadata from over 60,000 federal projects in the NEPATEC 2.0 dataset.

When answering questions:
- Reference specific project names, agencies, and locations from the retrieved data
- Distinguish between CE (Categorical Exclusions), EA (Environmental Assessments), and EIS (Environmental Impact Statements)
- If the retrieved projects don't fully answer the question, say so clearly
- Be concise but informative
- If asked for "latest/newest", explain that metadata may not include reliable chronology and provide best available matches
- When listing projects, include the agency and location for each"""


def _metadata_fallback_reply(message: str, top_k: int = 5):
    """
    Fallback when ZeroGPU cannot allocate CUDA.
    Returns a metadata-grounded answer without LLM generation.
    """
    results = search_projects(message, metadata_df, top_k=top_k)
    if results.empty:
        return (
            "GPU is temporarily unavailable, and I couldn't find clear metadata matches. "
            "Please retry in a minute."
        )

    lines = []
    for i, (_, row) in enumerate(results.iterrows(), 1):
        title = str(row.get("project_title", "") or "Untitled project")
        agency = str(row.get("lead_agency", "") or "Unknown agency")
        location = str(row.get("location", "") or "Unknown location")
        category = str(row.get("nepa_category", "") or "Unknown")
        lines.append(f"{i}. {title} ({agency}; {location}; {category})")

    return (
        "GPU is temporarily unavailable, so I returned the best metadata matches:\n\n"
        + "\n".join(lines)
        + "\n\nRetry in a minute for full AI-generated analysis."
    )
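
# Example (illustrative): the two Gradio chat-history shapes that
# _chat_respond_gpu() below normalizes into OpenAI-style messages:
#   messages-style: [{"role": "user", "content": "Hi"},
#                    {"role": "assistant", "content": "Hello!"}]
#   tuple-style:    [("Hi", "Hello!")]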
@spaces.GPU(duration=GPU_DURATION)
def _chat_respond_gpu(message, history):
    """
    RAG-augmented chat: search metadata → build context → generate with Qwen3-8B.
    Streams tokens back to the UI.
    """
    if model is None or tokenizer is None:
        msg = "Model is unavailable right now. Please retry in a minute."
        if model_load_error:
            msg = f"{msg}\n\nStartup detail: {model_load_error}"
        # This function is a generator (it yields streamed tokens below), so the
        # message must be yielded; a bare `return value` would never reach the UI.
        yield msg
        return

    # 1. Search metadata (already on CPU, no GPU cost for this part)
    results = search_projects(message, metadata_df)
    context = build_context(results)
    match_count = len(results)

    # 2. Build prompt with retrieved context
    system_msg = (
        f"{SYSTEM_PROMPT}\n\n"
        f"--- Retrieved NEPA Projects ({match_count} matches) ---\n"
        f"{context}\n"
        f"--- End of Retrieved Data ---"
    )

    messages = [{"role": "system", "content": system_msg}]
    for msg in history or []:
        # Support both Gradio history formats:
        # - dict style: {"role": "user"/"assistant", "content": "..."}
        # - tuple/list style: (user_message, assistant_message)
        if isinstance(msg, dict):
            role = msg.get("role")
            content = msg.get("content")
            if role in {"user", "assistant"} and content is not None:
                messages.append({"role": role, "content": str(content)})
        elif isinstance(msg, (list, tuple)) and len(msg) == 2:
            user_msg, assistant_msg = msg
            if user_msg is not None:
                messages.append({"role": "user", "content": str(user_msg)})
            if assistant_msg is not None:
                messages.append({"role": "assistant", "content": str(assistant_msg)})
    messages.append({"role": "user", "content": message})

    # 3. Tokenize
    try:
        text = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
            enable_thinking=False,  # Skip reasoning chain for fast RAG answers
        )
    except TypeError:
        # Backward compatibility with tokenizer versions that don't support enable_thinking.
        text = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
        )

    inputs = tokenizer([text], return_tensors="pt")
    if hasattr(model, "device"):
        inputs = inputs.to(model.device)

    # 4. Stream generation
    streamer = TextIteratorStreamer(
        tokenizer, skip_prompt=True, skip_special_tokens=True
    )
    generation_kwargs = {
        **inputs,
        "streamer": streamer,
        "max_new_tokens": MAX_NEW_TOKENS,
        "temperature": 0.7,
        "top_p": 0.8,
        "top_k": 20,
        "do_sample": True,
        "repetition_penalty": 1.05,
    }
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    partial = ""
    for token in streamer:
        partial += token
        yield partial


def chat_respond(message, history):
    """
    Wrapper around GPU chat to avoid hard failures when ZeroGPU allocation fails.
    """
    try:
        yield from _chat_respond_gpu(message, history)
    except Exception as e:
        err = str(e)
        if "No CUDA GPUs are available" in err:
            logger.warning("ZeroGPU unavailable for this request; using metadata fallback.")
            yield _metadata_fallback_reply(message)
            return
        raise


# ══════════════════════════════════════════════════════════════
# SEARCH TAB (CPU only)
# ══════════════════════════════════════════════════════════════

def filter_and_search(query, agency, category, sector):
    """Filter + hybrid retrieval. Returns DataFrame for display. CPU only."""
    if metadata_df.empty:
        empty_results = pd.DataFrame(columns=[
            "Title", "Lead Agency", "Location", "Category",
            "Sector", "# Docs", "Document Types"
        ])
        return empty_results, "Metadata is still loading or unavailable. Please retry shortly."

    df = metadata_df.copy()
    if agency and agency != "All":
        df = df[df["lead_agency"] == agency]
    if category and category != "All":
        df = df[df["nepa_category"] == category]
    if sector and sector != "All":
        df = df[df["project_sector"] == sector]

    filtered_pool_size = len(df)
    semantic_used = False
    if query and query.strip():
        terms = _extract_query_terms(query)
        if hybrid_retriever is not None and hybrid_retriever.enabled:
            semantic_used = True
            df = hybrid_retriever.search(
                query=query,
                df_scope=df,
                terms=terms,
                top_k=min(filtered_pool_size, 1000),
                use_reranker=False,
            )
        else:
            df = _keyword_filter(df, terms)

    display_cols = [
        "project_title", "lead_agency", "location", "nepa_category",
        "project_sector", "num_documents", "document_types"
    ]
    for col in display_cols:
        if col not in df.columns:
            df[col] = ""
    result = df[display_cols].head(200).copy()
    result.columns = [
        "Title", "Lead Agency", "Location", "Category",
        "Sector", "# Docs", "Document Types"
    ]

    if query and query.strip() and semantic_used:
        status = (
            f"Showing {len(result)} top hybrid matches "
            f"from {filtered_pool_size:,} filtered projects"
        )
    else:
        status = f"Showing {len(result)} of {len(df):,} matching projects"
    return result, status
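
# Example (illustrative): the Search tab wires this callback directly to its
# inputs; the same function can be exercised from a Python shell:
#   table, status = filter_and_search("transmission line", "All", "EIS", "All")
#   print(status)  # e.g. "Showing ... top hybrid matches from ... filtered projects"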
stats.get("categories", {}) stats_md = ( f"**{stats['total_projects']:,}** projects · " f"**{stats.get('unique_agencies', 0)}** agencies · " f"**CE:** {cats.get('CE', 0):,} · " f"**EA:** {cats.get('EA', 0):,} · " f"**EIS:** {cats.get('EIS', 0):,}" ) def build_app(): with gr.Blocks(title="NEPA AI Assistant") as demo: # ── Header ──────────────────────────────────────────── gr.HTML( '
AI-powered search & Q&A across 60,000+ federal environmental review projects
' '