"""
RandomWeb — Polite Async HTTP Validator
Validates discovered URLs with rate limiting, robots.txt compliance,
clear user-agent identification, and timeout rules.
"""
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional
from urllib.parse import urlparse
import aiohttp
from aiolimiter import AsyncLimiter
from protego import Protego
from backend.config import (
USER_AGENT,
REQUEST_TIMEOUT,
VALIDATION_CONCURRENCY,
PER_DOMAIN_RATE_LIMIT,
CRAWL_DELAY_DEFAULT,
RECHECK_INTERVAL_DAYS,
)
from backend.db import get_client, extract_domain
logger = logging.getLogger("randomweb.validator")
# ─── Shared State ────────────────────────────────────────────
# FIFO of {"url": ..., "source": ...} dicts awaiting validation; bounded so a
# runaway discovery phase cannot exhaust memory (overflow is dropped with a
# warning in enqueue_url).
_validation_queue: asyncio.Queue = asyncio.Queue(maxsize=50_000)
# Per-domain robots.txt parser cache; None marks "unavailable/unparseable",
# which callers treat as allow-everything.
_robots_cache: dict[str, Optional[Protego]] = {}
# Per-domain rate limiters, created lazily by _get_domain_limiter().
_domain_limiters: dict[str, asyncio.Semaphore | AsyncLimiter] = {}
# Created in run_validator(). NOTE(review): assigned but never acquired
# anywhere in this file — confirm it is used elsewhere, or remove it.
_semaphore: Optional[asyncio.Semaphore] = None
def get_validation_queue() -> asyncio.Queue:
    """Return the module-wide validation queue (accessor for other modules)."""
    return _validation_queue
async def enqueue_url(url: str, source: str = "unknown"):
    """Add a URL to the validation queue.

    Non-blocking: when the queue is at capacity the URL is dropped and a
    warning is logged rather than stalling the caller.
    """
    item = {"url": url, "source": source}
    try:
        _validation_queue.put_nowait(item)
    except asyncio.QueueFull:
        logger.warning("Validation queue full, dropping: %s", url)
def _get_domain_limiter(domain: str) -> AsyncLimiter:
    """Return the rate limiter for *domain*, creating it on first use.

    One AsyncLimiter per domain, allowing PER_DOMAIN_RATE_LIMIT acquisitions
    per 1-second window.
    """
    limiter = _domain_limiters.get(domain)
    if limiter is None:
        limiter = AsyncLimiter(PER_DOMAIN_RATE_LIMIT, 1.0)
        _domain_limiters[domain] = limiter
    return limiter
async def _fetch_robots_txt(
    session: aiohttp.ClientSession, domain: str
) -> Optional[Protego]:
    """Fetch and parse robots.txt for a domain. Cached.

    Returns a parsed Protego instance, or None when robots.txt is missing,
    non-200, or unreachable. The result — including the None failure
    marker — is cached per domain for the life of the process.
    """
    if domain in _robots_cache:
        return _robots_cache[domain]
    # Probes HTTPS only; an HTTP-only site's robots.txt is never consulted.
    robots_url = f"https://{domain}/robots.txt"
    try:
        async with session.get(
            robots_url,
            timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
            headers={"User-Agent": USER_AGENT},
            allow_redirects=True,
            # NOTE(review): ssl=False disables certificate verification —
            # presumably deliberate to tolerate misconfigured hosts; confirm.
            ssl=False,
        ) as resp:
            if resp.status == 200:
                text = await resp.text()
                parser = Protego.parse(text)
                _robots_cache[domain] = parser
                return parser
    except Exception:
        # Best-effort: any network/parse failure falls through to "no robots".
        # NOTE(review): RFC 9309 recommends treating 5xx as disallow-all;
        # this code treats every failure (and every non-200) as allow-all.
        pass
    _robots_cache[domain] = None
    return None
async def _can_fetch(
    session: aiohttp.ClientSession, url: str
) -> tuple[bool, float]:
    """
    Consult robots.txt for *url*'s domain.

    Returns (allowed, crawl_delay). A missing or unfetchable robots.txt
    means everything is allowed at the default crawl delay; likewise an
    unspecified crawl-delay directive falls back to the default.
    """
    parser = await _fetch_robots_txt(session, extract_domain(url))
    if parser is None:
        return True, CRAWL_DELAY_DEFAULT
    delay = parser.crawl_delay(USER_AGENT)
    if delay is None:
        delay = CRAWL_DELAY_DEFAULT
    return parser.can_fetch(url, USER_AGENT), delay
async def _probe_status(
    session: aiohttp.ClientSession, url: str
) -> Optional[int]:
    """Return the final HTTP status for *url*, or None if unreachable.

    Tries HEAD first (no body transfer). Falls back to GET when HEAD fails
    at the transport level OR when the server rejects the HEAD method
    itself (405 Method Not Allowed / 501 Not Implemented) — many servers
    that serve GET perfectly well refuse HEAD, and without this fallback
    such sites would be wrongly marked inactive.
    """
    request_kwargs = dict(
        timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
        headers={"User-Agent": USER_AGENT},
        allow_redirects=True,
        # NOTE(review): ssl=False disables certificate verification —
        # presumably deliberate to tolerate misconfigured hosts; confirm.
        ssl=False,
    )
    try:
        async with session.head(url, **request_kwargs) as resp:
            # A 405/501 means "HEAD unsupported", not "URL dead" — retry
            # with GET below. Any other status is the real answer.
            if resp.status not in (405, 501):
                return resp.status
    except Exception:
        pass  # transport failure — fall through to GET
    try:
        async with session.get(url, **request_kwargs) as resp:
            return resp.status
    except Exception as e:
        logger.debug("Validation failed for %s: %s", url, e)
        return None
async def validate_url(
    session: aiohttp.ClientSession,
    url: str,
    source: str = "unknown",
) -> Optional[dict]:
    """
    Validate a single URL. Returns a record dict if checked, else None.
    Steps:
    1. Acquire the per-domain rate limiter
    2. Check robots.txt (returns None if fetching is disallowed)
    3. Probe the URL (HEAD with GET fallback — see _probe_status)
    4. Build the result record; is_active is True only for a final 200
    """
    domain = extract_domain(url)
    limiter = _get_domain_limiter(domain)
    # Rate limit per domain; the crawl-delay sleep also happens while
    # holding the limiter, so concurrent tasks cannot hammer one host.
    async with limiter:
        # Check robots.txt
        allowed, delay = await _can_fetch(session, url)
        if not allowed:
            logger.debug("Blocked by robots.txt: %s", url)
            return None
        # Respect crawl delay
        if delay > 0:
            await asyncio.sleep(delay)
        now = datetime.now(timezone.utc).isoformat()
        status_code = await _probe_status(session, url)
        is_active = status_code == 200
        # Only live URLs get a scheduled re-check; dead ones stay None.
        next_check = (
            (datetime.now(timezone.utc) + timedelta(days=RECHECK_INTERVAL_DAYS)).isoformat()
            if is_active
            else None
        )
        return {
            "url": url,
            "domain": domain,
            "source": source,
            "status": status_code,
            "is_active": is_active,
            "last_checked": now,
            "next_check": next_check,
        }
async def _process_batch(
    session: aiohttp.ClientSession,
    batch: list[dict],
) -> list[dict]:
    """Validate a batch of URLs concurrently.

    Each batch item is a {"url": ..., "source": ...} dict. Returns the
    successful record dicts; robots-blocked URLs (None results) are
    dropped silently and task exceptions are logged and dropped.
    """
    tasks = [
        validate_url(session, item["url"], item.get("source", "unknown"))
        for item in batch
    ]
    # return_exceptions=True so one failed URL cannot sink the batch.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    records: list[dict] = []
    for result in results:
        if isinstance(result, Exception):
            logger.error("Validation task error: %s", result)
        elif result is not None:
            # Original had a redundant `isinstance(result, dict) and
            # result is not None`; validate_url only returns dict | None.
            records.append(result)
    return records
async def run_validator():
    """
    Main validation loop. Continuously drains the validation queue,
    validates URLs in batches of up to 50, and upserts results to
    Supabase (websites table, keyed on url). Runs forever; errors in
    one iteration are logged and the loop continues after a backoff.
    """
    global _semaphore
    # NOTE(review): _semaphore is created here but never acquired in this
    # file — confirm another module uses it, otherwise it is dead state.
    _semaphore = asyncio.Semaphore(VALIDATION_CONCURRENCY)
    logger.info("Validation worker started")
    # Shared connector: caps total sockets and caches DNS for 5 minutes;
    # force_close=False keeps connections alive for reuse across batches.
    connector = aiohttp.TCPConnector(
        limit=VALIDATION_CONCURRENCY,
        ttl_dns_cache=300,
        force_close=False,
    )
    async with aiohttp.ClientSession(connector=connector) as session:
        while True:
            try:
                # Collect a batch
                batch = []
                try:
                    # Wait for at least one item; the 5s timeout keeps the
                    # loop responsive (e.g. to cancellation) when idle.
                    item = await asyncio.wait_for(
                        _validation_queue.get(), timeout=5.0
                    )
                    batch.append(item)
                except asyncio.TimeoutError:
                    await asyncio.sleep(1)
                    continue
                # Drain up to batch size (non-blocking top-up to 50)
                while len(batch) < 50 and not _validation_queue.empty():
                    try:
                        batch.append(_validation_queue.get_nowait())
                    except asyncio.QueueEmpty:
                        break
                # Always true here — batch holds at least the awaited item.
                if batch:
                    logger.info("Validating batch of %d URLs", len(batch))
                    records = await _process_batch(session, batch)
                    if records:
                        # Bulk upsert to Supabase
                        try:
                            get_client().table("websites").upsert(
                                records, on_conflict="url"
                            ).execute()
                            active = sum(1 for r in records if r["is_active"])
                            logger.info(
                                "Upserted %d records (%d active)",
                                len(records), active,
                            )
                        except Exception as e:
                            logger.error("Bulk upsert failed: %s", e)
            except Exception as e:
                # Top-level guard: keep the worker alive on unexpected errors.
                logger.error("Validator loop error: %s", e)
                await asyncio.sleep(5)