"""HF Log Streams Service.

A small FastAPI application that accepts log entries over HTTP and fans them
out to every connected Server-Sent Events (SSE) subscriber.

Routes:
    POST /ingest  -- submit one log entry.
    GET  /stream  -- subscribe to the live stream of entries (SSE).
    GET  /        -- readiness message.
"""
import asyncio
import json
from datetime import datetime

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI(title="HF Log Streams Service")

# Maximum number of undelivered entries buffered per subscriber. When a slow
# client falls this far behind, its oldest entries are dropped so that one
# stalled connection cannot grow memory without bound.
SUBSCRIBER_QUEUE_MAXSIZE = 1000

# Seconds of silence on a stream before an SSE comment line is sent to keep
# the connection alive.
KEEPALIVE_INTERVAL_SECONDS = 30.0

# One queue per connected /stream client. A single shared queue would hand
# each entry to exactly ONE waiting consumer; a real broadcast needs a
# separate queue for every subscriber.
_subscribers: set[asyncio.Queue] = set()


class LogEntry(BaseModel):
    """Request body accepted by POST /ingest."""

    level: str = "info"
    message: str
    source: str = "external"
    extra: dict | None = None


def _broadcast(entry: dict) -> None:
    """Queue *entry* for every currently connected subscriber.

    Never blocks: if a subscriber's queue is full, its oldest pending entry
    is discarded to make room for the new one. When nobody is subscribed the
    entry is simply not buffered.
    """
    for queue in tuple(_subscribers):
        if queue.full():
            # A full bounded queue is non-empty, so this cannot raise.
            queue.get_nowait()
        queue.put_nowait(entry)


@app.post("/ingest")
async def ingest_log(log: LogEntry):
    """Record one log entry and broadcast it to all SSE subscribers.

    Returns:
        A dict with ``status`` ("ok") and the ``timestamp`` assigned to the
        entry (local time, ISO 8601).
    """
    timestamp = datetime.now().isoformat()
    entry = {
        "timestamp": timestamp,
        "level": log.level.upper(),
        "message": log.message,
        "source": log.source,
        "extra": log.extra or {},
    }

    # 1. Echo to stdout so the hosting platform's log view can pick it up.
    print(f"[{timestamp}] [{entry['level']}] {log.message} | source={log.source}")

    # 2. Fan the entry out to every SSE subscriber.
    _broadcast(entry)

    return {"status": "ok", "timestamp": timestamp}


@app.get("/stream")
async def stream_logs(request: Request):
    """Stream log entries to the caller as Server-Sent Events.

    Each entry is sent as one ``data:`` event containing its JSON encoding.
    When no entry arrives within ``KEEPALIVE_INTERVAL_SECONDS`` a ``: ping``
    comment line is sent instead to keep the connection open.
    """

    async def event_generator():
        # Register a private queue so this client receives every entry
        # ingested while it is connected.
        queue: asyncio.Queue = asyncio.Queue(maxsize=SUBSCRIBER_QUEUE_MAXSIZE)
        _subscribers.add(queue)
        try:
            while True:
                if await request.is_disconnected():
                    break
                try:
                    log_entry = await asyncio.wait_for(
                        queue.get(), timeout=KEEPALIVE_INTERVAL_SECONDS
                    )
                except asyncio.TimeoutError:
                    yield ": ping\n\n"  # keep the connection alive
                else:
                    yield f"data: {json.dumps(log_entry)}\n\n"
        finally:
            # Always unregister, including when the generator is closed or
            # cancelled, so dead connections stop receiving entries.
            _subscribers.discard(queue)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
    )


@app.get("/")
async def root():
    """Return a short readiness message describing the available routes."""
    return {"message": "Log Streams 服务已就绪!POST /ingest 写日志,GET /stream 订阅实时流"}