| |
| """ |
| HaveIBeenPwned Unified Search API Script |
| Uses cloudscraper to bypass Cloudflare protection and serves results via a local HTTP API. |
| """ |
|
|
import json
import sys
import urllib.parse
from datetime import datetime, timezone

import cloudscraper
from flask import Flask, jsonify, request
|
|
| |
| |
| |
|
|
# Root URL of the HaveIBeenPwned website; endpoint paths are appended to it.
HIBP_BASE_URL = "https://haveibeenpwned.com"

# Browser-like request headers. Insertion order is kept as-is because a dict
# preserves it and it determines the order the headers are iterated in.
DEFAULT_HEADERS = {
    # Content negotiation
    "Accept": "*/*",
    "Accept-Language": "fr,fr-FR;q=0.9,en-US;q=0.8,en;q=0.7",
    # Privacy signals and request origin
    "DNT": "1",
    "Referer": "https://haveibeenpwned.com/",
    # Fetch metadata
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Sec-GPC": "1",
    # Transfer hints
    "TE": "trailers",
    "Priority": "u=0",
    # Adjacent literals are concatenated into a single User-Agent value.
    "User-Agent":
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
        "Gecko/20100101 Firefox/148.0",
}
|
|
|
|
| |
| |
| |
|
|
class HIBPClient:
    """Thin client for the HaveIBeenPwned ``/unifiedsearch`` endpoint.

    All requests go through one ``cloudscraper`` session created in
    ``__init__`` with a Firefox-on-Windows browser profile and
    ``DEFAULT_HEADERS`` merged into its headers.
    """

    def __init__(self):
        """Build the scraper session and install the default headers."""
        self.scraper = cloudscraper.create_scraper(
            browser={
                "browser": "firefox",
                "platform": "windows",
                "desktop": True,
            },
            delay=5,
        )
        self.scraper.headers.update(DEFAULT_HEADERS)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as an ISO-8601 string ending in ``Z``."""
        # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware
        # "now" and drop the tzinfo so ``isoformat()`` does not emit "+00:00"
        # in front of the explicit "Z" suffix (same output shape as before).
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    def search_email(self, email: str) -> dict:
        """Query the unified search endpoint for *email*.

        Failures of the HTTP request, or of decoding its body, are reported
        through the returned dict instead of being raised.

        Args:
            email: Address to look up. It is percent-encoded (no safe
                characters) before being placed in the URL path.

        Returns:
            A dict with the keys:

            * ``email``: the address as passed in.
            * ``timestamp``: UTC time of the lookup, ISO-8601 with ``Z``.
            * ``status_code``: the upstream HTTP status, or 502 when a 200
              body is not valid JSON, 503 when the Cloudflare challenge
              fails, 500 for any other request failure.
            * ``data``: decoded JSON on 200,
              ``{"Breaches": None, "Pastes": None}`` on 404, else ``None``.
            * ``error``: a human-readable message, or ``None`` on a clean 200.
        """
        encoded_email = urllib.parse.quote(email, safe="")
        url = f"{HIBP_BASE_URL}/unifiedsearch/{encoded_email}"

        result = {
            "email": email,
            "timestamp": self._utc_timestamp(),
            "status_code": None,
            "data": None,
            "error": None,
        }

        try:
            response = self.scraper.get(url, timeout=30)
            result["status_code"] = response.status_code

            if response.status_code == 200:
                try:
                    result["data"] = response.json()
                except ValueError as e:
                    # The request itself succeeded but the body is not JSON
                    # (for example an HTML page). Report that precisely
                    # instead of as a generic "Request failed" 500.
                    result["error"] = f"Upstream returned invalid JSON: {e}"
                    result["status_code"] = 502
            elif response.status_code == 404:
                # 404 is reported with an empty-but-present payload so that
                # callers can treat "not found" like a successful lookup.
                result["data"] = {"Breaches": None, "Pastes": None}
                result["error"] = "Email not found in any known breach."
            elif response.status_code == 429:
                retry_after = response.headers.get("Retry-After", "unknown")
                result["error"] = f"Rate limited. Retry after {retry_after} seconds."
            elif response.status_code == 403:
                result["error"] = "Blocked by Cloudflare or access denied."
            else:
                result["error"] = f"Unexpected status code: {response.status_code}"

        except cloudscraper.exceptions.CloudflareChallengeError as e:
            result["error"] = f"Cloudflare challenge failed: {str(e)}"
            result["status_code"] = 503
        except Exception as e:
            # Boundary handler: any other failure is turned into an error
            # result so callers always get the same dict shape back.
            result["error"] = f"Request failed: {str(e)}"
            result["status_code"] = 500

        return result

    def parse_breaches(self, data: dict) -> list:
        """Convert the ``Breaches`` entries of a search payload to summaries.

        Args:
            data: Decoded search payload. ``None``, a non-dict value, or a
                payload whose ``Breaches`` is missing or null yields ``[]``.

        Returns:
            One dict per breach with snake_case keys. Fields absent upstream
            are ``None``; ``data_classes`` is always a list.
        """
        if not isinstance(data, dict):
            return []

        breaches = data.get("Breaches") or []
        return [
            {
                "name": b.get("Name"),
                "title": b.get("Title"),
                "domain": b.get("Domain"),
                "breach_date": b.get("BreachDate"),
                "added_date": b.get("AddedDate"),
                "pwn_count": b.get("PwnCount"),
                # ``or []`` also covers a key that is present but null, which
                # a ``.get(key, [])`` default would pass through as None.
                "data_classes": b.get("DataClasses") or [],
                "description": b.get("Description"),
                "logo": b.get("LogoPath"),
                "is_verified": b.get("IsVerified"),
                "is_sensitive": b.get("IsSensitive"),
                "is_stealer_log": b.get("IsStealerLog"),
                "is_malware": b.get("IsMalware"),
            }
            for b in breaches
        ]
|
|
|
|
| |
| |
| |
|
|
# Flask application object; the route decorators in this module register on it.
app = Flask(import_name=__name__)

# One client instance, created at import time and used by the request handlers.
hibp_client = HIBPClient()
|
|
|
|
@app.route("/", methods=["GET"])
def index():
    """Return a JSON document naming the service and listing its endpoints."""
    endpoint_docs = {
        "GET /": "Documentation",
        "GET /search/<email>": "Raw HIBP search",
        "GET /breaches/<email>": "Parsed breach summaries",
        "GET /health": "Health check",
    }
    payload = {
        "service": "HaveIBeenPwned Proxy API",
        "version": "1.0.0",
        "endpoints": endpoint_docs,
    }
    return jsonify(payload)
|
|
|
|
@app.route("/health", methods=["GET"])
def health():
    """Return a fixed ``"ok"`` status together with the current UTC time."""
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware
    # "now" and drop the tzinfo so ``isoformat()`` keeps the original
    # "<date>T<time>Z" shape instead of emitting "+00:00".
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return jsonify({
        "status": "ok",
        "timestamp": now_utc.isoformat() + "Z",
    })
|
|
|
|
@app.route("/search/<path:email>", methods=["GET"])
def search_email(email: str):
    """Return the client's raw lookup result for *email* as JSON.

    Responds 400 when *email* is empty or contains no "@". Otherwise the
    lookup result is returned with HTTP 200 when the upstream status was 200
    or 404, and with the upstream status (500 if there is none) in every
    other case.
    """
    looks_like_email = bool(email) and "@" in email
    if not looks_like_email:
        return jsonify({"error": "Invalid email address.", "email": email}), 400

    lookup = hibp_client.search_email(email)
    upstream_status = lookup["status_code"]

    # "Found" (200) and "not found" (404) are both answered with HTTP 200.
    if upstream_status == 200 or upstream_status == 404:
        return jsonify(lookup), 200

    return jsonify(lookup), upstream_status or 500
|
|
|
|
@app.route("/breaches/<path:email>", methods=["GET"])
def get_breaches(email: str):
    """Return parsed breach and paste summaries for *email* as JSON.

    Responds 400 when *email* is empty or contains no "@". When the lookup
    carries a dict payload, the response (HTTP 200) contains the parsed
    breaches, the pastes, their counts and an ``is_pwned`` flag. Otherwise an
    error document with ``is_pwned: null`` is returned with the lookup's
    status code (500 if there is none).
    """
    if not email or "@" not in email:
        return jsonify({"error": "Invalid email address.", "email": email}), 400

    result = hibp_client.search_email(email)
    data = result["data"]

    # Dispatch on the payload's type, not its truthiness: an empty ``{}``
    # means "nothing found" rather than a failure, and a non-dict payload
    # must not reach ``.get`` below.
    if isinstance(data, dict):
        breaches = hibp_client.parse_breaches(data)
        pastes = data.get("Pastes") or []
        return jsonify({
            "email": email,
            "timestamp": result["timestamp"],
            "total_breaches": len(breaches),
            "total_pastes": len(pastes),
            "breaches": breaches,
            "pastes": pastes,
            "is_pwned": bool(breaches or pastes),
        })

    status = result["status_code"] or 500
    error = result["error"]
    if error is None:
        # No error was recorded, yet the payload is unusable. Do not answer
        # with a success status and ``error: null``.
        error = "Unexpected response format from upstream."
        status = 502

    return jsonify({
        "email": email,
        "timestamp": result["timestamp"],
        "error": error,
        "is_pwned": None,
    }), status
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    import os

    # Bind address and port come from the environment, with defaults.
    # NOTE(review): the default host "0.0.0.0" listens on every interface;
    # set HOST=127.0.0.1 if the API should only be reachable locally.
    bind_host = os.environ.get("HOST", "0.0.0.0")
    bind_port = int(os.environ.get("PORT", 5000))

    app.run(host=bind_host, port=bind_port, debug=False)