# Researcher — src/pipelines/github.py
# Initial commit: Research Intelligence System (commit a0f27fa, author: amarck)
"""GitHub projects pipeline — discover trending repos via OSSInsight.io API.
Two strategies:
1. Trending repos — weekly trending filtered by AI/ML and security keywords
2. Collection rankings — curated collections ranked by star growth
"""
import logging
import time
from datetime import datetime, timedelta, timezone
import requests
from src.config import (
GITHUB_AIML_KEYWORDS,
GITHUB_SECURITY_KEYWORDS,
OSSINSIGHT_API,
OSSINSIGHT_COLLECTIONS,
OSSINSIGHT_TRENDING_LANGUAGES,
)
from src.db import create_run, finish_run, insert_github_projects
# Module-level logger, named after this module per logging convention.
log = logging.getLogger(__name__)
# One shared HTTP session so the many OSSInsight calls reuse connections.
_SESSION = requests.Session()
_SESSION.headers["Accept"] = "application/json"
def _safe_int(val, default=0) -> int:
"""Parse an int from a value that may be empty string or None."""
if not val and val != 0:
return default
try:
return int(val)
except (ValueError, TypeError):
return default
def _safe_float(val, default=0.0) -> float:
if not val and val != 0:
return default
try:
return float(val)
except (ValueError, TypeError):
return default
def _api_get(path: str, params: dict | None = None) -> list[dict]:
    """GET an OSSInsight endpoint and return its rows, or [] on any error.

    Args:
        path: Endpoint path appended to the configured API base URL.
        params: Optional query parameters.

    Returns:
        The ``data.rows`` list from the JSON response; an empty list if
        the request fails, the status is an error, or the body is not
        valid JSON (the problem is logged as a warning).
    """
    endpoint = f"{OSSINSIGHT_API}{path}"
    try:
        response = _SESSION.get(endpoint, params=params, timeout=30)
        response.raise_for_status()
        payload = response.json().get("data", {})
        rows = payload.get("rows", [])
    except (requests.RequestException, ValueError, KeyError) as exc:
        log.warning("OSSInsight API error for %s: %s", path, exc)
        return []
    return rows
def _classify_domain(repo_name: str, description: str, collection_names: str = "") -> str | None:
    """Classify a repo into "security", "aiml", or None via keyword regexes.

    Security keywords are checked first, so a repo matching both sets is
    classified as security.

    Args:
        repo_name: "owner/name" string.
        description: Repo description (may be empty).
        collection_names: Comma-separated OSSInsight collection names.

    Returns:
        "security", "aiml", or None when neither keyword set matches.
    """
    haystack = f"{repo_name} {description} {collection_names}"
    if GITHUB_SECURITY_KEYWORDS.search(haystack):
        return "security"
    return "aiml" if GITHUB_AIML_KEYWORDS.search(haystack) else None
def fetch_trending_repos() -> list[dict]:
    """Fetch trending repos across configured languages for the past week.

    Queries /trends/repos once per language (plus "All"), dedupes by repo
    name, and keeps only repos that classify into an AI/ML or security
    domain.

    Returns:
        list[dict]: project records shaped for ``insert_github_projects``.
    """
    seen: set[str] = set()
    projects: list[dict] = []
    # Also fetch "All" to catch cross-language breakouts.
    languages = ["All"] + OSSINSIGHT_TRENDING_LANGUAGES
    for lang in languages:
        # BUG FIX: pass the language name verbatim. requests percent-encodes
        # query params itself, so pre-encoding "C++" as "C%2B%2B" was encoded
        # a second time on the wire ("C%252B%252B") and the API received the
        # literal string "C%2B%2B" instead of "C++".
        rows = _api_get("/trends/repos", {"language": lang, "period": "past_week"})
        log.info("Trending %s: %d repos", lang, len(rows))
        for row in rows:
            repo_name = row.get("repo_name", "")
            if not repo_name or repo_name in seen:
                continue
            seen.add(repo_name)
            description = row.get("description", "") or ""
            collection_names = row.get("collection_names", "") or ""
            domain = _classify_domain(repo_name, description, collection_names)
            if domain is None:
                continue  # neither AI/ML nor security — skip
            projects.append({
                "repo_id": _safe_int(row.get("repo_id")),
                "repo_name": repo_name,
                "description": description,
                "language": row.get("primary_language", "") or "",
                "stars": _safe_int(row.get("stars")),
                "forks": _safe_int(row.get("forks")),
                "pull_requests": _safe_int(row.get("pull_requests")),
                "total_score": _safe_float(row.get("total_score")),
                "collection_names": collection_names,
                "topics": [],
                "url": f"https://github.com/{repo_name}",
                "domain": domain,
            })
        time.sleep(0.5)  # be polite to the public API between languages
    return projects
def fetch_collection_rankings() -> list[dict]:
    """Fetch top repos from curated AI/ML and security collections.

    For each configured collection, pulls the 28-day star-growth ranking,
    dedupes by repo name, and drops repos with no positive growth. Rows
    from this endpoint lack descriptions/languages, so those fields are
    left empty and star growth stands in for both "stars" and
    "total_score".

    Returns:
        list[dict]: project records shaped for ``insert_github_projects``.
    """
    already_seen: set[str] = set()
    results: list[dict] = []
    for collection_id, (collection_name, domain) in OSSINSIGHT_COLLECTIONS.items():
        rows = _api_get(
            f"/collections/{collection_id}/ranking_by_stars",
            {"period": "past_28_days"},
        )
        log.info("Collection '%s' (%d): %d repos", collection_name, collection_id, len(rows))
        for entry in rows:
            full_name = entry.get("repo_name", "")
            if not full_name or full_name in already_seen:
                continue
            already_seen.add(full_name)
            star_growth = _safe_int(entry.get("current_period_growth"))
            if star_growth <= 0:
                continue  # no recent traction — skip
            results.append({
                "repo_id": _safe_int(entry.get("repo_id")),
                "repo_name": full_name,
                "description": "",
                "language": "",
                "stars": star_growth,
                "forks": 0,
                "pull_requests": 0,
                "total_score": _safe_float(star_growth),
                "collection_names": collection_name,
                "topics": [],
                "url": f"https://github.com/{full_name}",
                "domain": domain,
            })
        time.sleep(0.5)
    return results
def run_github_pipeline() -> int:
    """Run the full GitHub projects pipeline.

    Fetches trending repos and collection rankings, merges them with
    trending taking priority, stores the result, and records run status.

    Returns:
        int: the run_id created for this pipeline execution.

    Raises:
        Re-raises any exception from fetching or storage after logging it
        and marking the run failed.
    """
    now = datetime.now(timezone.utc)
    start = (now - timedelta(days=7)).date().isoformat()
    end = now.date().isoformat()
    run_id = create_run("github", start, end)
    log.info("GitHub pipeline started — run %d (%s to %s)", run_id, start, end)
    try:
        # Strategy 1: Trending repos
        trending = fetch_trending_repos()
        log.info("Trending repos (filtered): %d", len(trending))
        # Strategy 2: Collection rankings
        collections = fetch_collection_rankings()
        log.info("Collection repos: %d", len(collections))
        # Merge — trending takes priority (has richer data)
        seen = {p["repo_name"] for p in trending}
        merged = list(trending)
        for p in collections:
            if p["repo_name"] not in seen:
                seen.add(p["repo_name"])
                merged.append(p)
        log.info("Total unique projects: %d", len(merged))
        if merged:
            insert_github_projects(merged, run_id)
        finish_run(run_id, len(merged))
        log.info("GitHub pipeline complete — %d projects stored", len(merged))
        return run_id
    except Exception:
        # BUG FIX: log the pipeline's own traceback *before* touching the
        # DB — previously a failure inside finish_run() would propagate
        # first and the root-cause traceback was never logged.
        log.exception("GitHub pipeline failed")
        finish_run(run_id, 0, status="failed")
        raise