Spaces:
Sleeping
Sleeping
| """Database layer — SQLite schema, connection, and query helpers.""" | |
| import json | |
| import logging | |
| import sqlite3 | |
| from contextlib import contextmanager | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| log = logging.getLogger(__name__) | |
def get_db_path() -> Path:
    """Return the configured SQLite database path.

    Imported lazily from src.config so importing this module does not
    pull in configuration at import time.
    """
    from src.config import DB_PATH

    return DB_PATH
@contextmanager
def get_conn():
    """Yield a SQLite connection with WAL mode and foreign keys enabled.

    Commits on clean exit, rolls back (and re-raises) on any exception,
    and always closes the connection.

    Fix: the ``@contextmanager`` decorator was missing. The function
    yields and every caller in this module uses ``with get_conn() as
    conn:``, which fails on a bare generator (no ``__enter__``).
    """
    path = get_db_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(path))
    conn.row_factory = sqlite3.Row  # rows support dict-style access by column name
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        log.exception("Database transaction failed")
        raise
    finally:
        conn.close()
def init_db():
    """Create tables if they don't exist, apply migrations, rebuild FTS."""
    with get_conn() as conn:
        conn.executescript(SCHEMA)
        for statement in _MIGRATIONS:
            try:
                conn.execute(statement)
            except sqlite3.OperationalError as exc:
                message = str(exc).lower()
                # "duplicate column" / "already exists" simply means this
                # migration already ran; anything else deserves a warning.
                if "duplicate column" not in message and "already exists" not in message:
                    log.warning("Migration failed: %s — %s", statement.strip()[:60], exc)
        # Rebuild FTS index from content table (idempotent, fast for a few thousand rows)
        conn.execute("INSERT INTO papers_fts(papers_fts) VALUES('rebuild')")
# Full DDL executed by init_db() via executescript(). Every statement uses
# IF NOT EXISTS, so re-running is safe on an existing database.
# NOTE(review): the papers_fts table and its triggers reference papers.topics,
# which is only added by _MIGRATIONS — presumably SQLite tolerates this until
# the migrations have run; confirm init_db() succeeds on a fresh database.
SCHEMA = """\
CREATE TABLE IF NOT EXISTS runs (
    id INTEGER PRIMARY KEY,
    domain TEXT NOT NULL,
    started_at TEXT NOT NULL,
    finished_at TEXT,
    date_start TEXT NOT NULL,
    date_end TEXT NOT NULL,
    paper_count INTEGER DEFAULT 0,
    status TEXT DEFAULT 'running'
);
CREATE TABLE IF NOT EXISTS papers (
    id INTEGER PRIMARY KEY,
    run_id INTEGER REFERENCES runs(id),
    domain TEXT NOT NULL,
    arxiv_id TEXT NOT NULL,
    entry_id TEXT,
    title TEXT NOT NULL,
    authors TEXT,
    abstract TEXT,
    published TEXT,
    categories TEXT,
    pdf_url TEXT,
    arxiv_url TEXT,
    comment TEXT,
    source TEXT,
    github_repo TEXT,
    github_stars INTEGER,
    hf_upvotes INTEGER DEFAULT 0,
    hf_models TEXT,
    hf_datasets TEXT,
    hf_spaces TEXT,
    score_axis_1 REAL,
    score_axis_2 REAL,
    score_axis_3 REAL,
    composite REAL,
    summary TEXT,
    reasoning TEXT,
    code_url TEXT,
    UNIQUE(domain, arxiv_id, run_id)
);
CREATE TABLE IF NOT EXISTS events (
    id INTEGER PRIMARY KEY,
    run_id INTEGER,
    category TEXT NOT NULL,
    title TEXT NOT NULL,
    description TEXT,
    url TEXT,
    event_date TEXT,
    source TEXT,
    relevance_score REAL,
    fetched_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS paper_connections (
    id INTEGER PRIMARY KEY,
    paper_id INTEGER NOT NULL REFERENCES papers(id),
    connected_arxiv_id TEXT,
    connected_s2_id TEXT,
    connected_title TEXT,
    connected_year INTEGER,
    connection_type TEXT NOT NULL,
    in_db_paper_id INTEGER,
    fetched_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_papers_domain_composite
    ON papers(domain, composite DESC);
CREATE INDEX IF NOT EXISTS idx_papers_run ON papers(run_id);
CREATE INDEX IF NOT EXISTS idx_events_category ON events(category, event_date);
CREATE INDEX IF NOT EXISTS idx_connections_paper ON paper_connections(paper_id);
CREATE INDEX IF NOT EXISTS idx_connections_arxiv ON paper_connections(connected_arxiv_id);
CREATE INDEX IF NOT EXISTS idx_papers_arxiv_id ON papers(arxiv_id);
CREATE INDEX IF NOT EXISTS idx_papers_published ON papers(published);
CREATE INDEX IF NOT EXISTS idx_events_run_id ON events(run_id);
CREATE TABLE IF NOT EXISTS github_projects (
    id INTEGER PRIMARY KEY,
    run_id INTEGER REFERENCES runs(id),
    repo_id INTEGER NOT NULL,
    repo_name TEXT NOT NULL,
    description TEXT,
    language TEXT,
    stars INTEGER DEFAULT 0,
    forks INTEGER DEFAULT 0,
    pull_requests INTEGER DEFAULT 0,
    total_score REAL DEFAULT 0,
    collection_names TEXT,
    topics TEXT DEFAULT '[]',
    url TEXT NOT NULL,
    domain TEXT,
    fetched_at TEXT NOT NULL,
    UNIQUE(repo_name, run_id)
);
CREATE INDEX IF NOT EXISTS idx_gh_run ON github_projects(run_id);
CREATE INDEX IF NOT EXISTS idx_gh_domain ON github_projects(domain, total_score DESC);
CREATE INDEX IF NOT EXISTS idx_gh_repo ON github_projects(repo_name);
CREATE TABLE IF NOT EXISTS user_signals (
    id INTEGER PRIMARY KEY,
    paper_id INTEGER NOT NULL REFERENCES papers(id),
    action TEXT NOT NULL CHECK(action IN ('save','view','upvote','downvote','dismiss')),
    created_at TEXT NOT NULL,
    metadata TEXT DEFAULT '{}'
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_signals_paper_action
    ON user_signals(paper_id, action) WHERE action != 'view';
CREATE INDEX IF NOT EXISTS idx_signals_created ON user_signals(created_at);
CREATE INDEX IF NOT EXISTS idx_signals_paper ON user_signals(paper_id);
CREATE TABLE IF NOT EXISTS user_preferences (
    id INTEGER PRIMARY KEY,
    pref_key TEXT NOT NULL UNIQUE,
    pref_value REAL NOT NULL DEFAULT 0.0,
    signal_count INTEGER NOT NULL DEFAULT 0,
    updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_prefs_key ON user_preferences(pref_key);
CREATE VIRTUAL TABLE IF NOT EXISTS papers_fts USING fts5(
    title, abstract, summary, topics,
    content='papers', content_rowid='id',
    tokenize='porter unicode61'
);
CREATE TRIGGER IF NOT EXISTS papers_ai AFTER INSERT ON papers BEGIN
    INSERT INTO papers_fts(rowid, title, abstract, summary, topics)
    VALUES (new.id, new.title, new.abstract, new.summary, new.topics);
END;
CREATE TRIGGER IF NOT EXISTS papers_ad AFTER DELETE ON papers BEGIN
    INSERT INTO papers_fts(papers_fts, rowid, title, abstract, summary, topics)
    VALUES ('delete', old.id, old.title, old.abstract, old.summary, old.topics);
END;
CREATE TRIGGER IF NOT EXISTS papers_au AFTER UPDATE ON papers BEGIN
    INSERT INTO papers_fts(papers_fts, rowid, title, abstract, summary, topics)
    VALUES ('delete', old.id, old.title, old.abstract, old.summary, old.topics);
    INSERT INTO papers_fts(rowid, title, abstract, summary, topics)
    VALUES (new.id, new.title, new.abstract, new.summary, new.topics);
END;
"""
# Columns added after initial schema — idempotent via try/except.
# init_db() runs each statement and swallows "duplicate column" /
# "already exists" errors, so order and re-runs are both safe.
_MIGRATIONS = [
    "ALTER TABLE papers ADD COLUMN s2_tldr TEXT",
    "ALTER TABLE papers ADD COLUMN s2_paper_id TEXT",
    "ALTER TABLE papers ADD COLUMN topics TEXT DEFAULT '[]'",
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_events_unique ON events(title, category)",
    # Prevent duplicate seed papers (NULL run_id) for the same arxiv_id+domain
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_papers_seed_dedup ON papers(domain, arxiv_id) WHERE run_id IS NULL",
]
| # --------------------------------------------------------------------------- | |
| # Run helpers | |
| # --------------------------------------------------------------------------- | |
def create_run(domain: str, date_start: str, date_end: str) -> int:
    """Insert a new pipeline run in 'running' state and return its row id."""
    started_at = datetime.now(timezone.utc).isoformat()
    with get_conn() as conn:
        cursor = conn.execute(
            "INSERT INTO runs (domain, started_at, date_start, date_end, status) "
            "VALUES (?, ?, ?, ?, 'running')",
            (domain, started_at, date_start, date_end),
        )
        return cursor.lastrowid
def finish_run(run_id: int, paper_count: int, status: str = "completed"):
    """Mark a run finished, recording end time, paper count, and status."""
    finished_at = datetime.now(timezone.utc).isoformat()
    update_sql = "UPDATE runs SET finished_at=?, paper_count=?, status=? WHERE id=?"
    with get_conn() as conn:
        conn.execute(update_sql, (finished_at, paper_count, status, run_id))
def get_latest_run(domain: str) -> dict | None:
    """Return the most recent run for *domain* as a dict, or None if absent."""
    with get_conn() as conn:
        cursor = conn.execute(
            "SELECT * FROM runs WHERE domain=? ORDER BY id DESC LIMIT 1",
            (domain,),
        )
        row = cursor.fetchone()
        return dict(row) if row else None
def get_run(run_id: int) -> dict | None:
    """Fetch a single run by id, or None if it doesn't exist."""
    with get_conn() as conn:
        row = conn.execute("SELECT * FROM runs WHERE id=?", (run_id,)).fetchone()
        if row is None:
            return None
        return dict(row)
| # --------------------------------------------------------------------------- | |
| # Paper helpers | |
| # --------------------------------------------------------------------------- | |
| def _serialize_json(val): | |
| """JSON-encode lists/dicts for storage.""" | |
| if isinstance(val, (list, dict)): | |
| return json.dumps(val) | |
| return val | |
def insert_papers(papers: list[dict], run_id: int, domain: str):
    """Bulk-insert papers into the DB.

    Uses INSERT OR IGNORE, so rows that would violate
    UNIQUE(domain, arxiv_id, run_id) are silently skipped.

    Improvement: one ``executemany`` call instead of a Python-level loop
    of per-row ``execute`` calls — same semantics, idiomatic bulk insert.
    """

    def _row(p: dict) -> tuple:
        # Map one fetched-paper dict onto the positional column order below;
        # list/dict fields are JSON-encoded for TEXT storage.
        return (
            run_id, domain,
            p.get("arxiv_id", ""),
            p.get("entry_id", ""),
            p.get("title", ""),
            _serialize_json(p.get("authors", [])),
            p.get("abstract", ""),
            p.get("published", ""),
            _serialize_json(p.get("categories", [])),
            p.get("pdf_url", ""),
            p.get("arxiv_url", ""),
            p.get("comment", ""),
            p.get("source", ""),
            p.get("github_repo", ""),
            p.get("github_stars"),
            p.get("hf_upvotes", 0),
            _serialize_json(p.get("hf_models", [])),
            _serialize_json(p.get("hf_datasets", [])),
            _serialize_json(p.get("hf_spaces", [])),
        )

    with get_conn() as conn:
        conn.executemany(
            """INSERT OR IGNORE INTO papers
               (run_id, domain, arxiv_id, entry_id, title, authors, abstract,
                published, categories, pdf_url, arxiv_url, comment, source,
                github_repo, github_stars, hf_upvotes, hf_models, hf_datasets, hf_spaces)
               VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
            (_row(p) for p in papers),
        )
def update_paper_scores(paper_id: int, scores: dict):
    """Write Claude scoring results (axes, composite, summary, etc.) to a paper."""
    values = (
        scores.get("score_axis_1"),
        scores.get("score_axis_2"),
        scores.get("score_axis_3"),
        scores.get("composite"),
        scores.get("summary", ""),
        scores.get("reasoning", ""),
        scores.get("code_url"),
        paper_id,
    )
    with get_conn() as conn:
        conn.execute(
            """UPDATE papers SET
               score_axis_1=?, score_axis_2=?, score_axis_3=?,
               composite=?, summary=?, reasoning=?, code_url=?
               WHERE id=?""",
            values,
        )
def get_unscored_papers(run_id: int) -> list[dict]:
    """Papers in a run with no composite score yet (i.e. awaiting scoring)."""
    with get_conn() as conn:
        cursor = conn.execute(
            "SELECT * FROM papers WHERE run_id=? AND composite IS NULL",
            (run_id,),
        )
        return [_deserialize_paper(r) for r in cursor.fetchall()]
def get_top_papers(domain: str, run_id: int | None = None, limit: int = 20) -> list[dict]:
    """Get top-scored papers for a domain, optionally from a specific run.

    Falls back to the domain's latest run when run_id is not given;
    returns [] if the domain has no runs at all.

    Improvement: the original duplicated the identical query in both
    branches — resolve run_id first (same pattern as get_papers_page),
    then run one query.
    """
    with get_conn() as conn:
        if not run_id:
            latest = get_latest_run(domain)
            if not latest:
                return []
            run_id = latest["id"]
        rows = conn.execute(
            "SELECT * FROM papers WHERE domain=? AND run_id=? AND composite IS NOT NULL "
            "ORDER BY composite DESC LIMIT ?",
            (domain, run_id, limit),
        ).fetchall()
        return [_deserialize_paper(row) for row in rows]
def get_paper(paper_id: int) -> dict | None:
    """Fetch one paper by id with JSON fields parsed, or None if missing."""
    with get_conn() as conn:
        row = conn.execute("SELECT * FROM papers WHERE id=?", (paper_id,)).fetchone()
        if row is None:
            return None
        return _deserialize_paper(row)
# Whitelisted ORDER BY clauses for get_papers_page(); keys are the public
# `sort` parameter values. Interpolating only values from this fixed map
# keeps the f-string SQL in get_papers_page() injection-safe.
SORT_OPTIONS = {
    "score": "composite DESC",
    "date": "published DESC",
    "axis1": "score_axis_1 DESC",
    "axis2": "score_axis_2 DESC",
    "axis3": "score_axis_3 DESC",
    "title": "title ASC",
}
def get_papers_page(domain: str, run_id: int | None = None,
                    offset: int = 0, limit: int = 50,
                    min_score: float | None = None,
                    has_code: bool | None = None,
                    search: str | None = None,
                    topic: str | None = None,
                    sort: str | None = None) -> tuple[list[dict], int]:
    """Paginated, filterable paper list. Returns (papers, total_count).

    Defaults to the domain's latest run; only scored papers are shown.
    """
    with get_conn() as conn:
        # Resolve the run to page over.
        if not run_id:
            latest = get_latest_run(domain)
            if not latest:
                return [], 0
            run_id = latest["id"]

        # Build WHERE clause; params must stay aligned with the conditions.
        where_parts = ["domain=?", "run_id=?", "composite IS NOT NULL"]
        args: list = [domain, run_id]
        if min_score is not None:
            where_parts.append("composite >= ?")
            args.append(min_score)
        if has_code:
            where_parts.append("(code_url IS NOT NULL AND code_url != '')")
        if search:
            pattern = f"%{search}%"
            where_parts.append("(title LIKE ? OR abstract LIKE ?)")
            args.extend([pattern, pattern])
        if topic:
            where_parts.append("topics LIKE ?")
            args.append(f'%"{topic}"%')

        where = " AND ".join(where_parts)
        order_by = SORT_OPTIONS.get(sort, "composite DESC")  # whitelist lookup

        total = conn.execute(
            f"SELECT COUNT(*) FROM papers WHERE {where}", args
        ).fetchone()[0]
        rows = conn.execute(
            f"SELECT * FROM papers WHERE {where} ORDER BY {order_by} LIMIT ? OFFSET ?",
            args + [limit, offset],
        ).fetchall()
        return [_deserialize_paper(r) for r in rows], total
def count_papers(domain: str, run_id: int | None = None, scored_only: bool = False) -> int:
    """Count papers in a run (latest run if run_id omitted); optionally only scored ones."""
    with get_conn() as conn:
        if not run_id:
            latest = get_latest_run(domain)
            if not latest:
                return 0
            run_id = latest["id"]
        query = "SELECT COUNT(*) FROM papers WHERE domain=? AND run_id=?"
        if scored_only:
            query = f"{query} AND composite IS NOT NULL"
        result = conn.execute(query, (domain, run_id)).fetchone()
        return result[0] if result else 0
| def _deserialize_paper(row) -> dict: | |
| """Convert a sqlite3.Row to a dict, parsing JSON fields.""" | |
| d = dict(row) | |
| for key in ("authors", "categories", "hf_models", "hf_datasets", "hf_spaces", "topics"): | |
| val = d.get(key) | |
| if isinstance(val, str): | |
| try: | |
| d[key] = json.loads(val) | |
| except (json.JSONDecodeError, TypeError): | |
| d[key] = [] | |
| return d | |
| # --------------------------------------------------------------------------- | |
| # Event helpers | |
| # --------------------------------------------------------------------------- | |
def insert_events(events: list[dict], run_id: int | None = None):
    """Insert events; INSERT OR IGNORE skips duplicates on (title, category)."""
    fetched_at = datetime.now(timezone.utc).isoformat()
    with get_conn() as conn:
        for event in events:
            values = (
                run_id,
                event.get("category", ""),
                event.get("title", ""),
                event.get("description", ""),
                event.get("url", ""),
                event.get("event_date", ""),
                event.get("source", ""),
                event.get("relevance_score"),
                fetched_at,
            )
            conn.execute(
                """INSERT OR IGNORE INTO events
                   (run_id, category, title, description, url, event_date,
                    source, relevance_score, fetched_at)
                   VALUES (?,?,?,?,?,?,?,?,?)""",
                values,
            )
def get_events(category: str | None = None, limit: int = 50) -> list[dict]:
    """Recent events, optionally filtered by category.

    Category-filtered results sort by event_date; the unfiltered feed
    sorts by fetch time.
    """
    with get_conn() as conn:
        if category:
            sql = "SELECT * FROM events WHERE category=? ORDER BY event_date DESC LIMIT ?"
            args: tuple = (category, limit)
        else:
            sql = "SELECT * FROM events ORDER BY fetched_at DESC LIMIT ?"
            args = (limit,)
        return [dict(r) for r in conn.execute(sql, args).fetchall()]
def count_events() -> int:
    """Total number of stored events."""
    with get_conn() as conn:
        (total,) = conn.execute("SELECT COUNT(*) FROM events").fetchone()
        return total
| # --------------------------------------------------------------------------- | |
| # Dashboard helpers | |
| # --------------------------------------------------------------------------- | |
def get_all_runs(limit: int = 20) -> list[dict]:
    """Most recent runs across all domains, newest first."""
    with get_conn() as conn:
        cursor = conn.execute("SELECT * FROM runs ORDER BY id DESC LIMIT ?", (limit,))
        return [dict(r) for r in cursor.fetchall()]
| # --------------------------------------------------------------------------- | |
| # Paper connections (Semantic Scholar) | |
| # --------------------------------------------------------------------------- | |
def insert_connections(connections: list[dict]):
    """Bulk-insert paper connections.

    Each dict must carry 'paper_id' and 'connection_type' (KeyError if
    missing); the remaining fields are optional.
    """
    fetched_at = datetime.now(timezone.utc).isoformat()
    with get_conn() as conn:
        for link in connections:
            conn.execute(
                """INSERT INTO paper_connections
                   (paper_id, connected_arxiv_id, connected_s2_id,
                    connected_title, connected_year, connection_type,
                    in_db_paper_id, fetched_at)
                   VALUES (?,?,?,?,?,?,?,?)""",
                (
                    link["paper_id"],
                    link.get("connected_arxiv_id", ""),
                    link.get("connected_s2_id", ""),
                    link.get("connected_title", ""),
                    link.get("connected_year"),
                    link["connection_type"],
                    link.get("in_db_paper_id"),
                    fetched_at,
                ),
            )
def get_paper_connections(paper_id: int) -> dict:
    """Connected papers for *paper_id*, grouped as references/recommendations.

    Rows with any other connection_type are silently dropped.
    """
    grouped: dict = {"references": [], "recommendations": []}
    with get_conn() as conn:
        rows = conn.execute(
            "SELECT * FROM paper_connections WHERE paper_id=? "
            "ORDER BY connection_type, connected_year DESC",
            (paper_id,),
        ).fetchall()
        for row in rows:
            record = dict(row)
            bucket = grouped.get(record["connection_type"])
            if bucket is not None:
                bucket.append(record)
    return grouped
def clear_connections(paper_id: int):
    """Drop all stored connections for a paper (run before re-enrichment)."""
    with get_conn() as conn:
        conn.execute("DELETE FROM paper_connections WHERE paper_id=?", (paper_id,))
def update_paper_s2(paper_id: int, s2_paper_id: str, s2_tldr: str):
    """Store Semantic Scholar id and TLDR on a paper row."""
    sql = "UPDATE papers SET s2_paper_id=?, s2_tldr=? WHERE id=?"
    with get_conn() as conn:
        conn.execute(sql, (s2_paper_id, s2_tldr, paper_id))
def update_paper_topics(paper_id: int, topics: list[str]):
    """Replace a paper's topic tags (stored as a JSON array)."""
    encoded = json.dumps(topics)
    with get_conn() as conn:
        conn.execute("UPDATE papers SET topics=? WHERE id=?", (encoded, paper_id))
def get_arxiv_id_map(run_id: int) -> dict[str, int]:
    """Map arxiv_id -> papers.id for every paper in a run."""
    with get_conn() as conn:
        cursor = conn.execute(
            "SELECT id, arxiv_id FROM papers WHERE run_id=?", (run_id,)
        )
        return {r["arxiv_id"]: r["id"] for r in cursor.fetchall()}
def get_available_topics(domain: str, run_id: int) -> list[str]:
    """Sorted distinct topic tags used by papers in a run."""
    collected: set[str] = set()
    with get_conn() as conn:
        rows = conn.execute(
            "SELECT DISTINCT topics FROM papers "
            "WHERE domain=? AND run_id=? AND topics IS NOT NULL AND topics != '[]'",
            (domain, run_id),
        ).fetchall()
        for row in rows:
            try:
                collected.update(json.loads(row["topics"]))
            except (json.JSONDecodeError, TypeError):
                pass  # ignore malformed JSON rather than failing the page
    return sorted(collected)
| # --------------------------------------------------------------------------- | |
| # Full-text search (FTS5) | |
| # --------------------------------------------------------------------------- | |
# Whitelisted ORDER BY clauses for search_papers_fts(). "fts_rank" is the
# bm25() alias computed in the query (lower = better match, so ascending
# order is correct); the others sort on the joined papers table `p`.
FTS_SORT_OPTIONS = {
    "rank": "fts_rank",
    "score": "p.composite DESC",
    "date": "p.published DESC",
}
def search_papers_fts(
    query: str,
    domain: str | None = None,
    sort: str = "rank",
    limit: int = 50,
    offset: int = 0,
) -> tuple[list[dict], int]:
    """Full-text search across all papers, deduped by arxiv_id.

    Returns (papers_with_snippets, total_count). *query* is passed straight
    to FTS5 MATCH, so it may use FTS query syntax; a syntactically invalid
    query returns ([], 0) rather than raising. *sort* must be a key of
    FTS_SORT_OPTIONS (unknown values fall back to relevance rank).
    """
    with get_conn() as conn:
        # Dedup CTE: keep most-recently-scored version per arxiv_id+domain
        # (MAX(id) assumes higher id == newer row — TODO confirm).
        domain_filter = ""
        params: list = []
        if domain:
            domain_filter = "AND p.domain = ?"
            params.append(domain)
        # BM25 weights: title=10, abstract=1, summary=5, topics=2
        # snippet() markers: <mark>...</mark>; column indexes 0/1/2 are
        # title/abstract/summary in the fts5 declaration order.
        sql = f"""
            WITH deduped AS (
                SELECT MAX(id) AS id
                FROM papers
                WHERE composite IS NOT NULL
                GROUP BY arxiv_id, domain
            )
            SELECT p.*,
                   bm25(papers_fts, 10.0, 1.0, 5.0, 2.0) AS fts_rank,
                   snippet(papers_fts, 0, '<mark>', '</mark>', '...', 40) AS snip_title,
                   snippet(papers_fts, 1, '<mark>', '</mark>', '...', 40) AS snip_abstract,
                   snippet(papers_fts, 2, '<mark>', '</mark>', '...', 40) AS snip_summary
            FROM papers_fts
            JOIN deduped d ON papers_fts.rowid = d.id
            JOIN papers p ON p.id = d.id
            WHERE papers_fts MATCH ?
            {domain_filter}
            ORDER BY {FTS_SORT_OPTIONS.get(sort, "fts_rank")}
        """
        match_query = query
        params_full = [match_query] + params
        # Count total matches with the same CTE/filters, minus ranking columns.
        count_sql = f"""
            WITH deduped AS (
                SELECT MAX(id) AS id
                FROM papers
                WHERE composite IS NOT NULL
                GROUP BY arxiv_id, domain
            )
            SELECT COUNT(*)
            FROM papers_fts
            JOIN deduped d ON papers_fts.rowid = d.id
            JOIN papers p ON p.id = d.id
            WHERE papers_fts MATCH ?
            {domain_filter}
        """
        try:
            total = conn.execute(count_sql, params_full).fetchone()[0]
        except sqlite3.OperationalError:
            # Bad FTS query syntax
            return [], 0
        try:
            rows = conn.execute(
                sql + " LIMIT ? OFFSET ?",
                params_full + [limit, offset],
            ).fetchall()
        except sqlite3.OperationalError:
            return [], 0
        # Attach highlight snippets alongside the parsed paper fields.
        results = []
        for row in rows:
            d = _deserialize_paper(row)
            d["snip_title"] = row["snip_title"]
            d["snip_abstract"] = row["snip_abstract"]
            d["snip_summary"] = row["snip_summary"]
            results.append(d)
        return results, total
| # --------------------------------------------------------------------------- | |
| # GitHub project helpers | |
| # --------------------------------------------------------------------------- | |
def insert_github_projects(projects: list[dict], run_id: int):
    """Bulk-insert GitHub projects; OR IGNORE skips duplicate (repo_name, run_id)."""
    fetched_at = datetime.now(timezone.utc).isoformat()
    with get_conn() as conn:
        for project in projects:
            values = (
                run_id,
                project.get("repo_id", 0),
                project.get("repo_name", ""),
                project.get("description", ""),
                project.get("language", ""),
                project.get("stars", 0),
                project.get("forks", 0),
                project.get("pull_requests", 0),
                project.get("total_score", 0),
                project.get("collection_names", ""),
                _serialize_json(project.get("topics", [])),
                project.get("url", ""),
                project.get("domain", ""),
                fetched_at,
            )
            conn.execute(
                """INSERT OR IGNORE INTO github_projects
                   (run_id, repo_id, repo_name, description, language,
                    stars, forks, pull_requests, total_score,
                    collection_names, topics, url, domain, fetched_at)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                values,
            )
# Whitelisted ORDER BY clauses for get_github_projects_page(); mirrors
# SORT_OPTIONS for papers and keeps the f-string SQL injection-safe.
GH_SORT_OPTIONS = {
    "score": "total_score DESC",
    "stars": "stars DESC",
    "forks": "forks DESC",
    "name": "repo_name ASC",
}
def get_github_projects_page(
    run_id: int | None = None,
    offset: int = 0,
    limit: int = 50,
    search: str | None = None,
    language: str | None = None,
    domain: str | None = None,
    sort: str | None = None,
) -> tuple[list[dict], int]:
    """Paginated, filterable GitHub project list. Returns (projects, total)."""
    with get_conn() as conn:
        # Default to the latest 'github' run.
        if not run_id:
            latest = get_latest_run("github")
            if not latest:
                return [], 0
            run_id = latest["id"]

        where_parts = ["run_id=?"]
        args: list = [run_id]
        if search:
            pattern = f"%{search}%"
            where_parts.append("(repo_name LIKE ? OR description LIKE ?)")
            args.extend([pattern, pattern])
        if language:
            where_parts.append("language=?")
            args.append(language)
        if domain:
            where_parts.append("domain=?")
            args.append(domain)

        where = " AND ".join(where_parts)
        order_by = GH_SORT_OPTIONS.get(sort, "total_score DESC")  # whitelist lookup

        total = conn.execute(
            f"SELECT COUNT(*) FROM github_projects WHERE {where}", args
        ).fetchone()[0]
        rows = conn.execute(
            f"SELECT * FROM github_projects WHERE {where} ORDER BY {order_by} LIMIT ? OFFSET ?",
            args + [limit, offset],
        ).fetchall()
        return [_deserialize_gh_project(r) for r in rows], total
def get_top_github_projects(run_id: int | None = None, limit: int = 10) -> list[dict]:
    """Highest-scored GitHub projects, defaulting to the latest 'github' run."""
    with get_conn() as conn:
        if not run_id:
            latest = get_latest_run("github")
            if not latest:
                return []
            run_id = latest["id"]
        cursor = conn.execute(
            "SELECT * FROM github_projects WHERE run_id=? ORDER BY total_score DESC LIMIT ?",
            (run_id, limit),
        )
        return [_deserialize_gh_project(r) for r in cursor.fetchall()]
def count_github_projects(run_id: int | None = None) -> int:
    """Number of projects in a GitHub run (latest run if not specified)."""
    with get_conn() as conn:
        if not run_id:
            latest = get_latest_run("github")
            if not latest:
                return 0
            run_id = latest["id"]
        (total,) = conn.execute(
            "SELECT COUNT(*) FROM github_projects WHERE run_id=?", (run_id,)
        ).fetchone()
        return total
def get_github_languages(run_id: int) -> list[str]:
    """Distinct non-empty languages present in a GitHub run, alphabetized."""
    sql = (
        "SELECT DISTINCT language FROM github_projects "
        "WHERE run_id=? AND language IS NOT NULL AND language != '' "
        "ORDER BY language"
    )
    with get_conn() as conn:
        return [r["language"] for r in conn.execute(sql, (run_id,)).fetchall()]
| def _deserialize_gh_project(row) -> dict: | |
| d = dict(row) | |
| for key in ("topics",): | |
| val = d.get(key) | |
| if isinstance(val, str): | |
| try: | |
| d[key] = json.loads(val) | |
| except (json.JSONDecodeError, TypeError): | |
| d[key] = [] | |
| return d | |
| # --------------------------------------------------------------------------- | |
| # User signal helpers (preference learning) | |
| # --------------------------------------------------------------------------- | |
def insert_signal(paper_id: int, action: str, metadata: dict | None = None) -> bool:
    """Record a user signal. Returns True if inserted, False if duplicate.

    Views are deduped within a 5-minute window; every other action relies
    on the partial UNIQUE index (one row per paper_id+action).
    """
    created_at = datetime.now(timezone.utc).isoformat()
    payload = json.dumps(metadata or {})
    insert_sql = (
        "INSERT INTO user_signals (paper_id, action, created_at, metadata) "
        "VALUES (?, ?, ?, ?)"
    )
    with get_conn() as conn:
        if action == "view":
            # Dedup views within 5-minute window
            duplicate = conn.execute(
                "SELECT 1 FROM user_signals "
                "WHERE paper_id=? AND action='view' "
                "AND created_at > datetime(?, '-5 minutes')",
                (paper_id, created_at),
            ).fetchone()
            if duplicate:
                return False
            conn.execute(insert_sql, (paper_id, action, created_at, payload))
            return True
        try:
            conn.execute(insert_sql, (paper_id, action, created_at, payload))
        except sqlite3.IntegrityError:
            return False  # UNIQUE constraint hit — signal already recorded
        return True
def delete_signal(paper_id: int, action: str) -> bool:
    """Remove a signal (for toggling off). Returns True if a row was deleted."""
    with get_conn() as conn:
        cursor = conn.execute(
            "DELETE FROM user_signals WHERE paper_id=? AND action=?",
            (paper_id, action),
        )
        deleted = cursor.rowcount
    return deleted > 0
def get_paper_signal(paper_id: int) -> str | None:
    """The user's most recent non-view signal for a paper, or None."""
    sql = (
        "SELECT action FROM user_signals "
        "WHERE paper_id=? AND action != 'view' "
        "ORDER BY created_at DESC LIMIT 1"
    )
    with get_conn() as conn:
        row = conn.execute(sql, (paper_id,)).fetchone()
        return None if row is None else row["action"]
def get_paper_signals_batch(paper_ids: list[int]) -> dict[int, str]:
    """Batch fetch the latest non-view signal per paper as {paper_id: action}."""
    if not paper_ids:
        return {}
    marks = ",".join(["?"] * len(paper_ids))
    latest: dict[int, str] = {}
    with get_conn() as conn:
        rows = conn.execute(
            f"SELECT paper_id, action FROM user_signals "
            f"WHERE paper_id IN ({marks}) AND action != 'view' "
            f"ORDER BY created_at DESC",
            paper_ids,
        ).fetchall()
        for row in rows:
            # Rows arrive newest-first, so the first action seen per paper wins.
            latest.setdefault(row["paper_id"], row["action"])
    return latest
def get_all_signals_with_papers() -> list[dict]:
    """Join every signal with its paper's metadata for preference computation."""
    with get_conn() as conn:
        rows = conn.execute(
            """SELECT s.id as signal_id, s.paper_id, s.action, s.created_at,
                      p.title, p.categories, p.topics, p.authors, p.domain,
                      p.score_axis_1, p.score_axis_2, p.score_axis_3, p.composite
               FROM user_signals s
               JOIN papers p ON s.paper_id = p.id
               ORDER BY s.created_at DESC"""
        ).fetchall()
        joined = []
        for row in rows:
            record = dict(row)
            # Decode the JSON-encoded list columns; malformed values become [].
            for field in ("categories", "topics", "authors"):
                raw = record.get(field)
                if not isinstance(raw, str):
                    continue
                try:
                    record[field] = json.loads(raw)
                except (json.JSONDecodeError, TypeError):
                    record[field] = []
            joined.append(record)
        return joined
def get_signal_counts() -> dict[str, int]:
    """Summary stats: number of signals recorded per action type."""
    with get_conn() as conn:
        cursor = conn.execute(
            "SELECT action, COUNT(*) as cnt FROM user_signals GROUP BY action"
        )
        return {r["action"]: r["cnt"] for r in cursor.fetchall()}
def save_preferences(prefs: dict[str, tuple[float, int]]):
    """Replace the whole preference table. prefs = {key: (value, signal_count)}."""
    updated_at = datetime.now(timezone.utc).isoformat()
    with get_conn() as conn:
        conn.execute("DELETE FROM user_preferences")  # full rewrite each time
        conn.executemany(
            "INSERT INTO user_preferences (pref_key, pref_value, signal_count, updated_at) "
            "VALUES (?, ?, ?, ?)",
            [(key, value, count, updated_at) for key, (value, count) in prefs.items()],
        )
def load_preferences() -> dict[str, float]:
    """Load the preference profile as {pref_key: pref_value}."""
    with get_conn() as conn:
        cursor = conn.execute("SELECT pref_key, pref_value FROM user_preferences")
        return {r["pref_key"]: r["pref_value"] for r in cursor.fetchall()}
def get_preferences_detail() -> list[dict]:
    """Full preference rows, strongest (largest |value|) first, for the UI."""
    with get_conn() as conn:
        cursor = conn.execute(
            "SELECT * FROM user_preferences ORDER BY ABS(pref_value) DESC"
        )
        return [dict(r) for r in cursor.fetchall()]
def get_preferences_updated_at() -> str | None:
    """Timestamp of the most recent preference computation, or None if empty."""
    with get_conn() as conn:
        row = conn.execute(
            "SELECT updated_at FROM user_preferences ORDER BY updated_at DESC LIMIT 1"
        ).fetchone()
        return None if row is None else row["updated_at"]
def clear_preferences():
    """Reset the learned profile: wipe both preferences and raw signals."""
    with get_conn() as conn:
        for table in ("user_preferences", "user_signals"):
            conn.execute(f"DELETE FROM {table}")
def upsert_seed_papers(papers: list[dict]) -> dict[str, int]:
    """Ensure seed papers exist in DB, return {arxiv_id: paper_db_id}.

    For each paper: if arxiv_id already exists, use the existing row's id.
    Otherwise INSERT a stub row with run_id=NULL and source='seed'.
    Papers without an arxiv_id are skipped. NOTE(review): assumes each
    dict's 'arxiv_id' value is a string (or absent) — a None value would
    raise on .strip(); confirm against callers.
    """
    result: dict[str, int] = {}
    with get_conn() as conn:
        for p in papers:
            arxiv_id = p.get("arxiv_id", "").strip()
            if not arxiv_id:
                continue
            # Check if paper already exists (from any run)
            row = conn.execute(
                "SELECT id FROM papers WHERE arxiv_id=? LIMIT 1",
                (arxiv_id,),
            ).fetchone()
            if row:
                result[arxiv_id] = row["id"]
            else:
                # Insert stub — run_id=NULL is valid (no NOT NULL constraint).
                # OR IGNORE handles the race where a concurrent request already
                # inserted this seed paper (idx_papers_seed_dedup).
                domain = p.get("domain", "aiml")
                conn.execute(
                    """INSERT OR IGNORE INTO papers
                       (run_id, domain, arxiv_id, entry_id, title, authors,
                        abstract, published, categories, pdf_url, arxiv_url,
                        comment, source)
                       VALUES (NULL,?,?,?,?,?,?,?,?,?,?,?,?)""",
                    (
                        domain,
                        arxiv_id,
                        p.get("entry_id", ""),
                        p.get("title", ""),
                        _serialize_json(p.get("authors", [])),
                        p.get("abstract", ""),
                        p.get("published", ""),
                        _serialize_json(p.get("categories", [])),
                        p.get("pdf_url", ""),
                        p.get("arxiv_url", f"https://arxiv.org/abs/{arxiv_id}"),
                        p.get("comment", ""),
                        "seed",
                    ),
                )
                # Re-query to get the id (handles both fresh insert and OR IGNORE)
                inserted = conn.execute(
                    "SELECT id FROM papers WHERE arxiv_id=? AND run_id IS NULL LIMIT 1",
                    (arxiv_id,),
                ).fetchone()
                if inserted:
                    result[arxiv_id] = inserted["id"]
    return result