log-stream / app.py
candyhead's picture
Create app.py
ed6f503 verified
import asyncio
import json
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
app = FastAPI(title="HF Log Streams Service")
# 全局广播队列(支持无限多客户端同时订阅)
log_queue: asyncio.Queue = asyncio.Queue()
class LogEntry(BaseModel):
level: str = "info"
message: str
source: str = "external"
extra: dict | None = None
@app.post("/ingest")
async def ingest_log(log: LogEntry):
timestamp = datetime.now().isoformat()
entry = {
"timestamp": timestamp,
"level": log.level.upper(),
"message": log.message,
"source": log.source,
"extra": log.extra or {}
}
# 1. 自动出现在 Space 官方 Logs(实时可见)
print(f"[{timestamp}] [{entry['level']}] {log.message} | source={log.source}")
# 2. 广播给所有 SSE 订阅者
await log_queue.put(entry)
return {"status": "ok", "timestamp": timestamp}
@app.get("/stream")
async def stream_logs(request: Request):
async def event_generator():
while True:
if await request.is_disconnected():
break
try:
log_entry = await asyncio.wait_for(log_queue.get(), timeout=30.0)
yield f"data: {json.dumps(log_entry)}\n\n"
except asyncio.TimeoutError:
yield ": ping\n\n" # 保持连接
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
)
@app.get("/")
async def root():
return {"message": "Log Streams 服务已就绪!POST /ingest 写日志,GET /stream 订阅实时流"}