"""GitHub projects pipeline — discover trending repos via OSSInsight.io API. Two strategies: 1. Trending repos — weekly trending filtered by AI/ML and security keywords 2. Collection rankings — curated collections ranked by star growth """ import logging import time from datetime import datetime, timedelta, timezone import requests from src.config import ( GITHUB_AIML_KEYWORDS, GITHUB_SECURITY_KEYWORDS, OSSINSIGHT_API, OSSINSIGHT_COLLECTIONS, OSSINSIGHT_TRENDING_LANGUAGES, ) from src.db import create_run, finish_run, insert_github_projects log = logging.getLogger(__name__) _SESSION = requests.Session() _SESSION.headers["Accept"] = "application/json" def _safe_int(val, default=0) -> int: """Parse an int from a value that may be empty string or None.""" if not val and val != 0: return default try: return int(val) except (ValueError, TypeError): return default def _safe_float(val, default=0.0) -> float: if not val and val != 0: return default try: return float(val) except (ValueError, TypeError): return default def _api_get(path: str, params: dict | None = None) -> list[dict]: """Make an OSSInsight API request and return the rows.""" url = f"{OSSINSIGHT_API}{path}" try: resp = _SESSION.get(url, params=params, timeout=30) resp.raise_for_status() data = resp.json().get("data", {}) return data.get("rows", []) except (requests.RequestException, ValueError, KeyError) as e: log.warning("OSSInsight API error for %s: %s", path, e) return [] def _classify_domain(repo_name: str, description: str, collection_names: str = "") -> str | None: """Classify a repo into aiml, security, or None based on keywords.""" text = f"{repo_name} {description} {collection_names}" if GITHUB_SECURITY_KEYWORDS.search(text): return "security" if GITHUB_AIML_KEYWORDS.search(text): return "aiml" return None def fetch_trending_repos() -> list[dict]: """Fetch trending repos across configured languages for the past week.""" seen: set[str] = set() projects: list[dict] = [] # Also fetch "All" to catch cross-language breakouts languages = ["All"] + OSSINSIGHT_TRENDING_LANGUAGES for lang in languages: lang_param = lang if lang != "C++" else "C%2B%2B" rows = _api_get("/trends/repos", {"language": lang_param, "period": "past_week"}) log.info("Trending %s: %d repos", lang, len(rows)) for row in rows: repo_name = row.get("repo_name", "") if not repo_name or repo_name in seen: continue seen.add(repo_name) description = row.get("description", "") or "" collection_names = row.get("collection_names", "") or "" domain = _classify_domain(repo_name, description, collection_names) if domain is None: continue projects.append({ "repo_id": _safe_int(row.get("repo_id")), "repo_name": repo_name, "description": description, "language": row.get("primary_language", "") or "", "stars": _safe_int(row.get("stars")), "forks": _safe_int(row.get("forks")), "pull_requests": _safe_int(row.get("pull_requests")), "total_score": _safe_float(row.get("total_score")), "collection_names": collection_names, "topics": [], "url": f"https://github.com/{repo_name}", "domain": domain, }) time.sleep(0.5) return projects def fetch_collection_rankings() -> list[dict]: """Fetch top repos from curated AI/ML and security collections.""" seen: set[str] = set() projects: list[dict] = [] for cid, (cname, domain) in OSSINSIGHT_COLLECTIONS.items(): rows = _api_get(f"/collections/{cid}/ranking_by_stars", {"period": "past_28_days"}) log.info("Collection '%s' (%d): %d repos", cname, cid, len(rows)) for row in rows: repo_name = row.get("repo_name", "") if not repo_name or repo_name in seen: continue seen.add(repo_name) growth = _safe_int(row.get("current_period_growth")) if growth <= 0: continue projects.append({ "repo_id": _safe_int(row.get("repo_id")), "repo_name": repo_name, "description": "", "language": "", "stars": growth, "forks": 0, "pull_requests": 0, "total_score": _safe_float(growth), "collection_names": cname, "topics": [], "url": f"https://github.com/{repo_name}", "domain": domain, }) time.sleep(0.5) return projects def run_github_pipeline() -> int: """Run the full GitHub projects pipeline. Returns run_id.""" now = datetime.now(timezone.utc) start = (now - timedelta(days=7)).date().isoformat() end = now.date().isoformat() run_id = create_run("github", start, end) log.info("GitHub pipeline started — run %d (%s to %s)", run_id, start, end) try: # Strategy 1: Trending repos trending = fetch_trending_repos() log.info("Trending repos (filtered): %d", len(trending)) # Strategy 2: Collection rankings collections = fetch_collection_rankings() log.info("Collection repos: %d", len(collections)) # Merge — trending takes priority (has richer data) seen = {p["repo_name"] for p in trending} merged = list(trending) for p in collections: if p["repo_name"] not in seen: seen.add(p["repo_name"]) merged.append(p) log.info("Total unique projects: %d", len(merged)) if merged: insert_github_projects(merged, run_id) finish_run(run_id, len(merged)) log.info("GitHub pipeline complete — %d projects stored", len(merged)) return run_id except Exception: finish_run(run_id, 0, status="failed") log.exception("GitHub pipeline failed") raise