File size: 4,440 Bytes
d22875e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""
RandomWeb — Certificate Transparency Log Worker
Connects to CertStream WebSocket to discover newly registered domains in real-time.
"""
import asyncio
import json
import logging
from urllib.parse import urlparse

import websockets

from backend.config import (
    CERTSTREAM_URL,
    CT_LOG_BATCH_SIZE,
    CT_LOG_RECONNECT_DELAY,
    CT_LOG_MAX_RECONNECT_DELAY,
    BLOCKED_TLDS,
)
from backend.workers.validator import enqueue_url
from backend.db import url_exists

# Module logger; child of the application's "randomweb" logger hierarchy.
logger = logging.getLogger("randomweb.ct_log")

# --- Domain filtering state ---------------------------------------------
# In-process cache of domains already forwarded, consulted by _deduplicate()
# so the same hostname is not enqueued twice.
_seen_domains: set = set()
# Cache size cap; when reached, _deduplicate() evicts half the entries.
_MAX_SEEN_CACHE = 500_000


def _is_valid_domain(domain: str) -> bool:
    """Return True if *domain* looks like a usable public hostname.

    Rejects empty/very short names, wildcards, bare IPv4 addresses,
    dotless names, names over the DNS length limit, and blocked TLDs.
    """
    if not domain or len(domain) < 4:
        return False

    # Tolerate a single leading wildcard label by stripping it; any other
    # "*" makes the name unusable.
    if domain.startswith("*."):
        domain = domain[2:]
    if "*" in domain:
        return False

    # Must contain a dot and fit the DNS name limit (RFC 1035: 253 chars).
    if "." not in domain or len(domain) > 253:
        return False

    # Reject bare IPv4 addresses (every dot-separated label numeric).
    if all(part.isdigit() for part in domain.split(".")):
        return False

    # str.endswith accepts a tuple of suffixes — one C-level call instead of
    # a Python-level loop over BLOCKED_TLDS.
    # NOTE(review): this matches raw suffixes, so an entry like "co" would
    # also match "foo.tesco" — assumes BLOCKED_TLDS entries include the
    # leading dot; confirm against backend.config.
    if domain.endswith(tuple(BLOCKED_TLDS)):
        return False

    return True


def _deduplicate(domain: str) -> bool:
    """Record *domain* in the in-process seen-cache.

    Returns True if the domain was not seen before (and records it),
    False if it is a duplicate. The cache is bounded at _MAX_SEEN_CACHE
    entries; when full, half of it is dropped before inserting.
    """
    global _seen_domains
    if domain in _seen_domains:
        return False

    # Bound memory: once full, keep an arbitrary half of the entries in one
    # rebuild instead of 250k individual discard() calls. (A set has no
    # insertion order, so this is random eviction, not FIFO — acceptable
    # because the cache exists only to cut duplicate enqueues, not to be
    # an exact LRU.)
    if len(_seen_domains) >= _MAX_SEEN_CACHE:
        _seen_domains = set(list(_seen_domains)[_MAX_SEEN_CACHE // 2:])

    _seen_domains.add(domain)
    return True


async def _process_message(message: dict):
    """Extract hostnames from one CertStream message and enqueue new ones.

    Only "certificate_update" messages are considered; each domain on the
    leaf certificate is normalized, filtered, deduplicated, and queued as
    an https URL. Errors are logged at debug level and never propagate.
    """
    try:
        if message.get("message_type") != "certificate_update":
            return

        leaf_cert = message.get("data", {}).get("leaf_cert", {})

        for raw in leaf_cert.get("all_domains", []):
            # Drop a leading wildcard label, then normalize case/whitespace.
            candidate = raw[2:] if raw.startswith("*.") else raw
            candidate = candidate.lower().strip()

            if not _is_valid_domain(candidate):
                continue
            if not _deduplicate(candidate):
                continue

            await enqueue_url(f"https://{candidate}", source="ct_log")

    except Exception as e:
        # Best-effort stream processing: a malformed message must not kill
        # the worker loop.
        logger.debug("Error processing CT message: %s", e)


async def run_ct_log_worker():
    """
    Entry point for the CT log worker.

    Keeps a CertStream WebSocket open and feeds each JSON frame to
    _process_message. Any disconnect or error triggers a reconnect with
    exponential backoff (doubling up to CT_LOG_MAX_RECONNECT_DELAY); the
    delay resets to CT_LOG_RECONNECT_DELAY after a successful connect.
    """
    logger.info("CT Log worker starting β€” connecting to %s", CERTSTREAM_URL)
    delay = CT_LOG_RECONNECT_DELAY

    while True:
        try:
            connection = websockets.connect(
                CERTSTREAM_URL,
                ping_interval=30,
                ping_timeout=10,
                close_timeout=5,
                max_size=2**20,  # cap incoming frames at 1 MiB
            )
            async with connection as ws:
                logger.info("Connected to CertStream")
                delay = CT_LOG_RECONNECT_DELAY  # healthy link: reset backoff

                async for frame in ws:
                    try:
                        await _process_message(json.loads(frame))
                    except json.JSONDecodeError:
                        continue  # non-JSON frame: skip silently
                    except Exception as e:
                        logger.debug("Message processing error: %s", e)

        except websockets.exceptions.ConnectionClosed as e:
            logger.warning("CertStream connection closed: %s", e)
        except Exception as e:
            logger.warning("CertStream connection error: %s", e)

        # Back off before the next connection attempt.
        logger.info("Reconnecting to CertStream in %ds...", delay)
        await asyncio.sleep(delay)
        delay = min(delay * 2, CT_LOG_MAX_RECONNECT_DELAY)