# RandomWeb / backend / workers / ct_log.py
# Guest1
# 🚀 Initial Clean Deployment
# d22875e
"""
RandomWeb β€” Certificate Transparency Log Worker
Connects to CertStream WebSocket to discover newly registered domains in real-time.
"""
import asyncio
import json
import logging
from urllib.parse import urlparse
import websockets
from backend.config import (
CERTSTREAM_URL,
CT_LOG_BATCH_SIZE,
CT_LOG_RECONNECT_DELAY,
CT_LOG_MAX_RECONNECT_DELAY,
BLOCKED_TLDS,
)
from backend.workers.validator import enqueue_url
from backend.db import url_exists
# Logger shared by every function in this worker module.
logger = logging.getLogger("randomweb.ct_log")
# ─── Domain Filtering ───────────────────────────────────────
# In-process cache of domains this worker has already seen; read and updated
# by _deduplicate(). Lives only as long as the process does.
_seen_domains: set = set()
# Cache size at which _deduplicate() evicts half of _seen_domains.
_MAX_SEEN_CACHE = 500_000
def _is_valid_domain(domain: str) -> bool:
"""Filter out invalid, wildcard, IP, and blocked TLD domains."""
if not domain or len(domain) < 4:
return False
# Skip wildcards
if domain.startswith("*."):
domain = domain[2:]
if "*" in domain:
return False
# Skip IP addresses
parts = domain.split(".")
if all(p.isdigit() for p in parts):
return False
# Skip blocked TLDs
for tld in BLOCKED_TLDS:
if domain.endswith(tld):
return False
# Must have at least one dot
if "." not in domain:
return False
# Skip overly long domains (likely garbage)
if len(domain) > 253:
return False
return True
def _deduplicate(domain: str) -> bool:
    """Record *domain* as seen and report whether it was new.

    Returns True the first time a domain is offered and False when it is
    already in ``_seen_domains``.

    When the cache holds ``_MAX_SEEN_CACHE`` entries or more, half of that
    limit is evicted before the new domain is added. A set has no insertion
    order, so the evicted entries are arbitrary rather than the oldest ones;
    an evicted domain will be reported as new if it shows up again.
    """
    if domain in _seen_domains:
        return False

    if len(_seen_domains) >= _MAX_SEEN_CACHE:
        # Imported here because only this rarely taken branch needs it.
        from itertools import islice

        # Snapshot just the entries to drop (a set cannot be mutated while it
        # is being iterated) instead of copying the entire cache.
        evicted = list(islice(_seen_domains, _MAX_SEEN_CACHE // 2))
        _seen_domains.difference_update(evicted)

    _seen_domains.add(domain)
    return True
async def _process_message(message: dict):
"""Process a single CertStream message and extract domains."""
try:
msg_type = message.get("message_type")
if msg_type != "certificate_update":
return
data = message.get("data", {})
leaf_cert = data.get("leaf_cert", {})
all_domains = leaf_cert.get("all_domains", [])
for domain in all_domains:
# Strip wildcard prefix
if domain.startswith("*."):
domain = domain[2:]
domain = domain.lower().strip()
if not _is_valid_domain(domain):
continue
if not _deduplicate(domain):
continue
url = f"https://{domain}"
await enqueue_url(url, source="ct_log")
except Exception as e:
logger.debug("Error processing CT message: %s", e)
async def run_ct_log_worker():
    """Run the CT log worker loop; this coroutine never returns normally.

    Connects to ``CERTSTREAM_URL``, decodes each incoming frame as JSON and
    hands it to ``_process_message``. Frames that are not valid JSON are
    skipped silently; any other per-message error is logged at debug level.

    Whenever the connection ends or fails, the worker sleeps and reconnects.
    The wait starts at ``CT_LOG_RECONNECT_DELAY``, doubles after every
    attempt up to ``CT_LOG_MAX_RECONNECT_DELAY``, and returns to the initial
    value as soon as a connection is established.
    """
    logger.info("CT Log worker starting — connecting to %s", CERTSTREAM_URL)
    reconnect_delay = CT_LOG_RECONNECT_DELAY

    while True:
        try:
            async with websockets.connect(
                CERTSTREAM_URL,
                ping_interval=30,
                ping_timeout=10,
                close_timeout=5,
                max_size=2**20,  # 1MB max message size
            ) as ws:
                logger.info("Connected to CertStream")
                reconnect_delay = CT_LOG_RECONNECT_DELAY  # Reset on success

                async for raw_message in ws:
                    try:
                        message = json.loads(raw_message)
                        await _process_message(message)
                    except json.JSONDecodeError:
                        # Undecodable frame: drop it and keep reading.
                        continue
                    except Exception as e:
                        logger.debug("Message processing error: %s", e)
        except websockets.exceptions.ConnectionClosed as e:
            logger.warning("CertStream connection closed: %s", e)
        except Exception as e:
            logger.warning("CertStream connection error: %s", e)

        # Reached when the receive loop ends or after an error was logged
        # above: wait, then retry with exponential backoff.
        logger.info("Reconnecting to CertStream in %ds...", reconnect_delay)
        await asyncio.sleep(reconnect_delay)
        reconnect_delay = min(reconnect_delay * 2, CT_LOG_MAX_RECONNECT_DELAY)