Spaces:
Paused
Paused
| """ | |
| RandomWeb β Certificate Transparency Log Worker | |
| Connects to CertStream WebSocket to discover newly registered domains in real-time. | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| from urllib.parse import urlparse | |
| import websockets | |
| from backend.config import ( | |
| CERTSTREAM_URL, | |
| CT_LOG_BATCH_SIZE, | |
| CT_LOG_RECONNECT_DELAY, | |
| CT_LOG_MAX_RECONNECT_DELAY, | |
| BLOCKED_TLDS, | |
| ) | |
| from backend.workers.validator import enqueue_url | |
| from backend.db import url_exists | |
| logger = logging.getLogger("randomweb.ct_log") | |
| # βββ Domain Filtering βββββββββββββββββββββββββββββββββββββββ | |
| _seen_domains: set = set() | |
| _MAX_SEEN_CACHE = 500_000 | |
| def _is_valid_domain(domain: str) -> bool: | |
| """Filter out invalid, wildcard, IP, and blocked TLD domains.""" | |
| if not domain or len(domain) < 4: | |
| return False | |
| # Skip wildcards | |
| if domain.startswith("*."): | |
| domain = domain[2:] | |
| if "*" in domain: | |
| return False | |
| # Skip IP addresses | |
| parts = domain.split(".") | |
| if all(p.isdigit() for p in parts): | |
| return False | |
| # Skip blocked TLDs | |
| for tld in BLOCKED_TLDS: | |
| if domain.endswith(tld): | |
| return False | |
| # Must have at least one dot | |
| if "." not in domain: | |
| return False | |
| # Skip overly long domains (likely garbage) | |
| if len(domain) > 253: | |
| return False | |
| return True | |
| def _deduplicate(domain: str) -> bool: | |
| """Returns True if the domain is new (not seen before).""" | |
| global _seen_domains | |
| if domain in _seen_domains: | |
| return False | |
| # Evict oldest entries if cache is full | |
| if len(_seen_domains) >= _MAX_SEEN_CACHE: | |
| # Remove half the cache (FIFO approximation) | |
| to_remove = list(_seen_domains)[:_MAX_SEEN_CACHE // 2] | |
| for d in to_remove: | |
| _seen_domains.discard(d) | |
| _seen_domains.add(domain) | |
| return True | |
| async def _process_message(message: dict): | |
| """Process a single CertStream message and extract domains.""" | |
| try: | |
| msg_type = message.get("message_type") | |
| if msg_type != "certificate_update": | |
| return | |
| data = message.get("data", {}) | |
| leaf_cert = data.get("leaf_cert", {}) | |
| all_domains = leaf_cert.get("all_domains", []) | |
| for domain in all_domains: | |
| # Strip wildcard prefix | |
| if domain.startswith("*."): | |
| domain = domain[2:] | |
| domain = domain.lower().strip() | |
| if not _is_valid_domain(domain): | |
| continue | |
| if not _deduplicate(domain): | |
| continue | |
| url = f"https://{domain}" | |
| await enqueue_url(url, source="ct_log") | |
| except Exception as e: | |
| logger.debug("Error processing CT message: %s", e) | |
| async def run_ct_log_worker(): | |
| """ | |
| Main CT log worker loop. Connects to CertStream WebSocket, | |
| parses certificate updates, and queues new domains for validation. | |
| Auto-reconnects with exponential backoff. | |
| """ | |
| logger.info("CT Log worker starting β connecting to %s", CERTSTREAM_URL) | |
| reconnect_delay = CT_LOG_RECONNECT_DELAY | |
| while True: | |
| try: | |
| async with websockets.connect( | |
| CERTSTREAM_URL, | |
| ping_interval=30, | |
| ping_timeout=10, | |
| close_timeout=5, | |
| max_size=2**20, # 1MB max message size | |
| ) as ws: | |
| logger.info("Connected to CertStream") | |
| reconnect_delay = CT_LOG_RECONNECT_DELAY # Reset on success | |
| async for raw_message in ws: | |
| try: | |
| message = json.loads(raw_message) | |
| await _process_message(message) | |
| except json.JSONDecodeError: | |
| continue | |
| except Exception as e: | |
| logger.debug("Message processing error: %s", e) | |
| except websockets.exceptions.ConnectionClosed as e: | |
| logger.warning("CertStream connection closed: %s", e) | |
| except Exception as e: | |
| logger.warning("CertStream connection error: %s", e) | |
| # Exponential backoff reconnect | |
| logger.info("Reconnecting to CertStream in %ds...", reconnect_delay) | |
| await asyncio.sleep(reconnect_delay) | |
| reconnect_delay = min(reconnect_delay * 2, CT_LOG_MAX_RECONNECT_DELAY) | |