""" NEPA.tools — NEPATEC 2.0 Public Explorer Workflow-first Streamlit UI for precedent discovery. Domain: https://nepa.tools """ from __future__ import annotations import copy import hashlib import io import json import os import textwrap import urllib.error import urllib.request from datetime import datetime from itertools import islice from typing import Any import pandas as pd import streamlit as st # -------------------------------------------------- # CONFIG # -------------------------------------------------- REPO_ID = "PNNL/NEPATEC2.0" RECORD_PREFETCH_WINDOW = 8 CATEGORIES = { "CE": "Categorical Exclusions", "EA": "Environmental Assessments", "EIS": "Environmental Impact Statements", } CATEGORY_DESCRIPTIONS = { "CE": "54,668 projects · Actions exempt from detailed NEPA review", "EA": "3,083 projects · Assessments to determine if full EIS is needed", "EIS": "4,130 projects · Full environmental impact analyses", } MAX_LOAD_OPTIONS = [500, 1000, 2000, 5000, 10000, 20000] FILTER_PRESETS = { "All": { "search_text": "", "selected_agencies": [], "selected_locations": [], "selected_sectors": [], "selected_types": [], }, "Transportation": { "search_text": "transport", "selected_agencies": [], "selected_locations": [], "selected_sectors": ["Transportation"], "selected_types": [], }, "Energy": { "search_text": "energy", "selected_agencies": [], "selected_locations": [], "selected_sectors": ["Energy", "Transmission", "Power"], "selected_types": [], }, "Land Management": { "search_text": "land management", "selected_agencies": ["Bureau of Land Management", "US Forest Service"], "selected_locations": [], "selected_sectors": [], "selected_types": [], }, } # -------------------------------------------------- # PAGE CONFIG # -------------------------------------------------- st.set_page_config( page_title="NEPA.tools — NEPATEC 2.0 Explorer", page_icon="🔧", layout="wide", initial_sidebar_state="collapsed", ) # -------------------------------------------------- # APP STYLES (Theme-aware; relies on .streamlit/config.toml tokens) # -------------------------------------------------- APP_CSS = """ """ def apply_theme_css() -> None: st.markdown(APP_CSS, unsafe_allow_html=True) # -------------------------------------------------- # SESSION STATE # -------------------------------------------------- STATE_DEFAULTS: dict[str, Any] = { "selected_cat": None, "max_load": 500, "load_requested": False, "loaded_cat": None, "search_text": "", "selected_agencies": [], "selected_locations": [], "selected_sectors": [], "selected_types": [], "selected_record_idx": None, "shortlist_by_cat": {"CE": [], "EA": [], "EIS": []}, "loaded_record_idx": None, "loaded_record_data": None, "query_params_loaded": False, "filter_preset": "All", "last_applied_preset": "All", "recent_filters": [], "read_doc_search": "", "read_focus_hint": False, } def _copy_default(value: Any) -> Any: return copy.deepcopy(value) def init_session_state() -> None: for key, value in STATE_DEFAULTS.items(): if key not in st.session_state: st.session_state[key] = _copy_default(value) if st.session_state.selected_cat not in CATEGORIES: st.session_state.selected_cat = None if st.session_state.loaded_cat not in CATEGORIES: st.session_state.loaded_cat = None if not isinstance(st.session_state.load_requested, bool): st.session_state.load_requested = False if st.session_state.max_load not in MAX_LOAD_OPTIONS: st.session_state.max_load = 500 for key in [ "selected_agencies", "selected_locations", "selected_sectors", "selected_types", "recent_filters", ]: if not isinstance(st.session_state[key], list): st.session_state[key] = [] if not isinstance(st.session_state.shortlist_by_cat, dict): st.session_state.shortlist_by_cat = {"CE": [], "EA": [], "EIS": []} for cat in CATEGORIES: if cat not in st.session_state.shortlist_by_cat or not isinstance( st.session_state.shortlist_by_cat[cat], list ): st.session_state.shortlist_by_cat[cat] = [] def clear_selection_state() -> None: st.session_state.selected_record_idx = None st.session_state.loaded_record_idx = None st.session_state.loaded_record_data = None st.session_state.read_doc_search = "" st.session_state.read_focus_hint = False def clear_filter_state() -> None: st.session_state.search_text = "" st.session_state.selected_agencies = [] st.session_state.selected_locations = [] st.session_state.selected_sectors = [] st.session_state.selected_types = [] st.session_state.filter_preset = "All" st.session_state.last_applied_preset = "All" def clear_loaded_data_state() -> None: st.session_state.load_requested = False st.session_state.loaded_cat = None st.session_state._loaded_df = pd.DataFrame() def reset_session_state() -> None: for key, value in STATE_DEFAULTS.items(): st.session_state[key] = _copy_default(value) def _qp_get_all(params_obj: Any, key: str) -> list[str]: if hasattr(params_obj, "get_all"): try: vals = params_obj.get_all(key) return [str(v) for v in vals] except Exception: pass val = params_obj.get(key) if val is None: return [] if isinstance(val, list): return [str(v) for v in val] return [str(val)] def read_query_params_into_state() -> None: if st.session_state.query_params_loaded: return if not hasattr(st, "query_params"): st.session_state.query_params_loaded = True return try: qp = st.query_params cat = qp.get("cat") if cat in CATEGORIES: st.session_state.selected_cat = cat # Keep startup explicit: category can be preselected from URL, # but loading still requires an explicit button click. st.session_state.load_requested = False q = qp.get("q") if q is not None: st.session_state.search_text = str(q) max_raw = qp.get("max") if max_raw is not None: try: max_val = int(str(max_raw)) if max_val in MAX_LOAD_OPTIONS: st.session_state.max_load = max_val except ValueError: pass st.session_state.selected_agencies = _qp_get_all(qp, "agency") st.session_state.selected_locations = _qp_get_all(qp, "loc") st.session_state.selected_sectors = _qp_get_all(qp, "sector") st.session_state.selected_types = _qp_get_all(qp, "ptype") except Exception: pass st.session_state.query_params_loaded = True def _query_snapshot_from_state() -> dict[str, Any]: out: dict[str, Any] = { "cat": st.session_state.selected_cat or "", "q": st.session_state.search_text.strip(), "max": str(st.session_state.max_load), "agency": [str(v) for v in st.session_state.selected_agencies], "loc": [str(v) for v in st.session_state.selected_locations], "sector": [str(v) for v in st.session_state.selected_sectors], "ptype": [str(v) for v in st.session_state.selected_types], } return out def _query_snapshot_from_params() -> dict[str, Any]: if not hasattr(st, "query_params"): return {} qp = st.query_params return { "cat": str(qp.get("cat") or ""), "q": str(qp.get("q") or ""), "max": str(qp.get("max") or ""), "agency": _qp_get_all(qp, "agency"), "loc": _qp_get_all(qp, "loc"), "sector": _qp_get_all(qp, "sector"), "ptype": _qp_get_all(qp, "ptype"), } def write_state_to_query_params() -> None: if not hasattr(st, "query_params"): return target = _query_snapshot_from_state() current = _query_snapshot_from_params() if target == current: return try: st.query_params.clear() st.query_params["max"] = target["max"] if target["cat"]: st.query_params["cat"] = target["cat"] if target["q"]: st.query_params["q"] = target["q"] for key in ["agency", "loc", "sector", "ptype"]: if target[key]: st.query_params[key] = target[key] except Exception: pass # -------------------------------------------------- # DATA LOADING # -------------------------------------------------- @st.cache_data(show_spinner=False, ttl=3600) def load_category_index(category: str, max_records: int = 5000) -> pd.DataFrame: try: from datasets import load_dataset hf_token = os.environ.get("HF_TOKEN", None) dataset = load_dataset( REPO_ID, data_files=[f"{category}/*/*.jsonl"], split="train", streaming=True, token=hf_token, ) rows: list[dict[str, Any]] = [] for i, record in enumerate(dataset): if i >= max_records: break rows.append(extract_metadata(record, category, i)) if not rows: return pd.DataFrame() df = pd.DataFrame(rows) for col in df.select_dtypes(include=["object"]).columns: df[col] = df[col].astype(str) return df except Exception as exc: st.error(f"Error loading {category} data: {exc}") return pd.DataFrame() @st.cache_data(show_spinner=False, ttl=3600) def load_single_record(category: str, record_idx: int): try: from datasets import load_dataset if record_idx < 0: return None window_start = max(0, record_idx - (RECORD_PREFETCH_WINDOW // 2)) window = load_record_window(category, window_start, RECORD_PREFETCH_WINDOW) if record_idx in window: return window[record_idx] hf_token = os.environ.get("HF_TOKEN", None) dataset = load_dataset( REPO_ID, data_files=[f"{category}/*/*.jsonl"], split="train", streaming=True, token=hf_token, ) # Prefer dataset-native skipping when available. if hasattr(dataset, "skip") and hasattr(dataset, "take"): try: for record in dataset.skip(record_idx).take(1): return record return None except Exception: pass return next(islice(dataset, record_idx, record_idx + 1), None) except Exception as exc: st.error(f"Error loading record: {exc}") return None @st.cache_data(show_spinner=False, ttl=3600) def load_record_window(category: str, start_idx: int, window_size: int) -> dict[int, dict[str, Any]]: try: from datasets import load_dataset if start_idx < 0 or window_size <= 0: return {} hf_token = os.environ.get("HF_TOKEN", None) dataset = load_dataset( REPO_ID, data_files=[f"{category}/*/*.jsonl"], split="train", streaming=True, token=hf_token, ) records: dict[int, dict[str, Any]] = {} if hasattr(dataset, "skip") and hasattr(dataset, "take"): try: for offset, record in enumerate(dataset.skip(start_idx).take(window_size)): records[start_idx + offset] = record return records except Exception: pass for offset, record in enumerate(islice(dataset, start_idx, start_idx + window_size)): records[start_idx + offset] = record return records except Exception: return {} def extract_metadata(record: dict, category: str, idx: int) -> dict: project = record.get("project", {}) process = record.get("process", {}) documents = record.get("documents", []) def val(obj, key): value = obj.get(key, {}) if isinstance(value, dict): value = value.get("value", "") if isinstance(value, list): return ", ".join(str(item) for item in value) if value else "" return str(value) if value else "" total_pages = 0 for doc in documents: total_pages += len(doc.get("pages", [])) return { "_record_idx": idx, "_category": category, "project_id": val(project, "project_ID"), "project_title": val(project, "project_title"), "project_sector": val(project, "project_sector"), "project_type": val(project, "project_type"), "project_description": val(project, "project_description"), "project_sponsor": val(project, "project_sponsor"), "location": val(project, "location"), "lead_agency": val(process, "lead_agency"), "process_family": val(process, "process_family"), "process_type": val(process, "process_type"), "doc_count": len(documents), "total_pages": total_pages, } def get_unique_sorted(series: pd.Series) -> list[str]: vals = series.dropna().astype(str).str.strip() vals = vals[(vals != "") & (vals != "None")] return sorted(vals.unique().tolist()) def export_metadata_csv(df: pd.DataFrame) -> bytes: export_cols = [ "project_id", "project_title", "lead_agency", "location", "project_sector", "project_type", "project_sponsor", "process_type", "doc_count", "total_pages", "_category", ] cols = [c for c in export_cols if c in df.columns] return df[cols].to_csv(index=False).encode("utf-8") # -------------------------------------------------- # STATE + FILTER HELPERS # -------------------------------------------------- def build_filter_options(df: pd.DataFrame) -> dict[str, list[str]]: return { "selected_agencies": get_unique_sorted(df["lead_agency"]), "selected_locations": get_unique_sorted(df["location"])[:300], "selected_sectors": get_unique_sorted(df["project_sector"]), "selected_types": get_unique_sorted(df["project_type"])[:150], } def sanitize_filter_state(options: dict[str, list[str]]) -> None: for state_key, available in options.items(): current = st.session_state.get(state_key, []) st.session_state[state_key] = [val for val in current if val in available] def apply_filters(df: pd.DataFrame) -> pd.DataFrame: filtered = df.copy() query = st.session_state.search_text.strip() if query: mask = ( filtered["project_title"].str.contains(query, case=False, na=False) | filtered["project_description"].str.contains(query, case=False, na=False) | filtered["location"].str.contains(query, case=False, na=False) | filtered["lead_agency"].str.contains(query, case=False, na=False) ) filtered = filtered[mask] if st.session_state.selected_agencies: filtered = filtered[filtered["lead_agency"].isin(st.session_state.selected_agencies)] if st.session_state.selected_locations: filtered = filtered[filtered["location"].isin(st.session_state.selected_locations)] if st.session_state.selected_sectors: filtered = filtered[filtered["project_sector"].isin(st.session_state.selected_sectors)] if st.session_state.selected_types: filtered = filtered[filtered["project_type"].isin(st.session_state.selected_types)] return filtered def apply_filter_preset(preset_name: str, options: dict[str, list[str]]) -> None: preset = FILTER_PRESETS.get(preset_name, FILTER_PRESETS["All"]) st.session_state.search_text = preset.get("search_text", "") for key in [ "selected_agencies", "selected_locations", "selected_sectors", "selected_types", ]: requested_vals = preset.get(key, []) allowed_vals = options.get(key, []) st.session_state[key] = [val for val in requested_vals if val in allowed_vals] def save_current_filters_as_recent() -> None: label_query = st.session_state.search_text.strip() or "filtered" cat_label = st.session_state.selected_cat or "No category" label = f"{cat_label} · {label_query[:22]}" snapshot = { "id": datetime.now().strftime("%H%M%S%f"), "label": label, "selected_cat": st.session_state.selected_cat, "search_text": st.session_state.search_text, "selected_agencies": list(st.session_state.selected_agencies), "selected_locations": list(st.session_state.selected_locations), "selected_sectors": list(st.session_state.selected_sectors), "selected_types": list(st.session_state.selected_types), } existing = [f for f in st.session_state.recent_filters if f.get("label") != label] st.session_state.recent_filters = [snapshot] + existing[:2] def apply_recent_filter(recent: dict[str, Any], options: dict[str, list[str]]) -> None: target_cat = recent.get("selected_cat") cat_switch = target_cat in CATEGORIES and target_cat != st.session_state.selected_cat if target_cat in CATEGORIES: st.session_state.selected_cat = target_cat st.session_state.search_text = str(recent.get("search_text", "")) if cat_switch: clear_selection_state() clear_loaded_data_state() for key in [ "selected_agencies", "selected_locations", "selected_sectors", "selected_types", ]: requested_vals = [str(v) for v in recent.get(key, [])] if cat_switch: # Let the next rerun sanitize these values against the new category options. st.session_state[key] = requested_vals else: allowed_vals = options.get(key, []) st.session_state[key] = [val for val in requested_vals if val in allowed_vals] # -------------------------------------------------- # ROW + SHORTLIST HELPERS # -------------------------------------------------- def get_loaded_df() -> pd.DataFrame: loaded = st.session_state.get("_loaded_df") if isinstance(loaded, pd.DataFrame): return loaded return pd.DataFrame() def get_row_by_record_idx(df: pd.DataFrame, record_idx: int | None) -> pd.Series | None: if record_idx is None or df.empty: return None match = df[df["_record_idx"] == int(record_idx)] if match.empty: return None return match.iloc[0] def selected_row_in_filtered(filtered: pd.DataFrame) -> pd.Series | None: return get_row_by_record_idx(filtered, st.session_state.selected_record_idx) def shortlist_for_current_category() -> list[int]: selected_cat = st.session_state.selected_cat return st.session_state.shortlist_by_cat.get(selected_cat, []) def add_selected_to_shortlist() -> None: selected_idx = st.session_state.selected_record_idx if selected_idx is None: return selected_cat = st.session_state.selected_cat shortlist = st.session_state.shortlist_by_cat[selected_cat] if selected_idx not in shortlist: shortlist.append(int(selected_idx)) def remove_selected_from_shortlist() -> None: selected_idx = st.session_state.selected_record_idx if selected_idx is None: return selected_cat = st.session_state.selected_cat shortlist = st.session_state.shortlist_by_cat[selected_cat] if selected_idx in shortlist: shortlist.remove(selected_idx) def shortlist_df() -> pd.DataFrame: df = get_loaded_df() ids = shortlist_for_current_category() if df.empty or not ids: return pd.DataFrame() out = df[df["_record_idx"].isin(ids)].copy() if out.empty: return out order_map = {idx: i for i, idx in enumerate(ids)} out["_sort_order"] = out["_record_idx"].map(order_map) out = out.sort_values("_sort_order").drop(columns=["_sort_order"]) return out def compute_common_attributes(df: pd.DataFrame) -> dict[str, str]: if df.empty: return {} common: dict[str, str] = {} mappings = { "lead_agency": "Agency", "location": "Location", "project_sector": "Sector", } for col, label in mappings.items(): vals = set( str(v).strip() for v in df[col].dropna().tolist() if str(v).strip() and str(v).strip() != "None" ) if len(vals) == 1: common[label] = next(iter(vals)) return common def parse_selected_rows(event_obj: Any) -> list[int]: if event_obj is None: return [] try: selection = event_obj.selection if isinstance(selection, dict): rows = selection.get("rows", []) return [int(x) for x in rows] rows = getattr(selection, "rows", []) return [int(x) for x in rows] except Exception: pass try: selection = event_obj.get("selection", {}) rows = selection.get("rows", []) return [int(x) for x in rows] except Exception: return [] def sanitize_filename(text: str, max_len: int = 60) -> str: clean = "".join(ch if ch.isalnum() or ch in {"-", "_", " ", "."} else "-" for ch in text) clean = " ".join(clean.split()).strip() if not clean: clean = "document" return clean[:max_len].replace(" ", "_") def _extract_url_candidate(value: Any) -> str | None: if isinstance(value, dict): value = value.get("value", "") if isinstance(value, list): for item in value: candidate = _extract_url_candidate(item) if candidate: return candidate return None candidate = str(value).strip() if value is not None else "" if candidate.lower().startswith(("http://", "https://")): return candidate return None def _doc_source_pdf_url(doc: dict[str, Any]) -> str | None: metadata = doc.get("metadata", {}) file_meta = metadata.get("file_metadata", {}) doc_meta = metadata.get("document_metadata", {}) url_fields = ["file_url", "source_url", "url", "download_url", "document_url"] for source in [file_meta, doc_meta]: if not isinstance(source, dict): continue for field in url_fields: candidate = _extract_url_candidate(source.get(field)) if candidate: return candidate return None def _document_pages(doc: dict[str, Any]) -> list[dict[str, Any]]: raw_pages = doc.get("pages", []) normalized: list[dict[str, Any]] = [] for i, page in enumerate(raw_pages): item = page if isinstance(page, dict) else {} page_number = item.get("page number", i + 1) try: page_number = int(page_number) except (TypeError, ValueError): page_number = i + 1 page_text = item.get("page text", "") normalized.append( { "page number": page_number, "page text": "" if page_text is None else str(page_text), } ) return normalized def _pages_tuple_and_hash(pages: list[dict[str, Any]]) -> tuple[tuple[tuple[int, str], ...], str]: pages_tuple = tuple( ( int(page.get("page number", i + 1)), str(page.get("page text", "")), ) for i, page in enumerate(pages) ) compact = json.dumps(pages_tuple, ensure_ascii=False, separators=(",", ":")) pages_hash = hashlib.sha256(compact.encode("utf-8")).hexdigest() return pages_tuple, pages_hash def build_txt_bytes(title: str, pages: list[dict[str, Any]]) -> bytes: _ = title full_text = "\n\n".join( f"[Page {p.get('page number', '?')}]\n{p.get('page text', '')}" for p in pages ) return full_text.encode("utf-8") def build_md_bytes(title: str, pages: list[dict[str, Any]]) -> bytes: lines = [f"# {title}", ""] if not pages: lines.append("_No extracted page text available._") else: for page in pages: lines.append(f"## Page {page.get('page number', '?')}") lines.append(str(page.get("page text", ""))) lines.append("") return "\n".join(lines).strip().encode("utf-8") def build_json_bytes(title: str, doc: dict[str, Any], pages: list[dict[str, Any]]) -> bytes: metadata = doc.get("metadata", {}) payload = { "title": title, "document_metadata": metadata.get("document_metadata", {}), "file_metadata": metadata.get("file_metadata", {}), "page_count": len(pages), "pages": [ { "page_number": page.get("page number", i + 1), "text": page.get("page text", ""), } for i, page in enumerate(pages) ], } return json.dumps(payload, ensure_ascii=False, indent=2).encode("utf-8") def _pdf_safe_text(value: str) -> str: """Normalize common Unicode punctuation for Latin-1 PDF output.""" replacements = { "\u2018": "'", "\u2019": "'", "\u201c": '"', "\u201d": '"', "\u2013": "-", "\u2014": "-", "\u2026": "...", "\u2022": "*", "\u00a0": " ", } out = str(value) for old, new in replacements.items(): out = out.replace(old, new) out = out.replace("\r\n", "\n").replace("\r", "\n") out = "".join(ch if (ch == "\n" or ch == "\t" or ord(ch) >= 32) else " " for ch in out) out = out.replace("\t", " ") return out.encode("latin-1", errors="replace").decode("latin-1") def _pdf_safe_wrapped_lines(value: str, width: int = 110) -> list[str]: """Pre-wrap text for predictable PDF rendering.""" safe = _pdf_safe_text(value) out: list[str] = [] for paragraph in safe.split("\n"): if not paragraph.strip(): out.append("") continue wrapped = textwrap.wrap( paragraph, width=width, break_long_words=True, break_on_hyphens=False, replace_whitespace=False, drop_whitespace=False, ) out.extend(wrapped if wrapped else [""]) return out def _reportlab_canvas() -> tuple[Any, io.BytesIO, float, float]: from reportlab.lib.pagesizes import letter from reportlab.pdfgen import canvas buffer = io.BytesIO() c = canvas.Canvas(buffer, pagesize=letter) page_width, page_height = letter return c, buffer, page_width, page_height @st.cache_data(show_spinner=False, ttl=3600) def build_text_pdf_bytes( title: str, pages_hash: str, pages_tuple: tuple[tuple[int, str], ...] ) -> bytes: _ = pages_hash c, buffer, _page_width, page_height = _reportlab_canvas() left_margin = 54 top_margin = 54 bottom_margin = 54 line_height = 12 y = page_height - top_margin def ensure_space(lines_needed: int = 1) -> None: nonlocal y needed = lines_needed * line_height if y - needed < bottom_margin: c.showPage() y = page_height - top_margin c.setFont("Helvetica-Bold", 14) for line in _pdf_safe_wrapped_lines(title or "Document", width=92): ensure_space(1) c.drawString(left_margin, y, line if line else " ") y -= line_height y -= 6 if not pages_tuple: c.setFont("Helvetica", 10) ensure_space(1) c.drawString(left_margin, y, "No extracted page text available.") else: for page_number, page_text in pages_tuple: c.setFont("Helvetica-Bold", 11) ensure_space(1) c.drawString(left_margin, y, _pdf_safe_text(f"Page {page_number}")) y -= line_height c.setFont("Helvetica", 10) for line in _pdf_safe_wrapped_lines(page_text or "(no text)", width=110): ensure_space(1) c.drawString(left_margin, y, line if line else " ") y -= line_height y -= 6 c.save() return buffer.getvalue() @st.cache_data(show_spinner=False, ttl=3600) def build_pdf_unavailable_stub_bytes(title: str, reason: str) -> bytes: """Last-resort minimal PDF so the user still gets a valid PDF file.""" c, buffer, _page_width, page_height = _reportlab_canvas() left_margin = 54 top_margin = 54 line_height = 12 y = page_height - top_margin c.setFont("Helvetica-Bold", 14) c.drawString(left_margin, y, _pdf_safe_text(title or "Document")) y -= (line_height + 4) c.setFont("Helvetica", 10) c.drawString( left_margin, y, "PDF content could not be fully rendered from extracted text.", ) y -= line_height c.drawString(left_margin, y, _pdf_safe_text(f"Reason: {reason}")) c.save() return buffer.getvalue() @st.cache_data(show_spinner=False, ttl=3600) def fetch_source_pdf_bytes(url: str) -> bytes | None: try: req = urllib.request.Request(url, headers={"User-Agent": "NEPA.tools/1.0"}) with urllib.request.urlopen(req, timeout=12) as response: data = response.read() content_type = str(response.headers.get("Content-Type", "")).lower() if not data: return None if "pdf" in content_type or data.startswith(b"%PDF"): return data except (urllib.error.HTTPError, urllib.error.URLError, TimeoutError, ValueError): return None except Exception: return None return None def resolve_pdf_bytes( title: str, doc: dict[str, Any], pages: list[dict[str, Any]] ) -> tuple[bytes, bool]: source_url = _doc_source_pdf_url(doc) if source_url: source_pdf_bytes = fetch_source_pdf_bytes(source_url) if source_pdf_bytes: return source_pdf_bytes, True pages_tuple, pages_hash = _pages_tuple_and_hash(pages) try: generated_pdf = build_text_pdf_bytes(title, pages_hash, pages_tuple) return generated_pdf, False except Exception as exc: reason = f"{type(exc).__name__}" stub_pdf = build_pdf_unavailable_stub_bytes(title, reason) return stub_pdf, False @st.cache_data(show_spinner=False) def load_header_logo_svg() -> str: logo_path = os.path.join(os.path.dirname(__file__), "assets", "nepa_logo_bw.svg") try: with open(logo_path, "r", encoding="utf-8") as f: svg = f.read() except OSError: return "" # Remove XML declaration for HTML embedding in markdown. if svg.lstrip().startswith("", 1)[-1].lstrip() # Crop this specific asset to the horizontal lockup so it renders at header scale. if 'viewBox="0 0 1024 1024"' in svg: svg = svg.replace( 'viewBox="0 0 1024 1024"', 'viewBox="140 420 745 190" role="img" aria-label="NEPA.tools logo"', ) # Keep colors theme-aware; first black path is the frame/border. svg = svg.replace('fill="#000000"', 'fill="var(--nepa-logo-border)"', 1) svg = svg.replace('fill="#000000"', 'fill="var(--nepa-logo-ink)"') svg = svg.replace('stroke="#000000"', 'stroke="var(--nepa-logo-stroke)"') return svg def render_header() -> None: logo_svg = load_header_logo_svg() if logo_svg: st.markdown( f"""
NEPA.tools
', unsafe_allow_html=True) st.markdown( '' "Analyze federal NEPA precedents faster: filter, shortlist, compare, and review full text " "across 120,000+ documents from 60+ agencies." "
", unsafe_allow_html=True, ) st.markdown( '", unsafe_allow_html=True, ) def render_intro_section() -> None: st.markdown( """NEPA.tools helps you identify comparable federal NEPA precedents across CE, EA, and EIS records. Select a category, load a manageable subset, then refine, review, and export results.
The app starts with 500 records by default to stay responsive. Increase data scope only when you need broader coverage.
Select a row below to inspect project metadata and manage your shortlist.
', unsafe_allow_html=True, ) st.caption(f"Data scope: {len(loaded_df):,} loaded records") def render_selected_project_panel(filtered: pd.DataFrame) -> None: row = selected_row_in_filtered(filtered) if row is None: st.markdown( """