"""Database layer — SQLite schema, connection, and query helpers."""
import json
import logging
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
log = logging.getLogger(__name__)
def get_db_path() -> Path:
    """Return the configured database file path.

    Imported lazily from src.config so that importing this module does not
    require the config module at import time (avoids circular imports).
    """
    from src.config import DB_PATH

    return DB_PATH
@contextmanager
def get_conn():
    """Yield a SQLite connection with WAL mode and foreign keys.

    Commits on clean exit; on any exception the transaction is rolled
    back, logged, and the exception re-raised. The connection is always
    closed.
    """
    db_file = get_db_path()
    db_file.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(db_file))
    connection.row_factory = sqlite3.Row
    # WAL allows concurrent readers; FK enforcement is off by default in SQLite.
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    try:
        yield connection
        connection.commit()
    except Exception:
        connection.rollback()
        log.exception("Database transaction failed")
        raise
    finally:
        connection.close()
def init_db():
    """Create tables if they don't exist and apply idempotent migrations."""
    with get_conn() as conn:
        conn.executescript(SCHEMA)
        for statement in _MIGRATIONS:
            try:
                conn.execute(statement)
            except sqlite3.OperationalError as exc:
                message = str(exc).lower()
                # "duplicate column" / "already exists" mean the migration
                # was applied on a previous run — that is expected.
                if "duplicate column" not in message and "already exists" not in message:
                    log.warning("Migration failed: %s — %s", statement.strip()[:60], exc)
        # Rebuild FTS index from content table (idempotent, fast for a few thousand rows)
        conn.execute("INSERT INTO papers_fts(papers_fts) VALUES('rebuild')")
# Full DDL for a fresh database. Every statement is IF NOT EXISTS, so the
# script is safe to re-run on each startup; columns added later live in
# _MIGRATIONS below. papers_fts is an external-content FTS5 table kept in
# sync with `papers` by the three triggers at the bottom.
SCHEMA = """\
CREATE TABLE IF NOT EXISTS runs (
id INTEGER PRIMARY KEY,
domain TEXT NOT NULL,
started_at TEXT NOT NULL,
finished_at TEXT,
date_start TEXT NOT NULL,
date_end TEXT NOT NULL,
paper_count INTEGER DEFAULT 0,
status TEXT DEFAULT 'running'
);
CREATE TABLE IF NOT EXISTS papers (
id INTEGER PRIMARY KEY,
run_id INTEGER REFERENCES runs(id),
domain TEXT NOT NULL,
arxiv_id TEXT NOT NULL,
entry_id TEXT,
title TEXT NOT NULL,
authors TEXT,
abstract TEXT,
published TEXT,
categories TEXT,
pdf_url TEXT,
arxiv_url TEXT,
comment TEXT,
source TEXT,
github_repo TEXT,
github_stars INTEGER,
hf_upvotes INTEGER DEFAULT 0,
hf_models TEXT,
hf_datasets TEXT,
hf_spaces TEXT,
score_axis_1 REAL,
score_axis_2 REAL,
score_axis_3 REAL,
composite REAL,
summary TEXT,
reasoning TEXT,
code_url TEXT,
UNIQUE(domain, arxiv_id, run_id)
);
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY,
run_id INTEGER,
category TEXT NOT NULL,
title TEXT NOT NULL,
description TEXT,
url TEXT,
event_date TEXT,
source TEXT,
relevance_score REAL,
fetched_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS paper_connections (
id INTEGER PRIMARY KEY,
paper_id INTEGER NOT NULL REFERENCES papers(id),
connected_arxiv_id TEXT,
connected_s2_id TEXT,
connected_title TEXT,
connected_year INTEGER,
connection_type TEXT NOT NULL,
in_db_paper_id INTEGER,
fetched_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_papers_domain_composite
ON papers(domain, composite DESC);
CREATE INDEX IF NOT EXISTS idx_papers_run ON papers(run_id);
CREATE INDEX IF NOT EXISTS idx_events_category ON events(category, event_date);
CREATE INDEX IF NOT EXISTS idx_connections_paper ON paper_connections(paper_id);
CREATE INDEX IF NOT EXISTS idx_connections_arxiv ON paper_connections(connected_arxiv_id);
CREATE INDEX IF NOT EXISTS idx_papers_arxiv_id ON papers(arxiv_id);
CREATE INDEX IF NOT EXISTS idx_papers_published ON papers(published);
CREATE INDEX IF NOT EXISTS idx_events_run_id ON events(run_id);
CREATE TABLE IF NOT EXISTS github_projects (
id INTEGER PRIMARY KEY,
run_id INTEGER REFERENCES runs(id),
repo_id INTEGER NOT NULL,
repo_name TEXT NOT NULL,
description TEXT,
language TEXT,
stars INTEGER DEFAULT 0,
forks INTEGER DEFAULT 0,
pull_requests INTEGER DEFAULT 0,
total_score REAL DEFAULT 0,
collection_names TEXT,
topics TEXT DEFAULT '[]',
url TEXT NOT NULL,
domain TEXT,
fetched_at TEXT NOT NULL,
UNIQUE(repo_name, run_id)
);
CREATE INDEX IF NOT EXISTS idx_gh_run ON github_projects(run_id);
CREATE INDEX IF NOT EXISTS idx_gh_domain ON github_projects(domain, total_score DESC);
CREATE INDEX IF NOT EXISTS idx_gh_repo ON github_projects(repo_name);
CREATE TABLE IF NOT EXISTS user_signals (
id INTEGER PRIMARY KEY,
paper_id INTEGER NOT NULL REFERENCES papers(id),
action TEXT NOT NULL CHECK(action IN ('save','view','upvote','downvote','dismiss')),
created_at TEXT NOT NULL,
metadata TEXT DEFAULT '{}'
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_signals_paper_action
ON user_signals(paper_id, action) WHERE action != 'view';
CREATE INDEX IF NOT EXISTS idx_signals_created ON user_signals(created_at);
CREATE INDEX IF NOT EXISTS idx_signals_paper ON user_signals(paper_id);
CREATE TABLE IF NOT EXISTS user_preferences (
id INTEGER PRIMARY KEY,
pref_key TEXT NOT NULL UNIQUE,
pref_value REAL NOT NULL DEFAULT 0.0,
signal_count INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_prefs_key ON user_preferences(pref_key);
CREATE VIRTUAL TABLE IF NOT EXISTS papers_fts USING fts5(
title, abstract, summary, topics,
content='papers', content_rowid='id',
tokenize='porter unicode61'
);
CREATE TRIGGER IF NOT EXISTS papers_ai AFTER INSERT ON papers BEGIN
INSERT INTO papers_fts(rowid, title, abstract, summary, topics)
VALUES (new.id, new.title, new.abstract, new.summary, new.topics);
END;
CREATE TRIGGER IF NOT EXISTS papers_ad AFTER DELETE ON papers BEGIN
INSERT INTO papers_fts(papers_fts, rowid, title, abstract, summary, topics)
VALUES ('delete', old.id, old.title, old.abstract, old.summary, old.topics);
END;
CREATE TRIGGER IF NOT EXISTS papers_au AFTER UPDATE ON papers BEGIN
INSERT INTO papers_fts(papers_fts, rowid, title, abstract, summary, topics)
VALUES ('delete', old.id, old.title, old.abstract, old.summary, old.topics);
INSERT INTO papers_fts(rowid, title, abstract, summary, topics)
VALUES (new.id, new.title, new.abstract, new.summary, new.topics);
END;
"""
# Columns added after initial schema — idempotent via try/except:
# init_db() swallows "duplicate column" / "already exists" OperationalErrors,
# so each statement may be re-executed safely on every startup.
_MIGRATIONS = [
    "ALTER TABLE papers ADD COLUMN s2_tldr TEXT",
    "ALTER TABLE papers ADD COLUMN s2_paper_id TEXT",
    "ALTER TABLE papers ADD COLUMN topics TEXT DEFAULT '[]'",
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_events_unique ON events(title, category)",
    # Prevent duplicate seed papers (NULL run_id) for the same arxiv_id+domain
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_papers_seed_dedup ON papers(domain, arxiv_id) WHERE run_id IS NULL",
]
# ---------------------------------------------------------------------------
# Run helpers
# ---------------------------------------------------------------------------
def create_run(domain: str, date_start: str, date_end: str) -> int:
    """Insert a new pipeline run (status 'running'), return its row ID."""
    started = datetime.now(timezone.utc).isoformat()
    with get_conn() as conn:
        cursor = conn.execute(
            "INSERT INTO runs (domain, started_at, date_start, date_end, status) "
            "VALUES (?, ?, ?, ?, 'running')",
            (domain, started, date_start, date_end),
        )
        return cursor.lastrowid
def finish_run(run_id: int, paper_count: int, status: str = "completed"):
    """Mark a run as finished, recording its final paper count and status."""
    finished = datetime.now(timezone.utc).isoformat()
    with get_conn() as conn:
        conn.execute(
            "UPDATE runs SET finished_at=?, paper_count=?, status=? WHERE id=?",
            (finished, paper_count, status, run_id),
        )
def get_latest_run(domain: str) -> dict | None:
    """Return the most recent run row for *domain*, or None if none exist."""
    sql = "SELECT * FROM runs WHERE domain=? ORDER BY id DESC LIMIT 1"
    with get_conn() as conn:
        row = conn.execute(sql, (domain,)).fetchone()
    return dict(row) if row else None
def get_run(run_id: int) -> dict | None:
    """Fetch a single run row by ID, or None if not found."""
    with get_conn() as conn:
        row = conn.execute("SELECT * FROM runs WHERE id=?", (run_id,)).fetchone()
    return dict(row) if row else None
# ---------------------------------------------------------------------------
# Paper helpers
# ---------------------------------------------------------------------------
def _serialize_json(val):
"""JSON-encode lists/dicts for storage."""
if isinstance(val, (list, dict)):
return json.dumps(val)
return val
def insert_papers(papers: list[dict], run_id: int, domain: str):
    """Bulk-insert papers into the DB.

    Rows colliding on UNIQUE(domain, arxiv_id, run_id) are silently
    skipped (INSERT OR IGNORE). The whole batch goes through a single
    executemany() call — one prepared statement instead of N separate
    execute() round trips.

    Args:
        papers: raw paper dicts; missing keys fall back to empty defaults.
        run_id: owning pipeline run.
        domain: domain tag stored on every row.
    """
    rows = [
        (
            run_id,
            domain,
            p.get("arxiv_id", ""),
            p.get("entry_id", ""),
            p.get("title", ""),
            _serialize_json(p.get("authors", [])),
            p.get("abstract", ""),
            p.get("published", ""),
            _serialize_json(p.get("categories", [])),
            p.get("pdf_url", ""),
            p.get("arxiv_url", ""),
            p.get("comment", ""),
            p.get("source", ""),
            p.get("github_repo", ""),
            p.get("github_stars"),
            p.get("hf_upvotes", 0),
            _serialize_json(p.get("hf_models", [])),
            _serialize_json(p.get("hf_datasets", [])),
            _serialize_json(p.get("hf_spaces", [])),
        )
        for p in papers
    ]
    with get_conn() as conn:
        conn.executemany(
            """INSERT OR IGNORE INTO papers
               (run_id, domain, arxiv_id, entry_id, title, authors, abstract,
                published, categories, pdf_url, arxiv_url, comment, source,
                github_repo, github_stars, hf_upvotes, hf_models, hf_datasets, hf_spaces)
               VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
            rows,
        )
def update_paper_scores(paper_id: int, scores: dict):
    """Update a paper's scores after Claude scoring."""
    values = (
        scores.get("score_axis_1"),
        scores.get("score_axis_2"),
        scores.get("score_axis_3"),
        scores.get("composite"),
        scores.get("summary", ""),
        scores.get("reasoning", ""),
        scores.get("code_url"),
        paper_id,
    )
    with get_conn() as conn:
        conn.execute(
            """UPDATE papers SET
               score_axis_1=?, score_axis_2=?, score_axis_3=?,
               composite=?, summary=?, reasoning=?, code_url=?
               WHERE id=?""",
            values,
        )
def get_unscored_papers(run_id: int) -> list[dict]:
    """Return papers from a run that have no composite score yet."""
    sql = "SELECT * FROM papers WHERE run_id=? AND composite IS NULL"
    with get_conn() as conn:
        rows = conn.execute(sql, (run_id,)).fetchall()
    return [_deserialize_paper(r) for r in rows]
def get_top_papers(domain: str, run_id: int | None = None, limit: int = 20) -> list[dict]:
    """Get top-scored papers for a domain, optionally from a specific run.

    When no run_id is given, the domain's latest run is used; if the
    domain has no runs at all, an empty list is returned.
    """
    if not run_id:
        latest = get_latest_run(domain)
        if not latest:
            return []
        run_id = latest["id"]
    with get_conn() as conn:
        rows = conn.execute(
            "SELECT * FROM papers WHERE domain=? AND run_id=? AND composite IS NOT NULL "
            "ORDER BY composite DESC LIMIT ?",
            (domain, run_id, limit),
        ).fetchall()
    return [_deserialize_paper(r) for r in rows]
def get_paper(paper_id: int) -> dict | None:
    """Fetch a single paper by ID (JSON fields parsed), or None."""
    with get_conn() as conn:
        row = conn.execute("SELECT * FROM papers WHERE id=?", (paper_id,)).fetchone()
    return _deserialize_paper(row) if row else None
# Whitelist mapping user-facing sort keys to ORDER BY clauses.
# Values are interpolated directly into SQL (get_papers_page), so this dict
# must only ever contain hard-coded clauses — never user input.
SORT_OPTIONS = {
    "score": "composite DESC",
    "date": "published DESC",
    "axis1": "score_axis_1 DESC",
    "axis2": "score_axis_2 DESC",
    "axis3": "score_axis_3 DESC",
    "title": "title ASC",
}
def get_papers_page(domain: str, run_id: int | None = None,
                    offset: int = 0, limit: int = 50,
                    min_score: float | None = None,
                    has_code: bool | None = None,
                    search: str | None = None,
                    topic: str | None = None,
                    sort: str | None = None) -> tuple[list[dict], int]:
    """Paginated, filterable paper list. Returns (papers, total_count).

    Filters: minimum composite score, presence of a code URL, LIKE search
    over title/abstract, and topic tag (matched against the JSON array
    text). Sort keys come from SORT_OPTIONS; unknown keys fall back to
    composite DESC. Defaults to the domain's latest run.
    """
    if not run_id:
        latest = get_latest_run(domain)
        if not latest:
            return [], 0
        run_id = latest["id"]
    clauses = ["domain=?", "run_id=?", "composite IS NOT NULL"]
    args: list = [domain, run_id]
    if min_score is not None:
        clauses.append("composite >= ?")
        args.append(min_score)
    if has_code:
        clauses.append("(code_url IS NOT NULL AND code_url != '')")
    if search:
        like = f"%{search}%"
        clauses.append("(title LIKE ? OR abstract LIKE ?)")
        args.extend([like, like])
    if topic:
        # topics is stored as a JSON array string; match the quoted element.
        clauses.append("topics LIKE ?")
        args.append(f'%"{topic}"%')
    where = " AND ".join(clauses)
    order = SORT_OPTIONS.get(sort, "composite DESC")
    with get_conn() as conn:
        total = conn.execute(
            f"SELECT COUNT(*) FROM papers WHERE {where}", args
        ).fetchone()[0]
        rows = conn.execute(
            f"SELECT * FROM papers WHERE {where} ORDER BY {order} LIMIT ? OFFSET ?",
            args + [limit, offset],
        ).fetchall()
    return [_deserialize_paper(r) for r in rows], total
def count_papers(domain: str, run_id: int | None = None, scored_only: bool = False) -> int:
    """Count papers for a domain/run; optionally only those already scored."""
    if not run_id:
        latest = get_latest_run(domain)
        if not latest:
            return 0
        run_id = latest["id"]
    sql = "SELECT COUNT(*) FROM papers WHERE domain=? AND run_id=?"
    if scored_only:
        sql += " AND composite IS NOT NULL"
    with get_conn() as conn:
        row = conn.execute(sql, (domain, run_id)).fetchone()
    return row[0] if row else 0
def _deserialize_paper(row) -> dict:
"""Convert a sqlite3.Row to a dict, parsing JSON fields."""
d = dict(row)
for key in ("authors", "categories", "hf_models", "hf_datasets", "hf_spaces", "topics"):
val = d.get(key)
if isinstance(val, str):
try:
d[key] = json.loads(val)
except (json.JSONDecodeError, TypeError):
d[key] = []
return d
# ---------------------------------------------------------------------------
# Event helpers
# ---------------------------------------------------------------------------
def insert_events(events: list[dict], run_id: int | None = None):
    """Bulk-insert events; duplicates on (title, category) are ignored."""
    fetched_at = datetime.now(timezone.utc).isoformat()
    rows = [
        (
            run_id,
            e.get("category", ""),
            e.get("title", ""),
            e.get("description", ""),
            e.get("url", ""),
            e.get("event_date", ""),
            e.get("source", ""),
            e.get("relevance_score"),
            fetched_at,
        )
        for e in events
    ]
    with get_conn() as conn:
        conn.executemany(
            """INSERT OR IGNORE INTO events
               (run_id, category, title, description, url, event_date,
                source, relevance_score, fetched_at)
               VALUES (?,?,?,?,?,?,?,?,?)""",
            rows,
        )
def get_events(category: str | None = None, limit: int = 50) -> list[dict]:
    """Return recent events, optionally filtered to one category."""
    if category:
        sql = "SELECT * FROM events WHERE category=? ORDER BY event_date DESC LIMIT ?"
        args: tuple = (category, limit)
    else:
        sql = "SELECT * FROM events ORDER BY fetched_at DESC LIMIT ?"
        args = (limit,)
    with get_conn() as conn:
        rows = conn.execute(sql, args).fetchall()
    return [dict(r) for r in rows]
def count_events() -> int:
    """Total number of stored events."""
    with get_conn() as conn:
        (count,) = conn.execute("SELECT COUNT(*) FROM events").fetchone()
    return count
# ---------------------------------------------------------------------------
# Dashboard helpers
# ---------------------------------------------------------------------------
def get_all_runs(limit: int = 20) -> list[dict]:
    """Most recent runs across all domains, newest first."""
    sql = "SELECT * FROM runs ORDER BY id DESC LIMIT ?"
    with get_conn() as conn:
        rows = conn.execute(sql, (limit,)).fetchall()
    return [dict(r) for r in rows]
# ---------------------------------------------------------------------------
# Paper connections (Semantic Scholar)
# ---------------------------------------------------------------------------
def insert_connections(connections: list[dict]):
    """Bulk-insert paper connections.

    Each dict must carry 'paper_id' and 'connection_type' (KeyError
    otherwise); the remaining fields default to empty/None.
    """
    fetched_at = datetime.now(timezone.utc).isoformat()
    rows = [
        (
            c["paper_id"],
            c.get("connected_arxiv_id", ""),
            c.get("connected_s2_id", ""),
            c.get("connected_title", ""),
            c.get("connected_year"),
            c["connection_type"],
            c.get("in_db_paper_id"),
            fetched_at,
        )
        for c in connections
    ]
    with get_conn() as conn:
        conn.executemany(
            """INSERT INTO paper_connections
               (paper_id, connected_arxiv_id, connected_s2_id,
                connected_title, connected_year, connection_type,
                in_db_paper_id, fetched_at)
               VALUES (?,?,?,?,?,?,?,?)""",
            rows,
        )
def get_paper_connections(paper_id: int) -> dict:
    """Get connected papers grouped by type.

    Only 'references' and 'recommendations' buckets are returned; rows
    with any other connection_type are dropped.
    """
    grouped: dict = {"references": [], "recommendations": []}
    with get_conn() as conn:
        rows = conn.execute(
            "SELECT * FROM paper_connections WHERE paper_id=? "
            "ORDER BY connection_type, connected_year DESC",
            (paper_id,),
        ).fetchall()
    for row in rows:
        record = dict(row)
        bucket = grouped.get(record["connection_type"])
        if bucket is not None:
            bucket.append(record)
    return grouped
def clear_connections(paper_id: int):
    """Drop all stored connections for a paper (before re-enrichment)."""
    with get_conn() as conn:
        conn.execute("DELETE FROM paper_connections WHERE paper_id=?", (paper_id,))
def update_paper_s2(paper_id: int, s2_paper_id: str, s2_tldr: str):
    """Store Semantic Scholar metadata (ID and TLDR) on a paper."""
    sql = "UPDATE papers SET s2_paper_id=?, s2_tldr=? WHERE id=?"
    with get_conn() as conn:
        conn.execute(sql, (s2_paper_id, s2_tldr, paper_id))
def update_paper_topics(paper_id: int, topics: list[str]):
    """Replace a paper's topic tags (stored as a JSON array string)."""
    sql = "UPDATE papers SET topics=? WHERE id=?"
    with get_conn() as conn:
        conn.execute(sql, (json.dumps(topics), paper_id))
def get_arxiv_id_map(run_id: int) -> dict[str, int]:
    """Return {arxiv_id: paper_db_id} for all papers in a run."""
    sql = "SELECT id, arxiv_id FROM papers WHERE run_id=?"
    with get_conn() as conn:
        rows = conn.execute(sql, (run_id,)).fetchall()
    return {r["arxiv_id"]: r["id"] for r in rows}
def get_available_topics(domain: str, run_id: int) -> list[str]:
    """Sorted list of distinct topic tags used in a run.

    Rows with unparseable topics JSON are skipped silently.
    """
    with get_conn() as conn:
        rows = conn.execute(
            "SELECT DISTINCT topics FROM papers "
            "WHERE domain=? AND run_id=? AND topics IS NOT NULL AND topics != '[]'",
            (domain, run_id),
        ).fetchall()
    found: set[str] = set()
    for row in rows:
        try:
            found.update(json.loads(row["topics"]))
        except (json.JSONDecodeError, TypeError):
            continue
    return sorted(found)
# ---------------------------------------------------------------------------
# Full-text search (FTS5)
# ---------------------------------------------------------------------------
# Whitelisted ORDER BY clauses for FTS results. "rank" orders by the bm25()
# alias ascending — bm25 returns lower (more negative) values for better
# matches, so ascending puts the most relevant hits first. Values are
# interpolated into SQL, so they must stay hard-coded.
FTS_SORT_OPTIONS = {
    "rank": "fts_rank",
    "score": "p.composite DESC",
    "date": "p.published DESC",
}
def search_papers_fts(
    query: str,
    domain: str | None = None,
    sort: str = "rank",
    limit: int = 50,
    offset: int = 0,
) -> tuple[list[dict], int]:
    """Full-text search across all papers, deduped by arxiv_id.

    Args:
        query: user-supplied FTS5 MATCH expression; syntax errors are
            swallowed and reported as zero results rather than raised.
        domain: optional extra filter on the paper's domain column.
        sort: key into FTS_SORT_OPTIONS ("rank"/"score"/"date"); unknown
            keys fall back to bm25 relevance order.
        limit/offset: pagination window applied to the main query only.

    Returns (papers_with_snippets, total_count).
    """
    with get_conn() as conn:
        # Dedup CTE: keep most-recently-scored version per arxiv_id+domain.
        # MAX(id) picks the newest row since ids are monotonically assigned.
        domain_filter = ""
        params: list = []
        if domain:
            domain_filter = "AND p.domain = ?"
            params.append(domain)
        # BM25 weights: title=10, abstract=1, summary=5, topics=2
        # (one weight per FTS column, in declaration order).
        # snippet() markers: empty open/close strings — no highlight markup
        # is inserted; '...' is the ellipsis, 40 the approx. token budget.
        sql = f"""
        WITH deduped AS (
            SELECT MAX(id) AS id
            FROM papers
            WHERE composite IS NOT NULL
            GROUP BY arxiv_id, domain
        )
        SELECT p.*,
               bm25(papers_fts, 10.0, 1.0, 5.0, 2.0) AS fts_rank,
               snippet(papers_fts, 0, '', '', '...', 40) AS snip_title,
               snippet(papers_fts, 1, '', '', '...', 40) AS snip_abstract,
               snippet(papers_fts, 2, '', '', '...', 40) AS snip_summary
        FROM papers_fts
        JOIN deduped d ON papers_fts.rowid = d.id
        JOIN papers p ON p.id = d.id
        WHERE papers_fts MATCH ?
        {domain_filter}
        ORDER BY {FTS_SORT_OPTIONS.get(sort, "fts_rank")}
        """
        match_query = query
        # Parameter order must mirror placeholder order: MATCH first,
        # then the optional domain filter.
        params_full = [match_query] + params
        # Count total matches — same CTE and filters, no ORDER BY / LIMIT.
        count_sql = f"""
        WITH deduped AS (
            SELECT MAX(id) AS id
            FROM papers
            WHERE composite IS NOT NULL
            GROUP BY arxiv_id, domain
        )
        SELECT COUNT(*)
        FROM papers_fts
        JOIN deduped d ON papers_fts.rowid = d.id
        JOIN papers p ON p.id = d.id
        WHERE papers_fts MATCH ?
        {domain_filter}
        """
        try:
            total = conn.execute(count_sql, params_full).fetchone()[0]
        except sqlite3.OperationalError:
            # Bad FTS query syntax (e.g. unbalanced quotes/operators).
            return [], 0
        try:
            rows = conn.execute(
                sql + " LIMIT ? OFFSET ?",
                params_full + [limit, offset],
            ).fetchall()
        except sqlite3.OperationalError:
            return [], 0
        results = []
        for row in rows:
            d = _deserialize_paper(row)
            # Carry the snippet columns over; they are query artifacts,
            # not paper fields, so _deserialize_paper leaves them as-is.
            d["snip_title"] = row["snip_title"]
            d["snip_abstract"] = row["snip_abstract"]
            d["snip_summary"] = row["snip_summary"]
            results.append(d)
        return results, total
# ---------------------------------------------------------------------------
# GitHub project helpers
# ---------------------------------------------------------------------------
def insert_github_projects(projects: list[dict], run_id: int):
    """Bulk-insert GitHub projects; duplicates on (repo_name, run_id) are ignored."""
    fetched_at = datetime.now(timezone.utc).isoformat()
    rows = [
        (
            run_id,
            p.get("repo_id", 0),
            p.get("repo_name", ""),
            p.get("description", ""),
            p.get("language", ""),
            p.get("stars", 0),
            p.get("forks", 0),
            p.get("pull_requests", 0),
            p.get("total_score", 0),
            p.get("collection_names", ""),
            _serialize_json(p.get("topics", [])),
            p.get("url", ""),
            p.get("domain", ""),
            fetched_at,
        )
        for p in projects
    ]
    with get_conn() as conn:
        conn.executemany(
            """INSERT OR IGNORE INTO github_projects
               (run_id, repo_id, repo_name, description, language,
                stars, forks, pull_requests, total_score,
                collection_names, topics, url, domain, fetched_at)
               VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
            rows,
        )
# Whitelisted ORDER BY clauses for GitHub project listings, keyed by the
# user-facing sort name. Interpolated directly into SQL — hard-coded only.
GH_SORT_OPTIONS = {
    "score": "total_score DESC",
    "stars": "stars DESC",
    "forks": "forks DESC",
    "name": "repo_name ASC",
}
def get_github_projects_page(
    run_id: int | None = None,
    offset: int = 0,
    limit: int = 50,
    search: str | None = None,
    language: str | None = None,
    domain: str | None = None,
    sort: str | None = None,
) -> tuple[list[dict], int]:
    """Paginated, filterable GitHub project list. Returns (projects, total).

    Defaults to the latest 'github' run; filters on name/description LIKE
    search, exact language, and exact domain. Unknown sort keys fall back
    to total_score DESC.
    """
    if not run_id:
        latest = get_latest_run("github")
        if not latest:
            return [], 0
        run_id = latest["id"]
    clauses = ["run_id=?"]
    args: list = [run_id]
    if search:
        like = f"%{search}%"
        clauses.append("(repo_name LIKE ? OR description LIKE ?)")
        args.extend([like, like])
    if language:
        clauses.append("language=?")
        args.append(language)
    if domain:
        clauses.append("domain=?")
        args.append(domain)
    where = " AND ".join(clauses)
    order = GH_SORT_OPTIONS.get(sort, "total_score DESC")
    with get_conn() as conn:
        total = conn.execute(
            f"SELECT COUNT(*) FROM github_projects WHERE {where}", args
        ).fetchone()[0]
        rows = conn.execute(
            f"SELECT * FROM github_projects WHERE {where} ORDER BY {order} LIMIT ? OFFSET ?",
            args + [limit, offset],
        ).fetchall()
    return [_deserialize_gh_project(r) for r in rows], total
def get_top_github_projects(run_id: int | None = None, limit: int = 10) -> list[dict]:
    """Top GitHub projects by total_score (latest 'github' run by default)."""
    if not run_id:
        latest = get_latest_run("github")
        if not latest:
            return []
        run_id = latest["id"]
    sql = "SELECT * FROM github_projects WHERE run_id=? ORDER BY total_score DESC LIMIT ?"
    with get_conn() as conn:
        rows = conn.execute(sql, (run_id, limit)).fetchall()
    return [_deserialize_gh_project(r) for r in rows]
def count_github_projects(run_id: int | None = None) -> int:
    """Number of GitHub projects in a run (latest 'github' run by default)."""
    if not run_id:
        latest = get_latest_run("github")
        if not latest:
            return 0
        run_id = latest["id"]
    with get_conn() as conn:
        (count,) = conn.execute(
            "SELECT COUNT(*) FROM github_projects WHERE run_id=?", (run_id,)
        ).fetchone()
    return count
def get_github_languages(run_id: int) -> list[str]:
    """Sorted distinct languages seen in a GitHub run (blank/NULL excluded)."""
    sql = (
        "SELECT DISTINCT language FROM github_projects "
        "WHERE run_id=? AND language IS NOT NULL AND language != '' "
        "ORDER BY language"
    )
    with get_conn() as conn:
        rows = conn.execute(sql, (run_id,)).fetchall()
    return [r["language"] for r in rows]
def _deserialize_gh_project(row) -> dict:
d = dict(row)
for key in ("topics",):
val = d.get(key)
if isinstance(val, str):
try:
d[key] = json.loads(val)
except (json.JSONDecodeError, TypeError):
d[key] = []
return d
# ---------------------------------------------------------------------------
# User signal helpers (preference learning)
# ---------------------------------------------------------------------------
def insert_signal(paper_id: int, action: str, metadata: dict | None = None) -> bool:
    """Record a user signal. Returns True if inserted, False if duplicate.

    Views are deduped by a 5-minute window; other actions rely on the
    partial UNIQUE index idx_signals_paper_action (one row per
    paper+action).

    Args:
        paper_id: papers.id the signal attaches to (FK enforced).
        action: one of 'save'/'view'/'upvote'/'downvote'/'dismiss'
            (CHECK constraint raises IntegrityError otherwise — for
            non-view actions that is swallowed and reported as False).
        metadata: optional extra context, JSON-encoded into the row.
    """
    now = datetime.now(timezone.utc).isoformat()
    meta_json = json.dumps(metadata or {})
    with get_conn() as conn:
        if action == "view":
            # Dedup views within a 5-minute window.
            # BUG FIX: created_at is stored ISO-8601 with a 'T' separator
            # ("2024-01-01T09:00:00+00:00") while datetime() emits a space
            # separator ("2024-01-01 11:55:00"). A raw string comparison is
            # lexicographic and 'T' > ' ', so every same-day view compared
            # greater than the threshold — stretching the window to a full
            # day. Normalizing the stored value through datetime() makes
            # the comparison chronological.
            recent = conn.execute(
                "SELECT 1 FROM user_signals "
                "WHERE paper_id=? AND action='view' "
                "AND datetime(created_at) > datetime(?, '-5 minutes')",
                (paper_id, now),
            ).fetchone()
            if recent:
                return False
            conn.execute(
                "INSERT INTO user_signals (paper_id, action, created_at, metadata) "
                "VALUES (?, ?, ?, ?)",
                (paper_id, action, now, meta_json),
            )
            return True
        else:
            try:
                conn.execute(
                    "INSERT INTO user_signals (paper_id, action, created_at, metadata) "
                    "VALUES (?, ?, ?, ?)",
                    (paper_id, action, now, meta_json),
                )
                return True
            except sqlite3.IntegrityError:
                # Duplicate (paper_id, action) — signal already recorded.
                return False
def delete_signal(paper_id: int, action: str) -> bool:
    """Remove a signal (toggle-off). Returns True if any row was deleted."""
    with get_conn() as conn:
        cursor = conn.execute(
            "DELETE FROM user_signals WHERE paper_id=? AND action=?",
            (paper_id, action),
        )
        return cursor.rowcount > 0
def get_paper_signal(paper_id: int) -> str | None:
    """Return the user's latest non-view signal for a paper, or None."""
    sql = (
        "SELECT action FROM user_signals "
        "WHERE paper_id=? AND action != 'view' "
        "ORDER BY created_at DESC LIMIT 1"
    )
    with get_conn() as conn:
        row = conn.execute(sql, (paper_id,)).fetchone()
    return row["action"] if row else None
def get_paper_signals_batch(paper_ids: list[int]) -> dict[int, str]:
    """Batch fetch latest non-view signal per paper. Returns {paper_id: action}."""
    if not paper_ids:
        return {}
    placeholders = ",".join("?" for _ in paper_ids)
    sql = (
        f"SELECT paper_id, action FROM user_signals "
        f"WHERE paper_id IN ({placeholders}) AND action != 'view' "
        f"ORDER BY created_at DESC"
    )
    with get_conn() as conn:
        rows = conn.execute(sql, paper_ids).fetchall()
    # Rows arrive newest-first; setdefault keeps only the first (latest)
    # action seen per paper.
    latest: dict[int, str] = {}
    for row in rows:
        latest.setdefault(row["paper_id"], row["action"])
    return latest
def get_all_signals_with_papers() -> list[dict]:
    """Join signals with paper data for preference computation.

    Returns one dict per signal (newest first) with the paper's title,
    parsed categories/topics/authors lists, domain, and scores attached.
    """
    with get_conn() as conn:
        rows = conn.execute(
            """SELECT s.id as signal_id, s.paper_id, s.action, s.created_at,
                      p.title, p.categories, p.topics, p.authors, p.domain,
                      p.score_axis_1, p.score_axis_2, p.score_axis_3, p.composite
               FROM user_signals s
               JOIN papers p ON s.paper_id = p.id
               ORDER BY s.created_at DESC"""
        ).fetchall()
    records = []
    for row in rows:
        record = dict(row)
        for field in ("categories", "topics", "authors"):
            raw = record.get(field)
            if not isinstance(raw, str):
                continue
            try:
                record[field] = json.loads(raw)
            except (json.JSONDecodeError, TypeError):
                record[field] = []
        records.append(record)
    return records
def get_signal_counts() -> dict[str, int]:
    """Summary stats: number of signals per action type."""
    sql = "SELECT action, COUNT(*) as cnt FROM user_signals GROUP BY action"
    with get_conn() as conn:
        rows = conn.execute(sql).fetchall()
    return {r["action"]: r["cnt"] for r in rows}
def save_preferences(prefs: dict[str, tuple[float, int]]):
    """Replace the whole preference table. prefs = {key: (value, signal_count)}."""
    now = datetime.now(timezone.utc).isoformat()
    rows = [(key, value, count, now) for key, (value, count) in prefs.items()]
    with get_conn() as conn:
        # Full rewrite: wipe then re-insert inside one transaction.
        conn.execute("DELETE FROM user_preferences")
        conn.executemany(
            "INSERT INTO user_preferences (pref_key, pref_value, signal_count, updated_at) "
            "VALUES (?, ?, ?, ?)",
            rows,
        )
def load_preferences() -> dict[str, float]:
    """Load the preference profile. Returns {pref_key: pref_value}."""
    sql = "SELECT pref_key, pref_value FROM user_preferences"
    with get_conn() as conn:
        rows = conn.execute(sql).fetchall()
    return {r["pref_key"]: r["pref_value"] for r in rows}
def get_preferences_detail() -> list[dict]:
    """Full preference rows for the preferences page, strongest first."""
    sql = "SELECT * FROM user_preferences ORDER BY ABS(pref_value) DESC"
    with get_conn() as conn:
        rows = conn.execute(sql).fetchall()
    return [dict(r) for r in rows]
def get_preferences_updated_at() -> str | None:
    """Timestamp of the last preference computation, or None if never run."""
    sql = "SELECT updated_at FROM user_preferences ORDER BY updated_at DESC LIMIT 1"
    with get_conn() as conn:
        row = conn.execute(sql).fetchone()
    return row["updated_at"] if row else None
def clear_preferences():
    """Wipe both computed preferences and the raw signals that fed them."""
    with get_conn() as conn:
        for table in ("user_preferences", "user_signals"):
            conn.execute(f"DELETE FROM {table}")
def upsert_seed_papers(papers: list[dict]) -> dict[str, int]:
    """Ensure seed papers exist in DB, return {arxiv_id: paper_db_id}.

    For each paper: if arxiv_id already exists, use the existing row's id.
    Otherwise INSERT a stub row with run_id=NULL and source='seed'.

    Papers without an arxiv_id are skipped entirely; a paper whose
    post-insert lookup fails (should not happen) is silently omitted
    from the result map.
    """
    result: dict[str, int] = {}
    with get_conn() as conn:
        for p in papers:
            arxiv_id = p.get("arxiv_id", "").strip()
            if not arxiv_id:
                continue
            # Check if paper already exists (from any run, seed or pipeline).
            row = conn.execute(
                "SELECT id FROM papers WHERE arxiv_id=? LIMIT 1",
                (arxiv_id,),
            ).fetchone()
            if row:
                result[arxiv_id] = row["id"]
            else:
                # Insert stub — run_id=NULL is valid (no NOT NULL constraint).
                # OR IGNORE handles the race where a concurrent request already
                # inserted this seed paper (idx_papers_seed_dedup).
                domain = p.get("domain", "aiml")
                conn.execute(
                    """INSERT OR IGNORE INTO papers
                    (run_id, domain, arxiv_id, entry_id, title, authors,
                    abstract, published, categories, pdf_url, arxiv_url,
                    comment, source)
                    VALUES (NULL,?,?,?,?,?,?,?,?,?,?,?,?)""",
                    (
                        domain,
                        arxiv_id,
                        p.get("entry_id", ""),
                        p.get("title", ""),
                        _serialize_json(p.get("authors", [])),
                        p.get("abstract", ""),
                        p.get("published", ""),
                        _serialize_json(p.get("categories", [])),
                        p.get("pdf_url", ""),
                        # Fall back to the canonical abstract URL when absent.
                        p.get("arxiv_url", f"https://arxiv.org/abs/{arxiv_id}"),
                        p.get("comment", ""),
                        "seed",
                    ),
                )
                # Re-query to get the id (handles both fresh insert and OR IGNORE;
                # lastrowid is unreliable when the insert was ignored).
                inserted = conn.execute(
                    "SELECT id FROM papers WHERE arxiv_id=? AND run_id IS NULL LIMIT 1",
                    (arxiv_id,),
                ).fetchone()
                if inserted:
                    result[arxiv_id] = inserted["id"]
    return result