""" NEPA.tools — NEPATEC 2.0 Public Explorer Workflow-first Streamlit UI for precedent discovery. Domain: https://nepa.tools """ from __future__ import annotations import copy import hashlib import io import json import os import textwrap import urllib.error import urllib.request from datetime import datetime from itertools import islice from typing import Any import pandas as pd import streamlit as st # -------------------------------------------------- # CONFIG # -------------------------------------------------- REPO_ID = "PNNL/NEPATEC2.0" RECORD_PREFETCH_WINDOW = 8 CATEGORIES = { "CE": "Categorical Exclusions", "EA": "Environmental Assessments", "EIS": "Environmental Impact Statements", } CATEGORY_DESCRIPTIONS = { "CE": "54,668 projects · Actions exempt from detailed NEPA review", "EA": "3,083 projects · Assessments to determine if full EIS is needed", "EIS": "4,130 projects · Full environmental impact analyses", } MAX_LOAD_OPTIONS = [500, 1000, 2000, 5000, 10000, 20000] FILTER_PRESETS = { "All": { "search_text": "", "selected_agencies": [], "selected_locations": [], "selected_sectors": [], "selected_types": [], }, "Transportation": { "search_text": "transport", "selected_agencies": [], "selected_locations": [], "selected_sectors": ["Transportation"], "selected_types": [], }, "Energy": { "search_text": "energy", "selected_agencies": [], "selected_locations": [], "selected_sectors": ["Energy", "Transmission", "Power"], "selected_types": [], }, "Land Management": { "search_text": "land management", "selected_agencies": ["Bureau of Land Management", "US Forest Service"], "selected_locations": [], "selected_sectors": [], "selected_types": [], }, } # -------------------------------------------------- # PAGE CONFIG # -------------------------------------------------- st.set_page_config( page_title="NEPA.tools — NEPATEC 2.0 Explorer", page_icon="🔧", layout="wide", initial_sidebar_state="collapsed", ) # -------------------------------------------------- # APP STYLES 
# (Theme-aware; relies on .streamlit/config.toml tokens)
# --------------------------------------------------

# NOTE(review): APP_CSS looks empty in this copy of the file — the CSS payload
# was likely stripped during extraction; confirm against the original source.
APP_CSS = """ """


def apply_theme_css() -> None:
    """Inject the app-wide CSS block into the current page."""
    st.markdown(APP_CSS, unsafe_allow_html=True)


# --------------------------------------------------
# SESSION STATE
# --------------------------------------------------

# Baseline values for every session key this app uses. Mutable entries are
# deep-copied before being written into st.session_state (see _copy_default).
STATE_DEFAULTS: dict[str, Any] = {
    "selected_cat": None,
    "max_load": 500,
    "load_requested": False,
    "loaded_cat": None,
    "search_text": "",
    "selected_agencies": [],
    "selected_locations": [],
    "selected_sectors": [],
    "selected_types": [],
    "selected_record_idx": None,
    "shortlist_by_cat": {"CE": [], "EA": [], "EIS": []},
    "loaded_record_idx": None,
    "loaded_record_data": None,
    "query_params_loaded": False,
    "filter_preset": "All",
    "last_applied_preset": "All",
    "recent_filters": [],
    "read_doc_search": "",
    "read_focus_hint": False,
}


def _copy_default(value: Any) -> Any:
    """Deep-copy a default so session state never aliases STATE_DEFAULTS."""
    return copy.deepcopy(value)


def init_session_state() -> None:
    """Seed missing session keys with defaults and repair invalid values.

    Runs on every rerun; beyond seeding, it coerces any value that drifted to
    a wrong type (e.g. after a bad query-param import) back to a safe shape.
    """
    for key, value in STATE_DEFAULTS.items():
        if key not in st.session_state:
            st.session_state[key] = _copy_default(value)
    # Category selections must be one of the known keys (or None).
    if st.session_state.selected_cat not in CATEGORIES:
        st.session_state.selected_cat = None
    if st.session_state.loaded_cat not in CATEGORIES:
        st.session_state.loaded_cat = None
    if not isinstance(st.session_state.load_requested, bool):
        st.session_state.load_requested = False
    if st.session_state.max_load not in MAX_LOAD_OPTIONS:
        st.session_state.max_load = 500
    for key in [
        "selected_agencies",
        "selected_locations",
        "selected_sectors",
        "selected_types",
        "recent_filters",
    ]:
        if not isinstance(st.session_state[key], list):
            st.session_state[key] = []
    # The shortlist map must carry one list per category.
    if not isinstance(st.session_state.shortlist_by_cat, dict):
        st.session_state.shortlist_by_cat = {"CE": [], "EA": [], "EIS": []}
    for cat in CATEGORIES:
        if cat not in st.session_state.shortlist_by_cat or not isinstance(
            st.session_state.shortlist_by_cat[cat], list
        ):
            st.session_state.shortlist_by_cat[cat] = []


def clear_selection_state() -> None:
    """Drop the selected record and any cached record payload for it."""
    st.session_state.selected_record_idx = None
    st.session_state.loaded_record_idx = None
    st.session_state.loaded_record_data = None
    st.session_state.read_doc_search = ""
    st.session_state.read_focus_hint = False


def clear_filter_state() -> None:
    """Reset all filter widgets (search, multiselects, preset) to defaults."""
    st.session_state.search_text = ""
    st.session_state.selected_agencies = []
    st.session_state.selected_locations = []
    st.session_state.selected_sectors = []
    st.session_state.selected_types = []
    st.session_state.filter_preset = "All"
    st.session_state.last_applied_preset = "All"


def clear_loaded_data_state() -> None:
    """Forget the loaded category and replace the cached frame with an empty one."""
    st.session_state.load_requested = False
    st.session_state.loaded_cat = None
    st.session_state._loaded_df = pd.DataFrame()


def reset_session_state() -> None:
    """Hard-reset every known session key back to its default value."""
    for key, value in STATE_DEFAULTS.items():
        st.session_state[key] = _copy_default(value)


def _qp_get_all(params_obj: Any, key: str) -> list[str]:
    """Read a possibly multi-valued query parameter as a list of strings.

    Prefers the query-params object's get_all() when present; falls back to a
    plain get() and normalizes scalar / list / missing values uniformly.
    """
    if hasattr(params_obj, "get_all"):
        try:
            vals = params_obj.get_all(key)
            return [str(v) for v in vals]
        except Exception:
            pass
    val = params_obj.get(key)
    if val is None:
        return []
    if isinstance(val, list):
        return [str(v) for v in val]
    return [str(val)]


def read_query_params_into_state() -> None:
    """One-time import of URL query parameters into session state.

    Guarded by query_params_loaded so a shared link only seeds state on the
    first run; later reruns never clobber the user's in-session edits.
    """
    if st.session_state.query_params_loaded:
        return
    if not hasattr(st, "query_params"):
        st.session_state.query_params_loaded = True
        return
    try:
        qp = st.query_params
        cat = qp.get("cat")
        if cat in CATEGORIES:
            st.session_state.selected_cat = cat
            # Keep startup explicit: category can be preselected from URL,
            # but loading still requires an explicit button click.
            st.session_state.load_requested = False
        q = qp.get("q")
        if q is not None:
            st.session_state.search_text = str(q)
        max_raw = qp.get("max")
        if max_raw is not None:
            try:
                max_val = int(str(max_raw))
                if max_val in MAX_LOAD_OPTIONS:
                    st.session_state.max_load = max_val
            except ValueError:
                pass
        st.session_state.selected_agencies = _qp_get_all(qp, "agency")
        st.session_state.selected_locations = _qp_get_all(qp, "loc")
        st.session_state.selected_sectors = _qp_get_all(qp, "sector")
        st.session_state.selected_types = _qp_get_all(qp, "ptype")
    except Exception:
        pass
    st.session_state.query_params_loaded = True


def _query_snapshot_from_state() -> dict[str, Any]:
    """Build the canonical query-param dict from current session state."""
    out: dict[str, Any] = {
        "cat": st.session_state.selected_cat or "",
        "q": st.session_state.search_text.strip(),
        "max": str(st.session_state.max_load),
        "agency": [str(v) for v in st.session_state.selected_agencies],
        "loc": [str(v) for v in st.session_state.selected_locations],
        "sector": [str(v) for v in st.session_state.selected_sectors],
        "ptype": [str(v) for v in st.session_state.selected_types],
    }
    return out


def _query_snapshot_from_params() -> dict[str, Any]:
    """Read the current URL query parameters into the same canonical shape."""
    if not hasattr(st, "query_params"):
        return {}
    qp = st.query_params
    return {
        "cat": str(qp.get("cat") or ""),
        "q": str(qp.get("q") or ""),
        "max": str(qp.get("max") or ""),
        "agency": _qp_get_all(qp, "agency"),
        "loc": _qp_get_all(qp, "loc"),
        "sector": _qp_get_all(qp, "sector"),
        "ptype": _qp_get_all(qp, "ptype"),
    }


def write_state_to_query_params() -> None:
    """Mirror session state into the URL so the current view is shareable.

    No-ops when the URL already matches, avoiding needless history churn.
    """
    if not hasattr(st, "query_params"):
        return
    target = _query_snapshot_from_state()
    current = _query_snapshot_from_params()
    if target == current:
        return
    try:
        st.query_params.clear()
        st.query_params["max"] = target["max"]
        if target["cat"]:
            st.query_params["cat"] = target["cat"]
        if target["q"]:
            st.query_params["q"] = target["q"]
        for key in ["agency", "loc", "sector", "ptype"]:
            if target[key]:
                st.query_params[key] = target[key]
    except Exception:
        pass


# --------------------------------------------------
# DATA LOADING
#
# --------------------------------------------------


@st.cache_data(show_spinner=False, ttl=3600)
def load_category_index(category: str, max_records: int = 5000) -> pd.DataFrame:
    """Stream up to max_records metadata rows for one category into a DataFrame.

    Uses the HF streaming API so only the requested prefix is downloaded.
    Returns an empty DataFrame (after surfacing st.error) on any failure.
    """
    try:
        from datasets import load_dataset

        hf_token = os.environ.get("HF_TOKEN", None)
        dataset = load_dataset(
            REPO_ID,
            data_files=[f"{category}/*/*.jsonl"],
            split="train",
            streaming=True,
            token=hf_token,
        )
        rows: list[dict[str, Any]] = []
        for i, record in enumerate(dataset):
            if i >= max_records:
                break
            rows.append(extract_metadata(record, category, i))
        if not rows:
            return pd.DataFrame()
        df = pd.DataFrame(rows)
        # Force object columns to str so caching/serialization stays stable.
        for col in df.select_dtypes(include=["object"]).columns:
            df[col] = df[col].astype(str)
        return df
    except Exception as exc:
        st.error(f"Error loading {category} data: {exc}")
        return pd.DataFrame()


@st.cache_data(show_spinner=False, ttl=3600)
def load_single_record(category: str, record_idx: int):
    """Fetch one full record by stream position; None if unavailable.

    Tries the prefetch window cache first, then dataset-native skip/take,
    then a plain islice over the stream as a last resort.
    """
    try:
        from datasets import load_dataset

        if record_idx < 0:
            return None
        window_start = max(0, record_idx - (RECORD_PREFETCH_WINDOW // 2))
        window = load_record_window(category, window_start, RECORD_PREFETCH_WINDOW)
        if record_idx in window:
            return window[record_idx]
        hf_token = os.environ.get("HF_TOKEN", None)
        dataset = load_dataset(
            REPO_ID,
            data_files=[f"{category}/*/*.jsonl"],
            split="train",
            streaming=True,
            token=hf_token,
        )
        # Prefer dataset-native skipping when available.
        if hasattr(dataset, "skip") and hasattr(dataset, "take"):
            try:
                for record in dataset.skip(record_idx).take(1):
                    return record
                return None
            except Exception:
                pass
        return next(islice(dataset, record_idx, record_idx + 1), None)
    except Exception as exc:
        st.error(f"Error loading record: {exc}")
        return None


@st.cache_data(show_spinner=False, ttl=3600)
def load_record_window(category: str, start_idx: int, window_size: int) -> dict[int, dict[str, Any]]:
    """Fetch window_size consecutive records starting at start_idx.

    Returns {absolute_index: record}; empty dict on bad args or any failure
    (best-effort — callers fall back to a direct single-record fetch).
    """
    try:
        from datasets import load_dataset

        if start_idx < 0 or window_size <= 0:
            return {}
        hf_token = os.environ.get("HF_TOKEN", None)
        dataset = load_dataset(
            REPO_ID,
            data_files=[f"{category}/*/*.jsonl"],
            split="train",
            streaming=True,
            token=hf_token,
        )
        records: dict[int, dict[str, Any]] = {}
        # Prefer dataset-native skip/take; fall back to islice over the stream.
        if hasattr(dataset, "skip") and hasattr(dataset, "take"):
            try:
                for offset, record in enumerate(dataset.skip(start_idx).take(window_size)):
                    records[start_idx + offset] = record
                return records
            except Exception:
                pass
        for offset, record in enumerate(islice(dataset, start_idx, start_idx + window_size)):
            records[start_idx + offset] = record
        return records
    except Exception:
        return {}


def extract_metadata(record: dict, category: str, idx: int) -> dict:
    """Flatten one raw dataset record into the tabular row used by the index.

    idx is the record's position in the category stream and becomes
    `_record_idx`, the key used everywhere else to re-fetch the full record.
    """
    project = record.get("project", {})
    process = record.get("process", {})
    documents = record.get("documents", [])

    def val(obj, key):
        # Fields may be plain scalars, {"value": ...} wrappers, or lists;
        # normalize all of them to a display string ("" when empty).
        value = obj.get(key, {})
        if isinstance(value, dict):
            value = value.get("value", "")
        if isinstance(value, list):
            return ", ".join(str(item) for item in value) if value else ""
        return str(value) if value else ""

    total_pages = 0
    for doc in documents:
        total_pages += len(doc.get("pages", []))
    return {
        "_record_idx": idx,
        "_category": category,
        "project_id": val(project, "project_ID"),
        "project_title": val(project, "project_title"),
        "project_sector": val(project, "project_sector"),
        "project_type": val(project, "project_type"),
        "project_description": val(project, "project_description"),
        "project_sponsor": val(project, "project_sponsor"),
        "location": val(project, "location"),
        "lead_agency": val(process, "lead_agency"),
        "process_family": val(process, "process_family"),
        "process_type": val(process, "process_type"),
        "doc_count": len(documents),
        "total_pages": total_pages,
    }


def get_unique_sorted(series: pd.Series) -> list[str]:
    """Distinct, sorted, non-empty string values of a column ("None" excluded)."""
    vals = series.dropna().astype(str).str.strip()
    vals = vals[(vals != "") & (vals != "None")]
    return sorted(vals.unique().tolist())


def export_metadata_csv(df: pd.DataFrame) -> bytes:
    """Serialize the user-facing metadata columns of df as UTF-8 CSV bytes."""
    export_cols = [
        "project_id",
        "project_title",
        "lead_agency",
        "location",
        "project_sector",
        "project_type",
        "project_sponsor",
        "process_type",
        "doc_count",
        "total_pages",
        "_category",
    ]
    cols = [c for c in export_cols if c in df.columns]
    return df[cols].to_csv(index=False).encode("utf-8")


# --------------------------------------------------
# STATE + FILTER HELPERS
# --------------------------------------------------


def build_filter_options(df: pd.DataFrame) -> dict[str, list[str]]:
    """Compute the choice lists for each filter multiselect from loaded data.

    Locations and types are capped to keep the widgets responsive.
    """
    return {
        "selected_agencies": get_unique_sorted(df["lead_agency"]),
        "selected_locations": get_unique_sorted(df["location"])[:300],
        "selected_sectors": get_unique_sorted(df["project_sector"]),
        "selected_types": get_unique_sorted(df["project_type"])[:150],
    }


def sanitize_filter_state(options: dict[str, list[str]]) -> None:
    """Drop any selected filter values that are not in the current options."""
    for state_key, available in options.items():
        current = st.session_state.get(state_key, [])
        st.session_state[state_key] = [val for val in current if val in available]


def apply_filters(df: pd.DataFrame) -> pd.DataFrame:
    """Return the subset of df matching the session's search text and filters.

    Keyword search is a case-insensitive substring match across title,
    description, location, and lead agency; the multiselects are exact-match.
    """
    filtered = df.copy()
    query = st.session_state.search_text.strip()
    if query:
        mask = (
            filtered["project_title"].str.contains(query, case=False, na=False)
            | filtered["project_description"].str.contains(query, case=False, na=False)
            | filtered["location"].str.contains(query, case=False, na=False)
            | filtered["lead_agency"].str.contains(query, case=False, na=False)
        )
        filtered = filtered[mask]
    if st.session_state.selected_agencies:
        filtered = filtered[filtered["lead_agency"].isin(st.session_state.selected_agencies)]
    if st.session_state.selected_locations:
        filtered = filtered[filtered["location"].isin(st.session_state.selected_locations)]
    if st.session_state.selected_sectors:
        filtered = filtered[filtered["project_sector"].isin(st.session_state.selected_sectors)]
    if st.session_state.selected_types:
        filtered = filtered[filtered["project_type"].isin(st.session_state.selected_types)]
    return filtered


def apply_filter_preset(preset_name: str, options: dict[str, list[str]]) -> None:
    """Load a FILTER_PRESETS bundle, keeping only values valid for this data."""
    preset = FILTER_PRESETS.get(preset_name, FILTER_PRESETS["All"])
    st.session_state.search_text = preset.get("search_text", "")
    for key in [
        "selected_agencies",
        "selected_locations",
        "selected_sectors",
        "selected_types",
    ]:
        requested_vals = preset.get(key, [])
        allowed_vals = options.get(key, [])
        st.session_state[key] = [val for val in requested_vals if val in allowed_vals]


def save_current_filters_as_recent() -> None:
    """Snapshot the current category + filters onto the recent-filters list.

    De-dupes by label and keeps at most three entries, newest first.
    """
    label_query = st.session_state.search_text.strip() or "filtered"
    cat_label = st.session_state.selected_cat or "No category"
    label = f"{cat_label} · {label_query[:22]}"
    snapshot = {
        "id": datetime.now().strftime("%H%M%S%f"),
        "label": label,
        "selected_cat": st.session_state.selected_cat,
        "search_text": st.session_state.search_text,
        "selected_agencies": list(st.session_state.selected_agencies),
        "selected_locations": list(st.session_state.selected_locations),
        "selected_sectors": list(st.session_state.selected_sectors),
        "selected_types": list(st.session_state.selected_types),
    }
    existing = [f for f in st.session_state.recent_filters if f.get("label") != label]
    st.session_state.recent_filters = [snapshot] + existing[:2]


def apply_recent_filter(recent: dict[str, Any], options: dict[str, list[str]]) -> None:
    """Restore a saved filter snapshot, switching category when needed."""
    target_cat = recent.get("selected_cat")
    cat_switch = target_cat in CATEGORIES and target_cat != st.session_state.selected_cat
    if target_cat in CATEGORIES:
        st.session_state.selected_cat = target_cat
    st.session_state.search_text = str(recent.get("search_text", ""))
    if cat_switch:
        clear_selection_state()
        clear_loaded_data_state()
    for key in [
        "selected_agencies",
        "selected_locations",
        "selected_sectors",
        "selected_types",
    ]:
        requested_vals = [str(v) for v in recent.get(key, [])]
        if cat_switch:
            # Let the next rerun sanitize these values against the new category options.
            st.session_state[key] = requested_vals
        else:
            allowed_vals = options.get(key, [])
            st.session_state[key] = [val for val in requested_vals if val in allowed_vals]


# --------------------------------------------------
# ROW + SHORTLIST HELPERS
# --------------------------------------------------


def get_loaded_df() -> pd.DataFrame:
    """Return the session-cached loaded DataFrame, or an empty frame."""
    loaded = st.session_state.get("_loaded_df")
    if isinstance(loaded, pd.DataFrame):
        return loaded
    return pd.DataFrame()


def get_row_by_record_idx(df: pd.DataFrame, record_idx: int | None) -> pd.Series | None:
    """Look up the row whose _record_idx equals record_idx; None if absent."""
    if record_idx is None or df.empty:
        return None
    match = df[df["_record_idx"] == int(record_idx)]
    if match.empty:
        return None
    return match.iloc[0]


def selected_row_in_filtered(filtered: pd.DataFrame) -> pd.Series | None:
    """The currently selected row, but only if it survives the active filters."""
    return get_row_by_record_idx(filtered, st.session_state.selected_record_idx)


def shortlist_for_current_category() -> list[int]:
    """Record indices shortlisted under the currently selected category."""
    selected_cat = st.session_state.selected_cat
    return st.session_state.shortlist_by_cat.get(selected_cat, [])


def add_selected_to_shortlist() -> None:
    """Append the selected record to the category shortlist (no duplicates)."""
    selected_idx = st.session_state.selected_record_idx
    if selected_idx is None:
        return
    selected_cat = st.session_state.selected_cat
    shortlist = st.session_state.shortlist_by_cat[selected_cat]
    if selected_idx not in shortlist:
        shortlist.append(int(selected_idx))


def remove_selected_from_shortlist() -> None:
    """Remove the selected record from the category shortlist if present."""
    selected_idx = st.session_state.selected_record_idx
    if selected_idx is None:
        return
    selected_cat = st.session_state.selected_cat
    shortlist = st.session_state.shortlist_by_cat[selected_cat]
    if selected_idx in shortlist:
        shortlist.remove(selected_idx)


def shortlist_df() -> pd.DataFrame:
    """Shortlisted rows of the loaded frame, preserving shortlist order."""
    df = get_loaded_df()
    ids = shortlist_for_current_category()
    if df.empty or not ids:
        return pd.DataFrame()
    out = df[df["_record_idx"].isin(ids)].copy()
    if out.empty:
        return out
    order_map = {idx: i for i, idx in enumerate(ids)}
    out["_sort_order"] = out["_record_idx"].map(order_map)
    out = out.sort_values("_sort_order").drop(columns=["_sort_order"])
    return out


def compute_common_attributes(df: pd.DataFrame) -> dict[str, str]:
    """Attributes shared by every row of df (used in the Compare view).

    Returns {display_label: value} for each mapped column whose non-empty
    values collapse to a single distinct value.
    """
    if df.empty:
        return {}
    common: dict[str, str] = {}
    mappings = {
        "lead_agency": "Agency",
        "location": "Location",
        "project_sector": "Sector",
    }
    for col, label in mappings.items():
        vals = set(
            str(v).strip()
            for v in df[col].dropna().tolist()
            if str(v).strip() and str(v).strip() != "None"
        )
        if len(vals) == 1:
            common[label] = next(iter(vals))
    return common


def parse_selected_rows(event_obj: Any) -> list[int]:
    """Extract selected row positions from a dataframe selection event.

    Tolerates the attribute-style, dict-valued, and fully dict-shaped event
    objects returned by different Streamlit versions; [] when unreadable.
    """
    if event_obj is None:
        return []
    try:
        selection = event_obj.selection
        if isinstance(selection, dict):
            rows = selection.get("rows", [])
            return [int(x) for x in rows]
        rows = getattr(selection, "rows", [])
        return [int(x) for x in rows]
    except Exception:
        pass
    try:
        selection = event_obj.get("selection", {})
        rows = selection.get("rows", [])
        return [int(x) for x in rows]
    except Exception:
        return []


def sanitize_filename(text: str, max_len: int = 60) -> str:
    """Make text safe for use as a download filename.

    Replaces disallowed characters with '-', collapses whitespace to '_',
    truncates to max_len, and falls back to "document" when empty.
    """
    clean = "".join(ch if ch.isalnum() or ch in {"-", "_", " ", "."} else "-" for ch in text)
    clean = " ".join(clean.split()).strip()
    if not clean:
        clean = "document"
    return clean[:max_len].replace(" ", "_")


def _extract_url_candidate(value: Any) -> str | None:
    """Recursively pull the first http(s) URL out of a metadata value.

    Handles {"value": ...} wrappers and lists; None when nothing URL-like.
    """
    if isinstance(value, dict):
        value = value.get("value", "")
    if isinstance(value, list):
        for item in value:
            candidate = _extract_url_candidate(item)
            if candidate:
                return candidate
        return None
    candidate = str(value).strip() if value is not None else ""
    if candidate.lower().startswith(("http://", "https://")):
        return candidate
    return None


def _doc_source_pdf_url(doc: dict[str, Any]) -> str | None:
    """Find a plausible source-PDF URL in a document's metadata blocks."""
    metadata = doc.get("metadata", {})
    file_meta = metadata.get("file_metadata", {})
    doc_meta = metadata.get("document_metadata", {})
    url_fields = ["file_url", "source_url", "url", "download_url", "document_url"]
    for source in [file_meta, doc_meta]:
        if not isinstance(source, dict):
            continue
        for field in url_fields:
            candidate = _extract_url_candidate(source.get(field))
            if candidate:
                return candidate
    return None


def _document_pages(doc: dict[str, Any]) -> list[dict[str, Any]]:
    """Normalize a document's pages to [{"page number": int, "page text": str}].

    Malformed entries fall back to positional numbering and empty text.
    """
    raw_pages = doc.get("pages", [])
    normalized: list[dict[str, Any]] = []
    for i, page in enumerate(raw_pages):
        item = page if isinstance(page, dict) else {}
        page_number = item.get("page number", i + 1)
        try:
            page_number = int(page_number)
        except (TypeError, ValueError):
            page_number = i + 1
        page_text = item.get("page text", "")
        normalized.append(
            {
                "page number": page_number,
                "page text": "" if page_text is None else str(page_text),
            }
        )
    return normalized


def _pages_tuple_and_hash(pages: list[dict[str, Any]]) -> tuple[tuple[tuple[int, str], ...], str]:
    """Freeze pages to a hashable tuple plus a SHA-256 digest for cache keys."""
    pages_tuple = tuple(
        (
            int(page.get("page number", i + 1)),
            str(page.get("page text", "")),
        )
        for i, page in enumerate(pages)
    )
    compact = json.dumps(pages_tuple, ensure_ascii=False, separators=(",", ":"))
    pages_hash = hashlib.sha256(compact.encode("utf-8")).hexdigest()
    return pages_tuple, pages_hash


def build_txt_bytes(title: str, pages: list[dict[str, Any]]) -> bytes:
    """Render pages as plain UTF-8 text with [Page N] markers (title unused)."""
    _ = title
    full_text = "\n\n".join(
        f"[Page {p.get('page number', '?')}]\n{p.get('page text', '')}" for p in pages
    )
    return full_text.encode("utf-8")


def build_md_bytes(title: str, pages: list[dict[str, Any]]) -> bytes:
    """Render pages as a Markdown document, one H2 section per page."""
    lines = [f"# {title}", ""]
    if not pages:
        lines.append("_No extracted page text available._")
    else:
        for page in pages:
            lines.append(f"## Page {page.get('page number', '?')}")
            lines.append(str(page.get("page text", "")))
            lines.append("")
    return "\n".join(lines).strip().encode("utf-8")


def build_json_bytes(title: str, doc: dict[str, Any], pages: list[dict[str, Any]]) -> bytes:
    """Render the document (metadata + pages) as pretty-printed JSON bytes."""
    metadata = doc.get("metadata", {})
    payload = {
        "title": title,
        "document_metadata": metadata.get("document_metadata", {}),
        "file_metadata": metadata.get("file_metadata", {}),
        "page_count": len(pages),
        "pages": [
            {
                "page_number": page.get("page number", i + 1),
                "text": page.get("page text", ""),
            }
            for i, page in enumerate(pages)
        ],
    }
    return json.dumps(payload, ensure_ascii=False, indent=2).encode("utf-8")


def _pdf_safe_text(value: str) -> str:
    """Normalize common Unicode punctuation for Latin-1 PDF output."""
    replacements = {
        "\u2018": "'",
        "\u2019": "'",
        "\u201c": '"',
        "\u201d": '"',
        "\u2013": "-",
        "\u2014": "-",
        "\u2026": "...",
        "\u2022": "*",
        "\u00a0": " ",
    }
    out = str(value)
    for old, new in replacements.items():
        out = out.replace(old, new)
    out = out.replace("\r\n", "\n").replace("\r", "\n")
    # Strip control characters (keep newline/tab), then expand tabs.
    out = "".join(ch if (ch == "\n" or ch == "\t" or ord(ch) >= 32) else " " for ch in out)
    out = out.replace("\t", " ")
    return out.encode("latin-1", errors="replace").decode("latin-1")


def _pdf_safe_wrapped_lines(value: str, width: int = 110) -> list[str]:
    """Pre-wrap text for predictable PDF rendering."""
    safe = _pdf_safe_text(value)
    out: list[str] = []
    for paragraph in safe.split("\n"):
        if not paragraph.strip():
            # Preserve blank lines as explicit empty entries.
            out.append("")
            continue
        wrapped = textwrap.wrap(
            paragraph,
            width=width,
            break_long_words=True,
            break_on_hyphens=False,
            replace_whitespace=False,
            drop_whitespace=False,
        )
        out.extend(wrapped if wrapped else [""])
    return out


def _reportlab_canvas() -> tuple[Any, io.BytesIO, float, float]:
    """Create an in-memory US-letter ReportLab canvas.

    Returns (canvas, buffer, page_width, page_height).
    """
    from reportlab.lib.pagesizes import letter
    from reportlab.pdfgen import canvas

    buffer = io.BytesIO()
    c = canvas.Canvas(buffer, pagesize=letter)
    page_width, page_height = letter
    return c, buffer, page_width, page_height


@st.cache_data(show_spinner=False, ttl=3600)
def build_text_pdf_bytes(
    title: str, pages_hash: str, pages_tuple: tuple[tuple[int, str], ...]
) -> bytes:
    """Render extracted page text into a simple paginated PDF.

    pages_hash exists purely to key st.cache_data cheaply; the content comes
    from pages_tuple of (page_number, page_text) pairs.
    """
    _ = pages_hash
    c, buffer, _page_width, page_height = _reportlab_canvas()
    left_margin = 54
    top_margin = 54
    bottom_margin = 54
    line_height = 12
    y = page_height - top_margin

    def ensure_space(lines_needed: int = 1) -> None:
        # Start a new PDF page when the next lines would cross the bottom margin.
        nonlocal y
        needed = lines_needed * line_height
        if y - needed < bottom_margin:
            c.showPage()
            y = page_height - top_margin

    c.setFont("Helvetica-Bold", 14)
    for line in _pdf_safe_wrapped_lines(title or "Document", width=92):
        ensure_space(1)
        c.drawString(left_margin, y, line if line else " ")
        y -= line_height
    y -= 6
    if not pages_tuple:
        c.setFont("Helvetica", 10)
        ensure_space(1)
        c.drawString(left_margin, y, "No extracted page text available.")
    else:
        for page_number, page_text in pages_tuple:
            c.setFont("Helvetica-Bold", 11)
            ensure_space(1)
            c.drawString(left_margin, y, _pdf_safe_text(f"Page {page_number}"))
            y -= line_height
            c.setFont("Helvetica", 10)
            for line in _pdf_safe_wrapped_lines(page_text or "(no text)", width=110):
                ensure_space(1)
                c.drawString(left_margin, y, line if line else " ")
                y -= line_height
            y -= 6
    c.save()
    return buffer.getvalue()


@st.cache_data(show_spinner=False, ttl=3600)
def build_pdf_unavailable_stub_bytes(title: str, reason: str) -> bytes:
    """Last-resort minimal PDF so the user still gets a valid PDF file."""
    c, buffer, _page_width, page_height = _reportlab_canvas()
    left_margin = 54
    top_margin = 54
    line_height = 12
    y = page_height - top_margin
    c.setFont("Helvetica-Bold", 14)
    c.drawString(left_margin, y, _pdf_safe_text(title or "Document"))
    y -= (line_height + 4)
    c.setFont("Helvetica", 10)
    c.drawString(
        left_margin,
        y,
        "PDF content could not be fully rendered from extracted text.",
    )
    y -= line_height
    c.drawString(left_margin, y, _pdf_safe_text(f"Reason: {reason}"))
    c.save()
    return buffer.getvalue()


@st.cache_data(show_spinner=False, ttl=3600)
def fetch_source_pdf_bytes(url: str) -> bytes | None:
    """Download url and return its bytes only if the payload looks like a PDF.

    Any network/HTTP error, timeout, empty body, or non-PDF payload yields
    None (callers fall back to text-generated PDFs).
    """
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "NEPA.tools/1.0"})
        with urllib.request.urlopen(req, timeout=12) as response:
            data = response.read()
            content_type = str(response.headers.get("Content-Type", "")).lower()
            if not data:
                return None
            if "pdf" in content_type or data.startswith(b"%PDF"):
                return data
    except (urllib.error.HTTPError, urllib.error.URLError, TimeoutError, ValueError):
        return None
    except Exception:
        return None
    return None


def resolve_pdf_bytes(
    title: str, doc: dict[str, Any], pages: list[dict[str, Any]]
) -> tuple[bytes, bool]:
    """Produce PDF bytes for a document, preferring the original source file.

    Returns (pdf_bytes, is_source_pdf): the fetched source PDF when available,
    otherwise a PDF generated from extracted text, otherwise a stub PDF.
    """
    source_url = _doc_source_pdf_url(doc)
    if source_url:
        source_pdf_bytes = fetch_source_pdf_bytes(source_url)
        if source_pdf_bytes:
            return source_pdf_bytes, True
    pages_tuple, pages_hash = _pages_tuple_and_hash(pages)
    try:
        generated_pdf = build_text_pdf_bytes(title, pages_hash, pages_tuple)
        return generated_pdf, False
    except Exception as exc:
        reason = f"{type(exc).__name__}"
        stub_pdf = build_pdf_unavailable_stub_bytes(title, reason)
        return stub_pdf, False


@st.cache_data(show_spinner=False)
def load_header_logo_svg() -> str:
    """Load and adapt the header logo SVG; empty string when unavailable."""
    logo_path = os.path.join(os.path.dirname(__file__), "assets", "nepa_logo_bw.svg")
    try:
        with open(logo_path, "r", encoding="utf-8") as f:
            svg = f.read()
    except OSError:
        return ""
    # Remove XML declaration for HTML embedding in markdown.
    # NOTE(review): the next statement looks corrupted in this copy of the file
    # (angle-bracketed text appears stripped); presumably it was
    #     if svg.lstrip().startswith("<?xml"):
    #         svg = svg.split("?>", 1)[-1].lstrip()
    # — confirm against the original source; as written it is not valid Python.
    if svg.lstrip().startswith("", 1)[-1].lstrip()
    # Crop this specific asset to the horizontal lockup so it renders at header scale.
    if 'viewBox="0 0 1024 1024"' in svg:
        svg = svg.replace(
            'viewBox="0 0 1024 1024"',
            'viewBox="140 420 745 190" role="img" aria-label="NEPA.tools logo"',
        )
    # Keep colors theme-aware; first black path is the frame/border.
    svg = svg.replace('fill="#000000"', 'fill="var(--nepa-logo-border)"', 1)
    svg = svg.replace('fill="#000000"', 'fill="var(--nepa-logo-ink)"')
    svg = svg.replace('stroke="#000000"', 'stroke="var(--nepa-logo-stroke)"')
    return svg


# NOTE(review): from here on, the original HTML markup inside the render
# functions' string literals appears to have been stripped in this copy
# (unbalanced quotes, tag-less residue). The render layer below is left
# byte-for-byte as found and must be restored from the original source.
def render_header() -> None: logo_svg = load_header_logo_svg() if logo_svg: st.markdown( f"""
""", unsafe_allow_html=True, ) else: st.markdown('

NEPA.tools

', unsafe_allow_html=True) st.markdown( '

' "Analyze federal NEPA precedents faster: filter, shortlist, compare, and review full text " "across 120,000+ documents from 60+ agencies." "

", unsafe_allow_html=True, ) st.markdown( '
' '📊 Data from ' "NEPATEC 2.0 by Pacific Northwest National Laboratory · CC0 Public Domain · DOE Office of Policy" "
", unsafe_allow_html=True, ) def render_intro_section() -> None: st.markdown( """

Quick Start

NEPA.tools helps you identify comparable federal NEPA precedents across CE, EA, and EIS records. Select a category, load a manageable subset, then refine, review, and export results.

The app starts with 500 records by default to stay responsive. Increase data scope only when you need broader coverage.

Application Workflow

  1. Choose a document category (CE, EA, or EIS).
  2. Set Max records to load (default 500).
  3. Click Load records to fetch projects for that category.
  4. Refine results with query controls:
    • Keyword Search: search project title, description, location, and lead agency fields.
    • Lead Agency: filter to one or more federal lead agencies.
    • Location: filter by project geography and jurisdiction metadata.
    • Project Sector: filter by sector domain (for example transportation, energy, or land management).
    • Project Type: filter by project type classification.
    • Filter Preset: apply a predefined filter bundle; use Clear filters to reset and Save current filter to store a reusable profile.
  5. Review KPIs and select rows in Discover to build a shortlist.
  6. Use Compare for side-by-side shortlisted projects.
  7. Use Read to load document text, browse pages, and download TXT, PDF, JSON, or Markdown files.
  8. Use Export to download CSV metadata for filtered results or shortlist-only scope.

Inputs and Outputs

""", unsafe_allow_html=True, ) with st.expander("Usage Notes", expanded=True): st.markdown( """ - **Best for:** environmental planners, permitting teams, legal/policy analysts. - **Performance tip:** keep `Max records to load` at `500` for fast iteration, then move to `1000+` when you need broader coverage. - **Data source:** [PNNL/NEPATEC2.0](https://huggingface.co/datasets/PNNL/NEPATEC2.0), CC0 public domain. """ ) def render_query_rail(options: dict[str, list[str]], data_loaded: bool) -> None: st.markdown('
', unsafe_allow_html=True) top_left, top_right = st.columns([1.7, 1.0], vertical_alignment="bottom") with top_left: category_options = list(CATEGORIES.keys()) current_idx = ( category_options.index(st.session_state.selected_cat) if st.session_state.selected_cat in category_options else None ) selected_cat = st.selectbox( "Document category", options=category_options, index=current_idx, format_func=lambda x: f"{x} — {CATEGORIES[x]}", placeholder="Select CE, EA, or EIS to enable loading", ) if selected_cat != st.session_state.selected_cat: st.session_state.selected_cat = selected_cat clear_filter_state() clear_selection_state() clear_loaded_data_state() st.rerun() with top_right: st.markdown('
', unsafe_allow_html=True) can_load = st.session_state.selected_cat in CATEGORIES button_label = "Load records" if data_loaded and st.session_state.loaded_cat == st.session_state.selected_cat: button_label = "Reload records" if st.button( button_label, key="load_category_btn", use_container_width=True, disabled=not can_load, type="primary", ): st.session_state.load_requested = True clear_selection_state() st.rerun() if st.session_state.selected_cat: desc = CATEGORY_DESCRIPTIONS.get(st.session_state.selected_cat, "") if data_loaded and st.session_state.loaded_cat == st.session_state.selected_cat: desc = f"{desc} · Loaded: {st.session_state.loaded_cat}" elif st.session_state.loaded_cat != st.session_state.selected_cat: desc = f"{desc} · Ready to load {st.session_state.selected_cat}" st.markdown(f'
{desc}
', unsafe_allow_html=True) else: st.markdown( '
Choose a category first, then click Load records.
', unsafe_allow_html=True, ) scope_col, reset_col = st.columns([1.8, 0.95], vertical_alignment="bottom") with scope_col: st.select_slider( "Max records to load", options=MAX_LOAD_OPTIONS, key="max_load", help="Default is 500 for speed. Increase only when you need wider coverage.", ) st.caption("Default is 500 records for fast, responsive loading.") with reset_col: st.markdown('
', unsafe_allow_html=True) if st.button("Reset session", key="reset_session", use_container_width=True): reset_session_state() st.rerun() pinned = len(shortlist_for_current_category()) selected_cat_label = st.session_state.selected_cat or "No category selected" st.markdown( f'
Shortlisted in {selected_cat_label}: {pinned}
', unsafe_allow_html=True, ) if not data_loaded: st.info("No data loaded yet. Select a category and click `Load records` to begin.") st.markdown("
", unsafe_allow_html=True) return st.text_input( "Keyword search", key="search_text", placeholder="solar, transmission line, grazing permit...", ) f1, f2 = st.columns(2, vertical_alignment="top") with f1: st.multiselect("Lead Agency", options=options["selected_agencies"], key="selected_agencies") with f2: st.multiselect("Location", options=options["selected_locations"], key="selected_locations") f3, f4 = st.columns(2, vertical_alignment="top") with f3: st.multiselect("Project Sector", options=options["selected_sectors"], key="selected_sectors") with f4: st.multiselect("Project Type", options=options["selected_types"], key="selected_types") a1, a2, a3 = st.columns([1.05, 1.35, 1.05], vertical_alignment="bottom") with a1: st.markdown('
', unsafe_allow_html=True) if st.button("Clear filters", key="clear_filters_btn", use_container_width=True): clear_filter_state() st.rerun() with a2: st.selectbox("Filter preset", options=list(FILTER_PRESETS.keys()), key="filter_preset") if st.session_state.filter_preset != st.session_state.last_applied_preset: apply_filter_preset(st.session_state.filter_preset, options) st.session_state.last_applied_preset = st.session_state.filter_preset st.rerun() with a3: st.markdown('
', unsafe_allow_html=True) if st.button("Save current filter", key="save_recent_filter", use_container_width=True): save_current_filters_as_recent() st.toast("Saved to recent filters") loaded_df = get_loaded_df() matching = len(apply_filters(loaded_df)) st.markdown( f'
{matching:,} matching projects out of {len(loaded_df):,} loaded
', unsafe_allow_html=True, ) if st.session_state.recent_filters: st.caption("Saved recent filters") recent_cols = st.columns(len(st.session_state.recent_filters)) for i, recent in enumerate(st.session_state.recent_filters): with recent_cols[i]: if st.button(recent["label"], key=f"recent_filter_{recent['id']}", use_container_width=True): apply_recent_filter(recent, options) st.rerun() st.markdown("", unsafe_allow_html=True) def render_kpi_row(filtered: pd.DataFrame, loaded_df: pd.DataFrame) -> None: n_projects = len(filtered) n_docs = int(filtered["doc_count"].sum()) if not filtered.empty else 0 n_pages = int(filtered["total_pages"].sum()) if not filtered.empty else 0 n_agencies = int(filtered["lead_agency"].nunique()) if not filtered.empty else 0 n_shortlisted = len(shortlist_for_current_category()) labels = [ (n_projects, "Projects"), (n_docs, "Documents"), (n_pages, "Pages"), (n_agencies, "Agencies"), (n_shortlisted, "Shortlisted"), ] top_row = st.columns(3) for col, (value, label) in zip(top_row, labels[:3]): with col: st.markdown( f"""
{value:,}
{label}
""", unsafe_allow_html=True, ) bottom_row = st.columns(2) for col, (value, label) in zip(bottom_row, labels[3:]): with col: st.markdown( f"""
{value:,}
{label}
""", unsafe_allow_html=True, ) st.markdown( '

Select a row below to inspect project metadata and manage your shortlist.

', unsafe_allow_html=True, ) st.caption(f"Data scope: {len(loaded_df):,} loaded records") def render_selected_project_panel(filtered: pd.DataFrame) -> None: row = selected_row_in_filtered(filtered) if row is None: st.markdown( """
Select a project in Discover to preview metadata here.
""", unsafe_allow_html=True, ) return tag_class = f"tag-{str(st.session_state.selected_cat).lower()}" desc = str(row.get("project_description", "")).strip() if len(desc) > 280: desc = desc[:280].rstrip() + "..." st.markdown( f"""
{st.session_state.selected_cat}

{row['project_title']}

Agency: {row['lead_agency']}
Location: {row['location']}
Sector: {row['project_sector']}
Type: {row['project_type']}
Docs/Pages: {row['doc_count']} / {int(row['total_pages']):,}
{desc if desc and desc != 'None' else 'No project description available.'}
""", unsafe_allow_html=True, ) def render_discover_tab(filtered: pd.DataFrame) -> None: left_col, right_col = st.columns([2.2, 1.0], gap="large") with left_col: if filtered.empty: st.info("No projects match the current filters. Broaden your search or clear filters.") else: display_cols = { "_category": "Type", "project_title": "Project Title", "lead_agency": "Lead Agency", "location": "Location", "project_sector": "Sector", "project_type": "Project Type", "doc_count": "Docs", "total_pages": "Pages", } display_df = filtered[list(display_cols.keys())].rename(columns=display_cols).reset_index(drop=True) row_to_record_idx = filtered["_record_idx"].astype(int).tolist() table_height = 560 event = st.dataframe( display_df, use_container_width=True, hide_index=True, height=table_height, column_config={ "Type": st.column_config.TextColumn(width="small"), "Project Title": st.column_config.TextColumn(width="large"), "Docs": st.column_config.NumberColumn(width="small"), "Pages": st.column_config.NumberColumn(width="small"), }, on_select="rerun", selection_mode="single-row", ) selected_rows = parse_selected_rows(event) if selected_rows: pos = selected_rows[0] if 0 <= pos < len(row_to_record_idx): st.session_state.selected_record_idx = int(row_to_record_idx[pos]) st.caption(f"Showing {len(filtered):,} of {len(get_loaded_df()):,} loaded records") selected_idx = st.session_state.selected_record_idx shortlist_ids = shortlist_for_current_category() can_use_selection = selected_row_in_filtered(filtered) is not None a1, a2, a3 = st.columns(3) with a1: if st.button( "Add to shortlist", key="add_shortlist_btn", use_container_width=True, disabled=not can_use_selection, ): add_selected_to_shortlist() st.rerun() with a2: can_remove = selected_idx is not None and selected_idx in shortlist_ids if st.button( "Remove from shortlist", key="remove_shortlist_btn", use_container_width=True, disabled=not can_remove, ): remove_selected_from_shortlist() st.rerun() with a3: if st.button( "View in 
Read", key="open_in_read_btn", use_container_width=True, disabled=not can_use_selection, ): st.session_state.read_focus_hint = True st.info("Open the Read tab to inspect the selected project.") with right_col: render_selected_project_panel(filtered) def render_compare_tab(filtered: pd.DataFrame) -> None: _ = filtered sdf = shortlist_df() if sdf.empty: st.info("No shortlisted projects yet. Add projects in the Discover tab.") return st.markdown(f"#### Comparing {len(sdf):,} shortlisted projects") compact_cols = { "project_title": "Project Title", "lead_agency": "Agency", "location": "Location", "project_sector": "Sector", "doc_count": "Docs", "total_pages": "Pages", } st.dataframe( sdf[list(compact_cols.keys())].rename(columns=compact_cols), use_container_width=True, hide_index=True, height=300, ) common = compute_common_attributes(sdf) if common: chips = "".join( [f'{label}: {value}' for label, value in common.items()] ) st.markdown("**Common attributes**") st.markdown(chips, unsafe_allow_html=True) remove_options = [f"{row['_record_idx']} · {row['project_title'][:70]}" for _, row in sdf.iterrows()] r1, r2 = st.columns([2.4, 1.0]) with r1: to_remove = st.selectbox("Remove a project from shortlist", options=remove_options, key="remove_one_compare") if st.button("Remove selected", key="remove_selected_compare"): rec_idx = int(str(to_remove).split(" · ", 1)[0]) if rec_idx in st.session_state.shortlist_by_cat[st.session_state.selected_cat]: st.session_state.shortlist_by_cat[st.session_state.selected_cat].remove(rec_idx) if st.session_state.selected_record_idx == rec_idx: st.session_state.selected_record_idx = None st.rerun() with r2: if st.button("Clear shortlist", key="clear_shortlist_compare", use_container_width=True): st.session_state.shortlist_by_cat[st.session_state.selected_cat] = [] st.session_state.selected_record_idx = None st.rerun() st.markdown("#### Side-by-side snapshot") top_n = min(4, len(sdf)) card_cols = st.columns(top_n) for i, (_, row) in 
enumerate(sdf.head(top_n).iterrows()): with card_cols[i]: st.markdown( f"""
{row['_category']}
{row['project_title']}
{row['lead_agency']}
{row['location']}
{row['project_sector']} · {row['project_type']}
{int(row['doc_count'])} docs · {int(row['total_pages']):,} pages
""", unsafe_allow_html=True, ) if st.button("Set active in Read", key=f"use_in_read_{int(row['_record_idx'])}", use_container_width=True): st.session_state.selected_record_idx = int(row["_record_idx"]) st.session_state.read_focus_hint = True st.rerun() def _doc_value(obj: dict, key: str) -> str: value = obj.get(key, {}) if isinstance(value, dict): value = value.get("value", "") if isinstance(value, list): return ", ".join(str(item) for item in value) if value else "" return str(value) if value else "" def render_read_tab(filtered: pd.DataFrame) -> None: _ = filtered loaded_df = get_loaded_df() if loaded_df.empty: st.info("No records are loaded.") return active_idx = st.session_state.selected_record_idx if get_row_by_record_idx(loaded_df, active_idx) is None: active_idx = None if active_idx is None: shortlist_ids = shortlist_for_current_category() for sid in shortlist_ids: if get_row_by_record_idx(loaded_df, sid) is not None: active_idx = sid st.session_state.selected_record_idx = sid break if active_idx is None: st.info("Select a project in Discover or add one to the shortlist to start reading documents.") return active_row = get_row_by_record_idx(loaded_df, active_idx) if active_row is None: st.info("The active project is not available in the current data scope.") return if st.session_state.read_focus_hint: st.caption("Read tab is now focused on the selected project.") st.session_state.read_focus_hint = False st.markdown( f"""
ACTIVE PROJECT · {st.session_state.selected_cat}

{active_row['project_title']}

Agency: {active_row['lead_agency']}
Location: {active_row['location']}
Sector: {active_row['project_sector']}
Type: {active_row['project_type']}
Docs/Pages: {int(active_row['doc_count'])} docs · {int(active_row['total_pages']):,} pages
""", unsafe_allow_html=True, ) desc = str(active_row.get("project_description", "")) if desc and desc != "None": with st.expander("Project Description"): st.write(desc) if st.button("Load project documents", key="load_docs_read_tab", use_container_width=True): with st.spinner("Loading full project record from Hugging Face..."): record = load_single_record(st.session_state.selected_cat, int(active_idx)) if record: st.session_state.loaded_record_idx = int(active_idx) st.session_state.loaded_record_data = record else: st.error("Could not load this project record.") if ( st.session_state.loaded_record_idx != int(active_idx) or st.session_state.loaded_record_data is None ): st.caption("Load project documents to browse page text.") return record = st.session_state.loaded_record_data documents = record.get("documents", []) if not documents: st.info("No document files are available for this project record.") return st.text_input( "Document title filter", key="read_doc_search", placeholder="Search document titles within this project...", ) doc_query = st.session_state.read_doc_search.strip().lower() rendered_count = 0 for d_idx, doc in enumerate(documents): meta = doc.get("metadata", {}) file_meta = meta.get("file_metadata", {}) doc_meta = meta.get("document_metadata", {}) title = ( _doc_value(doc_meta, "document_title") or _doc_value(file_meta, "file_name") or f"Document {d_idx + 1}" ) if doc_query and doc_query not in title.lower(): continue pages = _document_pages(doc) n_pages = len(pages) rendered_count += 1 with st.expander(f"{title} ({n_pages} pages)"): if pages: page_nums = [p.get("page number", i + 1) for i, p in enumerate(pages)] if len(page_nums) > 1: selected_page = st.select_slider( "Page number", options=page_nums, value=page_nums[0], key=f"page_slider_{active_idx}_{d_idx}", ) else: selected_page = page_nums[0] page_data = next( (p for p in pages if p.get("page number") == selected_page), pages[0], ) st.text_area( f"Page {selected_page}", 
value=page_data.get("page text", "(no text)"), height=450, key=f"page_text_{active_idx}_{d_idx}_{selected_page}", ) else: st.caption("No page text found.") file_name = sanitize_filename(title) txt_bytes = build_txt_bytes(title, pages) md_bytes = build_md_bytes(title, pages) json_bytes = build_json_bytes(title, doc, pages) pdf_bytes: bytes | None = None pdf_file_name = f"{file_name}.pdf" pdf_source_note = "" pdf_error_note = "" try: resolved_pdf, used_source_pdf = resolve_pdf_bytes(title, doc, pages) if resolved_pdf: pdf_bytes = resolved_pdf if used_source_pdf: pdf_file_name = f"{file_name}_source.pdf" pdf_source_note = "PDF source: original file." else: pdf_source_note = "PDF source: generated from extracted page text." except Exception as exc: pdf_bytes = None pdf_error_note = f"PDF unavailable for this document ({type(exc).__name__})." d1, d2, d3, d4 = st.columns(4) with d1: st.download_button( "Download TXT", data=txt_bytes, file_name=f"{file_name}.txt", mime="text/plain", key=f"download_doc_txt_{active_idx}_{d_idx}", use_container_width=True, ) with d2: if pdf_bytes is not None: st.download_button( "Download PDF", data=pdf_bytes, file_name=pdf_file_name, mime="application/pdf", key=f"download_doc_pdf_{active_idx}_{d_idx}", use_container_width=True, ) else: st.download_button( "Download PDF", data=b"", file_name=f"{file_name}.pdf", mime="application/pdf", key=f"download_doc_pdf_{active_idx}_{d_idx}", use_container_width=True, disabled=True, ) with d3: st.download_button( "Download JSON", data=json_bytes, file_name=f"{file_name}.json", mime="application/json", key=f"download_doc_json_{active_idx}_{d_idx}", use_container_width=True, ) with d4: st.download_button( "Download Markdown", data=md_bytes, file_name=f"{file_name}.md", mime="text/markdown", key=f"download_doc_md_{active_idx}_{d_idx}", use_container_width=True, ) if pdf_bytes is not None: st.caption(pdf_source_note) else: st.caption(pdf_error_note or "PDF unavailable for this document.") if rendered_count 
== 0: st.info("No documents match the current title filter.") def render_export_tab(filtered: pd.DataFrame) -> None: options = [ "Filtered results (active query)", "Shortlist only (active category)", ] scope = st.radio("Export dataset", options=options, horizontal=True, key="export_scope") if scope == "Filtered results (active query)": export_df = filtered scope_slug = "filtered" else: export_df = shortlist_df() scope_slug = "shortlist" if export_df.empty: st.info("No projects are available for the selected export dataset.") return st.markdown(f"#### Ready to export {len(export_df):,} projects") col_l, col_r = st.columns([1.5, 1.0]) with col_l: st.markdown( """
Metadata CSV
Includes project title, agency, location, sector, project type, sponsor, and document/page counts.
""", unsafe_allow_html=True, ) st.download_button( "Download metadata (CSV)", data=export_metadata_csv(export_df), file_name=f"nepatec_{st.session_state.selected_cat}_{scope_slug}_{datetime.now().strftime('%Y%m%d')}.csv", mime="text/csv", key="download_metadata_csv", use_container_width=True, ) with col_r: st.markdown( """
Next-step tips
1. Export your shortlist or filtered set.
2. Load the CSV into your analytics workflow.
3. Analyze precedent patterns by agency, sector, and location.
""", unsafe_allow_html=True, ) def render_about_tab() -> None: st.markdown( """ ### About & Contact **NEPA.tools** is a public explorer for the NEPATEC 2.0 dataset. Built for environmental planners, permitting teams, and legal/policy researchers. ### Dataset Snapshot - **120,000+ documents** from **60,000+ projects** across **60+ federal agencies** - Covers **50+ years** of federal environmental review records - Includes CE, EA, and EIS categories with standardized metadata and page text ### Source & License - Dataset: [PNNL/NEPATEC2.0 on Hugging Face](https://huggingface.co/datasets/PNNL/NEPATEC2.0) - License: [CC0 — Public Domain](https://creativecommons.org/publicdomain/zero/1.0/) - Dataset contact: permitai@pnnl.gov ### Creator - Nader Khalil - Email: [nader@ceqa.ai](mailto:nader@ceqa.ai) - Hugging Face: [@CODEAVELi](https://huggingface.co/CODEAVELi) - GitHub: [@CODEAVELi](https://github.com/CODEAVELi) - LinkedIn: [/in/naderkhalil](https://www.linkedin.com/in/naderkhalil) --- *NEPA.tools is not affiliated with PNNL, DOE, or the federal government.* """ ) def render_footer() -> None: st.markdown( """
Built by Nader Khalil
""", unsafe_allow_html=True, ) st.markdown( '", unsafe_allow_html=True, ) # -------------------------------------------------- # MAIN APP # -------------------------------------------------- def main() -> None: init_session_state() read_query_params_into_state() apply_theme_css() render_header() render_intro_section() filter_options = { "selected_agencies": [], "selected_locations": [], "selected_sectors": [], "selected_types": [], } df = pd.DataFrame() data_loaded = False should_load = bool( st.session_state.load_requested and st.session_state.selected_cat in CATEGORIES ) if should_load: with st.spinner(f"Loading {st.session_state.selected_cat}..."): df = load_category_index( st.session_state.selected_cat, max_records=int(st.session_state.max_load), ) if df.empty: clear_loaded_data_state() render_query_rail(filter_options, data_loaded=False) st.error("Could not load category data.") st.markdown( """ **Setup required:** 1. Accept terms at [NEPATEC 2.0](https://huggingface.co/datasets/PNNL/NEPATEC2.0) 2. Set `HF_TOKEN` as a Space secret """ ) write_state_to_query_params() render_footer() return st.session_state._loaded_df = df st.session_state.loaded_cat = st.session_state.selected_cat data_loaded = True available_ids = set(df["_record_idx"].astype(int).tolist()) current_shortlist = st.session_state.shortlist_by_cat[st.session_state.selected_cat] st.session_state.shortlist_by_cat[st.session_state.selected_cat] = [ idx for idx in current_shortlist if int(idx) in available_ids ] filter_options = build_filter_options(df) sanitize_filter_state(filter_options) else: st.session_state._loaded_df = pd.DataFrame() render_query_rail(filter_options, data_loaded=data_loaded) if not data_loaded: if st.session_state.selected_cat: st.info( f"Category selected: **{st.session_state.selected_cat}**. " "Click `Load records` to fetch projects." 
) write_state_to_query_params() render_footer() return filtered = apply_filters(df) if selected_row_in_filtered(filtered) is None: st.session_state.selected_record_idx = None render_kpi_row(filtered, df) st.markdown('
', unsafe_allow_html=True) tab_discover, tab_compare, tab_read, tab_export, tab_about = st.tabs( ["Discover", "Compare", "Read", "Export", "About & Contact"] ) st.markdown("
", unsafe_allow_html=True) with tab_discover: render_discover_tab(filtered) with tab_compare: render_compare_tab(filtered) with tab_read: render_read_tab(filtered) with tab_export: render_export_tab(filtered) with tab_about: render_about_tab() write_state_to_query_params() render_footer() if __name__ == "__main__": main()