from html import escape
from urllib.parse import urljoin, quote

import httpx
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request, Response
from fastapi.responses import HTMLResponse
|
|
# ASGI application instance; the route decorators in this module register on it.
app = FastAPI()
|
|
# Landing page served at "/": an address bar plus an iframe whose src is
# pointed at /proxy?url=<encoded target> when the user presses Go or Enter.
HTML_INDEX = """
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>HF Proxy Browser</title>
<style>
body { font-family: sans-serif; margin: 0; padding: 0; }
#bar {
padding: 10px;
background: #111827;
color: #e5e7eb;
display: flex;
gap: 8px;
align-items: center;
}
input[type="text"] {
flex: 1;
padding: 6px 8px;
border-radius: 4px;
border: 1px solid #4b5563;
background: #111827;
color: #e5e7eb;
}
button {
padding: 6px 12px;
border-radius: 4px;
border: none;
cursor: pointer;
}
#go {
background: #3b82f6;
color: white;
}
#frame {
width: 100%;
height: calc(100vh - 48px);
border: none;
}
</style>
</head>
<body>
<div id="bar">
<span>Proxy URL:</span>
<input id="url" type="text" placeholder="https://example.com" />
<button id="go">Go</button>
</div>
<iframe id="frame" title="Proxied page"></iframe>
<script>
const input = document.getElementById('url');
const frame = document.getElementById('frame');
const btn = document.getElementById('go');

function load() {
let url = input.value.trim();
if (!url) return;
// URL schemes are case-insensitive, so compare in lower case.
const lower = url.toLowerCase();
if (!lower.startsWith('http://') && !lower.startsWith('https://')) {
url = 'https://' + url;
}
frame.src = '/proxy?url=' + encodeURIComponent(url);
}

btn.addEventListener('click', load);
input.addEventListener('keydown', e => {
if (e.key === 'Enter') {
e.preventDefault();
load();
}
});
</script>
</body>
</html>
"""
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def index():
    """
    Serve the landing page: the address bar and the iframe that displays
    proxied content.
    """
    page = HTML_INDEX
    return page
|
|
|
|
async def fetch_url(url: str, request: Request) -> httpx.Response:
    """
    GET the target URL with httpx on behalf of the incoming request.

    User-Agent, Accept and Accept-Language are copied from the client when
    present and otherwise filled with browser-like defaults. A Range header
    is passed along only when the client sent a non-empty one, so media
    players can seek. Redirects are followed and the timeout is 30 seconds.
    """
    incoming = request.headers

    # (outgoing header name, value used when the client did not send it)
    fallbacks = (
        (
            "User-Agent",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0 Safari/537.36",
        ),
        ("Accept", "*/*"),
        ("Accept-Language", "en-US,en;q=0.9"),
    )
    outgoing = {
        name: incoming.get(name.lower(), default) for name, default in fallbacks
    }

    requested_range = incoming.get("range")
    if requested_range:
        outgoing["Range"] = requested_range

    async with httpx.AsyncClient(follow_redirects=True, timeout=30) as session:
        return await session.get(url, headers=outgoing)
|
|
|
|
def rewrite_html(html: str, base_url: str) -> str:
    """
    Rewrite links in HTML so sub-resources (scripts, css, images, video, etc.)
    go through /proxy as well.

    Args:
        html: Markup of the fetched page.
        base_url: URL the page was fetched from; relative references are
            resolved against it.

    Returns:
        The serialized document in which every http(s) reference points at
        ``/proxy?url=<percent-encoded absolute URL>``, with a small
        "proxied" banner appended to ``<body>`` when the document has one.

    NOTE: ``srcset`` attributes and ``url(...)`` references inside CSS are
    not rewritten.
    """
    soup = BeautifulSoup(html, "html.parser")

    # URL-bearing attributes to rewrite, keyed by tag name.
    url_attrs = {
        "a": ("href",),
        "link": ("href",),
        "img": ("src",),
        "script": ("src",),
        "iframe": ("src",),
        "video": ("src", "poster"),
        "audio": ("src",),
        "source": ("src",),
        "form": ("action",),
    }

    def proxify(attr: str, tag) -> None:
        original = tag.attrs.get(attr)
        if not original:
            return
        # Fragment-only links navigate within the current document; sending
        # them through the proxy would force a needless full reload.
        if original.startswith("#"):
            return
        absolute = urljoin(base_url, original)
        # mailto:, tel:, javascript:, data:, ... cannot be fetched by the
        # proxy, so they are left for the browser to handle as-is.
        if not absolute.lower().startswith(("http://", "https://")):
            return
        tag.attrs[attr] = f"/proxy?url={quote(absolute, safe='')}"

    for tag in soup.find_all(list(url_attrs)):
        for attr in url_attrs[tag.name]:
            proxify(attr, tag)

    # Fixed footer so the user can see the page is being served via the proxy.
    banner = soup.new_tag("div")
    banner.string = f"Proxied via HF Space — {base_url}"
    banner["style"] = (
        "position:fixed;bottom:0;left:0;right:0;"
        "background:#111827;color:#e5e7eb;"
        "font-size:12px;padding:4px 8px;z-index:9999;"
    )
    if soup.body:
        soup.body.append(banner)

    return str(soup)
|
|
|
|
@app.get("/proxy")
async def proxy(url: str, request: Request):
    """
    Reverse-proxy endpoint: /proxy?url=https://example.com
    Supports:
    - HTML (rewritten)
    - Images
    - JS / CSS
    - Video / audio (with Range header forwarded)

    Returns a 502 HTML error page when the upstream fetch raises. HTML
    responses are rewritten and returned with the upstream status code;
    every other content type is passed through with the upstream status
    and headers, minus those that no longer describe the relayed body.

    NOTE(review): the endpoint fetches whatever URL the caller supplies,
    including hosts reachable only from the server; restrict targets if
    this is deployed where that matters.
    """
    try:
        upstream = await fetch_url(url, request)
    except Exception as e:
        # Request-boundary catch-all: any fetch failure becomes a 502 page.
        # Both values are caller-influenced, so escape them before they are
        # embedded in HTML (prevents reflected XSS).
        return HTMLResponse(
            f"<h1>Error</h1><p>Could not fetch {escape(url)}</p>"
            f"<pre>{escape(str(e))}</pre>",
            status_code=502,
        )

    content_type = upstream.headers.get("content-type", "")

    # Media types are case-insensitive.
    if "text/html" in content_type.lower():
        # Redirects are followed, so relative links must be resolved against
        # the URL that actually served the page, not the one requested.
        rewritten = rewrite_html(upstream.text, base_url=str(upstream.url))
        return HTMLResponse(content=rewritten, status_code=upstream.status_code)

    # httpx has already decoded the body, so the upstream encoding and length
    # headers no longer match it; hop-by-hop headers describe the upstream
    # connection only. Content-Length is recomputed by the Response.
    dropped = {
        "content-encoding",
        "content-length",
        "transfer-encoding",
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "upgrade",
    }
    safe_headers = {
        k: v for k, v in upstream.headers.items() if k.lower() not in dropped
    }

    return Response(
        content=upstream.content,
        status_code=upstream.status_code,
        headers=safe_headers,
        media_type=content_type or None,
    )
|
|