Spaces:
Paused
Paused
| """ | |
| RandomWeb β Database Helpers | |
| Supabase client initialization and common query functions. | |
| """ | |
| import logging | |
| from datetime import datetime, timedelta, timezone | |
| from urllib.parse import urlparse | |
| from typing import Optional | |
| from supabase import create_client, Client | |
| from backend.config import ( | |
| SUPABASE_URL, | |
| SUPABASE_SECRET_KEY, | |
| SUPABASE_PUBLISHABLE_KEY, | |
| RECHECK_INTERVAL_DAYS, | |
| ) | |
| logger = logging.getLogger("randomweb.db") | |
# --- Client initialization -----------------------------------------------
# Module-level cache: one shared Supabase client per process.
_client: Optional[Client] = None


def get_client() -> Client:
    """Return the cached Supabase client, creating it on first use.

    Prefers the secret key (write access); falls back to the publishable
    key. Raises ValueError when neither key is configured.
    """
    global _client
    if _client is not None:
        return _client

    api_key = SUPABASE_SECRET_KEY or SUPABASE_PUBLISHABLE_KEY
    if not api_key:
        logger.critical("β No Supabase API key found!")
        raise ValueError("SUPABASE_SECRET_KEY and SUPABASE_PUBLISHABLE_KEY are both empty.")

    _client = create_client(SUPABASE_URL, api_key)
    # New-style managed keys are prefixed "sb_"; anything else is a legacy JWT.
    key_kind = "Managed (New)" if api_key.startswith("sb_") else "Legacy (JWT)"
    logger.info("β Supabase client initialized (Type: %s) for %s", key_kind, SUPABASE_URL)
    return _client
def extract_domain(url: str) -> str:
    """Return the host portion of *url*.

    Falls back to the first path segment for scheme-less inputs such as
    "example.com/page", where urlparse puts everything into ``path``.
    """
    pieces = urlparse(url)
    if pieces.netloc:
        return pieces.netloc
    return pieces.path.split("/", 1)[0]
| # βββ Insert / Upsert ββββββββββββββββββββββββββββββββββββββββ | |
def upsert_website(
    url: str,
    source: str = "unknown",
    status: Optional[int] = None,
    is_active: bool = False,
) -> bool:
    """Insert or update one website row keyed on URL.

    Active sites get a ``next_check`` timestamp RECHECK_INTERVAL_DAYS in
    the future; inactive ones get None. Returns True on success, False on
    any failure (the error is logged, never raised).
    """
    try:
        checked_at = datetime.now(timezone.utc).isoformat()
        due_at = None
        if is_active:
            due_at = (
                datetime.now(timezone.utc) + timedelta(days=RECHECK_INTERVAL_DAYS)
            ).isoformat()

        row = {
            "url": url,
            "domain": extract_domain(url),
            "source": source,
            "status": status,
            "is_active": is_active,
            "last_checked": checked_at,
            "next_check": due_at,
        }
        get_client().table("websites").upsert(row, on_conflict="url").execute()
    except Exception as e:
        logger.error("Failed to upsert %s: %s", url, e)
        return False
    return True
def bulk_upsert_websites(records: list[dict]) -> int:
    """Upsert many website rows in one request, keyed on URL.

    Returns the number of records sent on success, 0 on failure or when
    *records* is empty. The upsert is all-or-nothing from the caller's
    point of view: any exception is logged and reported as 0.
    """
    if not records:
        return 0
    try:
        get_client().table("websites").upsert(records, on_conflict="url").execute()
    except Exception as e:
        logger.error("Bulk upsert failed (%d records): %s", len(records), e)
        return 0
    return len(records)
| # βββ Queries βββββββββββββββββββββββββββββββββββββββββββββββββ | |
def get_random_active_url() -> Optional[str]:
    """Return one random active website URL via the DB-side RPC.

    Calls the ``get_random_active_website`` database function; returns
    None when no row comes back or on any error (logged, not raised).
    """
    try:
        rows = get_client().rpc("get_random_active_website").execute().data
        if not rows:
            return None
        return rows[0]["url"]
    except Exception as e:
        logger.error("Failed to get random URL: %s", e)
        return None
def search_websites(query: str, limit: int = 20) -> list[dict]:
    """Case-insensitive substring search over active websites.

    Matches *query* against both URL and domain with ILIKE and returns up
    to *limit* rows of ``{url, domain, is_active}``. Returns [] on error
    (logged, not raised).
    """
    try:
        # The PostgREST `or=` filter string is comma/paren-delimited, so a
        # raw user query containing , ( ) or quotes could inject extra
        # filter clauses or simply break the request. Strip those
        # delimiters before interpolating.
        safe_query = "".join(ch for ch in query if ch not in ',()"')
        result = (
            get_client()
            .table("websites")
            .select("url, domain, is_active")
            .or_(f"url.ilike.%{safe_query}%,domain.ilike.%{safe_query}%")
            .eq("is_active", True)
            .limit(limit)
            .execute()
        )
        return result.data or []
    except Exception as e:
        logger.error("Search failed for '%s': %s", query, e)
        return []
def get_active_count() -> int:
    """Return ``active_count`` from the singleton stats row (id=1).

    Returns 0 when the row is missing or on any error (logged, not raised).
    """
    try:
        rows = (
            get_client()
            .table("stats")
            .select("active_count")
            .eq("id", 1)
            .execute()
            .data
        )
        return rows[0]["active_count"] if rows else 0
    except Exception as e:
        logger.error("Failed to get active count: %s", e)
        return 0
def get_total_count() -> int:
    """Return ``total_count`` from the singleton stats row (id=1).

    Returns 0 when the row is missing or on any error (logged, not raised).
    """
    try:
        rows = (
            get_client()
            .table("stats")
            .select("total_count")
            .eq("id", 1)
            .execute()
            .data
        )
        return rows[0]["total_count"] if rows else 0
    except Exception as e:
        logger.error("Failed to get total count: %s", e)
        return 0
def url_exists(url: str) -> bool:
    """Return True when *url* already has a row in ``websites``.

    Fetches at most one id for an exact-URL match; returns False on any
    error (logged, not raised).
    """
    try:
        match = (
            get_client()
            .table("websites")
            .select("id")
            .eq("url", url)
            .limit(1)
            .execute()
        )
    except Exception as e:
        logger.error("Failed to check URL existence: %s", e)
        return False
    return bool(match.data)
def get_urls_needing_recheck(limit: int = 100) -> list[dict]:
    """Return up to *limit* active sites whose ``next_check`` has passed.

    Rows contain ``{id, url, domain}``. Returns [] on error (logged, not
    raised).
    """
    try:
        cutoff = datetime.now(timezone.utc).isoformat()
        due = (
            get_client()
            .table("websites")
            .select("id, url, domain")
            .eq("is_active", True)
            .lte("next_check", cutoff)
            .limit(limit)
            .execute()
        )
        return due.data or []
    except Exception as e:
        logger.error("Failed to get recheck URLs: %s", e)
        return []