"""
RandomWeb — Certificate Transparency Log Worker
Connects to CertStream WebSocket to discover newly registered domains in real-time.
"""
import asyncio
import json
import logging
from urllib.parse import urlparse
import websockets
from backend.config import (
CERTSTREAM_URL,
CT_LOG_BATCH_SIZE,
CT_LOG_RECONNECT_DELAY,
CT_LOG_MAX_RECONNECT_DELAY,
BLOCKED_TLDS,
)
from backend.workers.validator import enqueue_url
from backend.db import url_exists
# Module-level logger for this worker.
logger = logging.getLogger("randomweb.ct_log")
# --- Domain Filtering -------------------------------------------------
# In-memory dedup cache of domains already forwarded to the validator.
# Bounded at _MAX_SEEN_CACHE entries; _deduplicate evicts when full.
_seen_domains: set[str] = set()
_MAX_SEEN_CACHE = 500_000
def _is_valid_domain(domain: str) -> bool:
    """Return True if *domain* looks like a usable public hostname.

    Rejects empty/too-short values, wildcards, bare IPv4 addresses,
    dotless names, names over the 253-char DNS limit, and any domain
    whose suffix appears in the configured BLOCKED_TLDS.
    """
    if not domain or len(domain) < 4:
        return False
    # Strip a leading wildcard label; any remaining "*" is garbage.
    if domain.startswith("*."):
        domain = domain[2:]
    if "*" in domain:
        return False
    # Must contain a dot and fit within the DNS name-length limit.
    if "." not in domain or len(domain) > 253:
        return False
    # Skip bare IPv4 addresses (every label purely numeric).
    if all(part.isdigit() for part in domain.split(".")):
        return False
    # Normalize a trailing root dot ("example.tk.") so it cannot bypass
    # the blocked-TLD filter, then test all suffixes in one C-level call
    # (str.endswith accepts a tuple) instead of a Python loop.
    if domain.rstrip(".").endswith(tuple(BLOCKED_TLDS)):
        return False
    return True
def _deduplicate(domain: str) -> bool:
    """Record *domain* in the seen-cache; return True if it was new.

    Memory is bounded: once the cache reaches _MAX_SEEN_CACHE entries,
    it is shrunk to half capacity.  Eviction order is arbitrary (Python
    sets are unordered), so this is best-effort pruning, not FIFO.
    """
    # No `global` needed: we only mutate the set, never rebind the name.
    if domain in _seen_domains:
        return False
    if len(_seen_domains) >= _MAX_SEEN_CACHE:
        # Shrink to half capacity with O(1) set.pop() calls rather than
        # materializing a quarter-million-element list of victims.
        target = _MAX_SEEN_CACHE // 2
        while len(_seen_domains) > target:
            _seen_domains.pop()
    _seen_domains.add(domain)
    return True
async def _process_message(message: dict):
    """Extract candidate domains from one CertStream message and queue them.

    Only "certificate_update" messages are considered; each hostname on
    the leaf certificate is normalized, filtered, deduplicated, and then
    handed to the validator queue as an https URL.
    """
    try:
        if message.get("message_type") != "certificate_update":
            return
        leaf_cert = message.get("data", {}).get("leaf_cert", {})
        for raw_domain in leaf_cert.get("all_domains", []):
            # Drop a wildcard prefix, then normalize case and whitespace.
            candidate = raw_domain[2:] if raw_domain.startswith("*.") else raw_domain
            candidate = candidate.lower().strip()
            if _is_valid_domain(candidate) and _deduplicate(candidate):
                await enqueue_url(f"https://{candidate}", source="ct_log")
    except Exception as e:
        # Malformed messages are expected noise; log quietly and move on.
        logger.debug("Error processing CT message: %s", e)
async def run_ct_log_worker():
    """
    Run the CT log worker forever: connect to the CertStream WebSocket,
    decode each frame as JSON, and feed certificate updates into
    _process_message.  On any disconnect or error, reconnect with
    exponential backoff capped at CT_LOG_MAX_RECONNECT_DELAY.
    """
    logger.info("CT Log worker starting β connecting to %s", CERTSTREAM_URL)
    delay = CT_LOG_RECONNECT_DELAY
    while True:
        try:
            async with websockets.connect(
                CERTSTREAM_URL,
                ping_interval=30,
                ping_timeout=10,
                close_timeout=5,
                max_size=2**20,  # refuse frames over 1 MiB
            ) as ws:
                logger.info("Connected to CertStream")
                # Healthy connection: restart the backoff schedule.
                delay = CT_LOG_RECONNECT_DELAY
                async for frame in ws:
                    try:
                        await _process_message(json.loads(frame))
                    except json.JSONDecodeError:
                        continue  # non-JSON frame; skip silently
                    except Exception as e:
                        logger.debug("Message processing error: %s", e)
        except websockets.exceptions.ConnectionClosed as e:
            logger.warning("CertStream connection closed: %s", e)
        except Exception as e:
            logger.warning("CertStream connection error: %s", e)
        # Exponential backoff before the next connection attempt.
        logger.info("Reconnecting to CertStream in %ds...", delay)
        await asyncio.sleep(delay)
        delay = min(delay * 2, CT_LOG_MAX_RECONNECT_DELAY)
|