"""
RandomWeb — Database Helpers
Supabase client initialization and common query functions.
"""
import logging
from datetime import datetime, timedelta, timezone
from urllib.parse import urlparse
from typing import Optional
from supabase import create_client, Client
from backend.config import (
SUPABASE_URL,
SUPABASE_SECRET_KEY,
SUPABASE_PUBLISHABLE_KEY,
RECHECK_INTERVAL_DAYS,
)
logger = logging.getLogger("randomweb.db")
# ─── Client Initialization ──────────────────────────────
_client: Optional[Client] = None
def get_client() -> Client:
    """Return a Supabase client using the secret key if available, else publishable.

    Lazily creates the client on first call and caches it in the module-level
    ``_client`` singleton; subsequent calls return the same instance.

    Returns:
        Client: the shared Supabase client.

    Raises:
        ValueError: if neither SUPABASE_SECRET_KEY nor SUPABASE_PUBLISHABLE_KEY
            is configured.
    """
    global _client
    if _client is None:
        # Priority: Secret Key (for writes) -> Publishable Key (fallback)
        key = SUPABASE_SECRET_KEY or SUPABASE_PUBLISHABLE_KEY
        if not key:
            logger.critical("❌ No Supabase API key found!")
            raise ValueError("SUPABASE_SECRET_KEY and SUPABASE_PUBLISHABLE_KEY are both empty.")
        _client = create_client(SUPABASE_URL, key)
        # Identify key type for debugging purposes: new managed keys start with "sb_"
        key_type = "Managed (New)" if key.startswith("sb_") else "Legacy (JWT)"
        # Fix: original log call had its string literal broken across two lines
        # (unterminated string); reconstructed as a single lazy %-style log line.
        logger.info("✅ Supabase client initialized (Type: %s) for %s", key_type, SUPABASE_URL)
    return _client
def extract_domain(url: str) -> str:
    """Return the domain (host) portion of *url*.

    For scheme-less inputs like ``"example.com/page"``, urlparse puts the
    host into ``path``, so fall back to the first path segment.
    """
    parts = urlparse(url)
    if parts.netloc:
        return parts.netloc
    return parts.path.split("/")[0]
# ─── Insert / Upsert ────────────────────────────────────
def upsert_website(
    url: str,
    source: str = "unknown",
    status: Optional[int] = None,
    is_active: bool = False,
) -> bool:
    """Insert or update a website record. Returns True on success.

    The row is keyed on ``url``; active sites are scheduled for a re-check
    RECHECK_INTERVAL_DAYS in the future, inactive ones get no next_check.
    Failures are logged and reported via the boolean return, never raised.
    """
    try:
        checked_at = datetime.now(timezone.utc)
        # Only active sites get a re-check appointment.
        if is_active:
            next_check = (checked_at + timedelta(days=RECHECK_INTERVAL_DAYS)).isoformat()
        else:
            next_check = None
        record = {
            "url": url,
            "domain": extract_domain(url),
            "source": source,
            "status": status,
            "is_active": is_active,
            "last_checked": checked_at.isoformat(),
            "next_check": next_check,
        }
        get_client().table("websites").upsert(record, on_conflict="url").execute()
        return True
    except Exception as e:
        logger.error("Failed to upsert %s: %s", url, e)
        return False
def bulk_upsert_websites(records: list[dict]) -> int:
    """Bulk upsert a list of website records. Returns count of successful inserts.

    All-or-nothing: on any failure the error is logged and 0 is returned.
    """
    if not records:
        return 0
    try:
        client = get_client()
        client.table("websites").upsert(records, on_conflict="url").execute()
    except Exception as e:
        logger.error("Bulk upsert failed (%d records): %s", len(records), e)
        return 0
    return len(records)
# ─── Queries ────────────────────────────────────────────
def get_random_active_url() -> Optional[str]:
    """Retrieve a random active website URL using the database function.

    Delegates randomness to the ``get_random_active_website`` RPC; returns
    None when the result set is empty or the call fails.
    """
    try:
        response = get_client().rpc("get_random_active_website").execute()
        rows = response.data
        if rows:
            return rows[0]["url"]
        return None
    except Exception as e:
        logger.error("Failed to get random URL: %s", e)
        return None
def search_websites(query: str, limit: int = 20) -> list[dict]:
    """Search active websites whose URL or domain contains *query*.

    Returns up to *limit* rows of ``url, domain, is_active``; an empty list
    on any failure.
    """
    try:
        # NOTE(review): query is interpolated into the PostgREST filter
        # string; special filter characters (commas, parens) in user input
        # could alter the filter — consider sanitizing upstream.
        pattern = f"url.ilike.%{query}%,domain.ilike.%{query}%"
        response = (
            get_client()
            .table("websites")
            .select("url, domain, is_active")
            .or_(pattern)
            .eq("is_active", True)
            .limit(limit)
            .execute()
        )
        return response.data or []
    except Exception as e:
        logger.error("Search failed for '%s': %s", query, e)
        return []
def get_active_count() -> int:
    """Get the current active website count from stats.

    Reads the singleton stats row (id=1); returns 0 when the row is missing
    or the query fails.
    """
    try:
        response = get_client().table("stats").select("active_count").eq("id", 1).execute()
        rows = response.data
        return rows[0]["active_count"] if rows else 0
    except Exception as e:
        logger.error("Failed to get active count: %s", e)
        return 0
def get_total_count() -> int:
    """Get total indexed websites from stats.

    Reads the singleton stats row (id=1); returns 0 when the row is missing
    or the query fails.
    """
    try:
        response = get_client().table("stats").select("total_count").eq("id", 1).execute()
        rows = response.data
        return rows[0]["total_count"] if rows else 0
    except Exception as e:
        logger.error("Failed to get total count: %s", e)
        return 0
def url_exists(url: str) -> bool:
    """Check if a URL is already in the database.

    Returns False both when the URL is absent and when the lookup fails.
    """
    try:
        websites = get_client().table("websites")
        response = websites.select("id").eq("url", url).limit(1).execute()
        return bool(response.data)
    except Exception as e:
        logger.error("Failed to check URL existence: %s", e)
        return False
def get_urls_needing_recheck(limit: int = 100) -> list[dict]:
    """Get URLs that are due for re-verification.

    Selects active sites whose ``next_check`` timestamp is at or before now
    (UTC); returns up to *limit* rows, or an empty list on failure.
    """
    try:
        cutoff = datetime.now(timezone.utc).isoformat()
        response = (
            get_client()
            .table("websites")
            .select("id, url, domain")
            .eq("is_active", True)
            .lte("next_check", cutoff)
            .limit(limit)
            .execute()
        )
        return response.data or []
    except Exception as e:
        logger.error("Failed to get recheck URLs: %s", e)
        return []