# HIBP / hibp_api.py
# MB-IDK's picture
# Create hibp_api.py
# e3df6f2 verified
#!/usr/bin/env python3
"""
HaveIBeenPwned Unified Search API Script
Uses cloudscraper to bypass Cloudflare protection and serves results via a local HTTP API.
"""
import json
import sys
import urllib.parse
from datetime import datetime, timezone

import cloudscraper
from flask import Flask, jsonify, request
# =============================================================================
# CONFIGURATION
# =============================================================================
# Root URL of the HaveIBeenPwned site; request paths are appended to it.
HIBP_BASE_URL = "https://haveibeenpwned.com"

# Browser-like request headers installed on the scraper session.
# Key order is kept as declared.
DEFAULT_HEADERS = {
    # Content negotiation
    "Accept": "*/*",
    "Accept-Language": "fr,fr-FR;q=0.9,en-US;q=0.8,en;q=0.7",
    # Privacy / origin
    "DNT": "1",
    "Referer": "https://haveibeenpwned.com/",
    # Fetch metadata
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Sec-GPC": "1",
    # Transport hints
    "TE": "trailers",
    "Priority": "u=0",
    # Firefox on 64-bit Windows
    "User-Agent": " ".join(
        (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0)",
            "Gecko/20100101 Firefox/148.0",
        )
    ),
}
# =============================================================================
# CLOUDSCRAPER CLIENT
# =============================================================================
class HIBPClient:
    """Client for the HaveIBeenPwned ``/unifiedsearch/<email>`` endpoint.

    Holds one ``cloudscraper`` session configured with a Firefox-on-Windows
    desktop browser profile and the module-level ``DEFAULT_HEADERS``.
    """

    def __init__(self):
        """Create the scraper session and install the default headers."""
        self.scraper = cloudscraper.create_scraper(
            browser={
                "browser": "firefox",
                "platform": "windows",
                "desktop": True,
            },
            delay=5,
        )
        self.scraper.headers.update(DEFAULT_HEADERS)

    def search_email(self, email: str) -> dict:
        """Look up *email* on HIBP and describe the outcome.

        This method does not raise for request failures; they are reported
        through the returned dict instead.

        Args:
            email: Address to search for. It is fully percent-encoded
                (including ``@`` and ``/``) before being placed in the URL.

        Returns:
            A dict with these keys:

            * ``email`` - the address as given.
            * ``timestamp`` - UTC time of the call, ISO-8601 with a ``Z``
              suffix.
            * ``status_code`` - the upstream HTTP status, or 503 when the
              Cloudflare challenge fails, or 500 for any other exception.
            * ``data`` - decoded JSON body on 200; a
              ``{"Breaches": None, "Pastes": None}`` placeholder on 404;
              otherwise ``None``.
            * ``error`` - human-readable message, or ``None`` on a 200.
        """
        encoded_email = urllib.parse.quote(email, safe="")
        url = f"{HIBP_BASE_URL}/unifiedsearch/{encoded_email}"

        # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
        # time and render its "+00:00" offset as "Z" so the emitted format is
        # the same as before.
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

        result = {
            "email": email,
            "timestamp": timestamp,
            "status_code": None,
            "data": None,
            "error": None,
        }

        try:
            response = self.scraper.get(url, timeout=30)
            result["status_code"] = response.status_code

            if response.status_code == 200:
                result["data"] = response.json()
            elif response.status_code == 404:
                # 404 is reported as an empty result set: callers get a
                # payload shaped like a real one plus an explanatory message.
                result["data"] = {"Breaches": None, "Pastes": None}
                result["error"] = "Email not found in any known breach."
            elif response.status_code == 429:
                retry_after = response.headers.get("Retry-After", "unknown")
                result["error"] = f"Rate limited. Retry after {retry_after} seconds."
            elif response.status_code == 403:
                result["error"] = "Blocked by Cloudflare or access denied."
            else:
                result["error"] = f"Unexpected status code: {response.status_code}"
        except cloudscraper.exceptions.CloudflareChallengeError as e:
            result["error"] = f"Cloudflare challenge failed: {str(e)}"
            result["status_code"] = 503
        except Exception as e:
            # Boundary handler: any other failure (including an undecodable
            # body on a 200) is surfaced as a 500 with the exception text.
            result["error"] = f"Request failed: {str(e)}"
            result["status_code"] = 500

        return result

    def parse_breaches(self, data: dict) -> list:
        """Reduce a unified-search payload to a list of breach summaries.

        Args:
            data: Payload dict as stored under ``"data"`` by
                :meth:`search_email`. ``None`` or any non-dict value is
                treated as "no breaches".

        Returns:
            One dict per entry of ``data["Breaches"]`` with snake_case keys.
            ``data_classes`` is always a list: a missing or null
            ``DataClasses`` becomes ``[]``. Other missing fields are ``None``.
        """
        if not isinstance(data, dict):
            return []

        breaches = data.get("Breaches") or []
        return [
            {
                "name": b.get("Name"),
                "title": b.get("Title"),
                "domain": b.get("Domain"),
                "breach_date": b.get("BreachDate"),
                "added_date": b.get("AddedDate"),
                "pwn_count": b.get("PwnCount"),
                # `or []` also covers a key that is present but null, which
                # a plain .get() default would pass through as None.
                "data_classes": b.get("DataClasses") or [],
                "description": b.get("Description"),
                "logo": b.get("LogoPath"),
                "is_verified": b.get("IsVerified"),
                "is_sensitive": b.get("IsSensitive"),
                "is_stealer_log": b.get("IsStealerLog"),
                "is_malware": b.get("IsMalware"),
            }
            for b in breaches
        ]
# =============================================================================
# FLASK APP
# =============================================================================
# Flask application object; the route decorators below register on it.
app = Flask(__name__)
# Single HIBPClient created at import time and shared by the route handlers.
hibp_client = HIBPClient()
@app.route("/", methods=["GET"])
def index():
    """Serve the service description and the list of available endpoints."""
    endpoint_docs = {
        "GET /": "Documentation",
        "GET /search/<email>": "Raw HIBP search",
        "GET /breaches/<email>": "Parsed breach summaries",
        "GET /health": "Health check",
    }
    description = {
        "service": "HaveIBeenPwned Proxy API",
        "version": "1.0.0",
        "endpoints": endpoint_docs,
    }
    return jsonify(description)
@app.route("/health", methods=["GET"])
def health():
    """Report liveness together with the current UTC time.

    Returns:
        JSON object with ``status`` set to ``"ok"`` and ``timestamp`` as an
        ISO-8601 UTC string ending in ``Z``.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and render its "+00:00" offset as "Z" so the format is unchanged.
    now_utc = datetime.now(timezone.utc)
    return jsonify({
        "status": "ok",
        "timestamp": now_utc.isoformat().replace("+00:00", "Z"),
    })
@app.route("/search/<path:email>", methods=["GET"])
def search_email(email: str):
    """Return the raw HIBP lookup result for *email* as JSON.

    Responds 400 when *email* is empty or has no ``@``. Otherwise the lookup
    dict from ``hibp_client.search_email`` is returned as-is. Upstream 200 and
    404 both map to HTTP 200; any other upstream status is passed through,
    falling back to 500 when none was recorded.
    """
    looks_like_email = bool(email) and "@" in email
    if not looks_like_email:
        return jsonify({"error": "Invalid email address.", "email": email}), 400

    lookup = hibp_client.search_email(email)
    upstream_status = lookup["status_code"]

    # "Found" and "not found" are both successful answers for the caller.
    if upstream_status == 200 or upstream_status == 404:
        return jsonify(lookup), 200
    return jsonify(lookup), upstream_status or 500
@app.route("/breaches/<path:email>", methods=["GET"])
def get_breaches(email: str):
    """Return parsed breach summaries and pastes for *email* as JSON.

    Responds 400 when *email* is empty or has no ``@``.

    When the lookup produced a payload dict (upstream 200, or the 404
    placeholder), responds 200 with ``breaches``, ``pastes``, their counts
    and ``is_pwned``.

    Otherwise responds with ``error`` and ``is_pwned: null``, using the
    upstream status code (500 if none was recorded). An upstream 200 whose
    body is not a JSON object is reported as 502.
    """
    if not email or "@" not in email:
        return jsonify({"error": "Invalid email address.", "email": email}), 400

    result = hibp_client.search_email(email)
    data = result["data"]

    # Check the payload's type, not its truthiness. An empty object is a
    # valid "nothing found" answer, and a non-dict body must not reach
    # the .get() call below.
    if isinstance(data, dict):
        breaches = hibp_client.parse_breaches(data)
        pastes = data.get("Pastes") or []
        return jsonify({
            "email": email,
            "timestamp": result["timestamp"],
            "total_breaches": len(breaches),
            "total_pastes": len(pastes),
            "breaches": breaches,
            "pastes": pastes,
            "is_pwned": len(breaches) > 0 or len(pastes) > 0,
        })

    status = result["status_code"] or 500
    error = result["error"]
    if status == 200:
        # Upstream said OK but gave no usable payload. Do not relay that as
        # a success with a null error.
        status = 502
        error = error or "Unexpected response format from HIBP."

    return jsonify({
        "email": email,
        "timestamp": result["timestamp"],
        "error": error,
        "is_pwned": None,
    }), status
# =============================================================================
# ENTRY POINT
# =============================================================================
if __name__ == "__main__":
    import os

    # Bind address and port are read from the HOST / PORT environment
    # variables, defaulting to all interfaces on port 5000.
    app.run(
        host=os.getenv("HOST", "0.0.0.0"),
        port=int(os.getenv("PORT", 5000)),
        debug=False,
    )