| import os |
| from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect, status, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import HTMLResponse, JSONResponse, FileResponse |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.templating import Jinja2Templates |
| import json |
| import uvicorn |
| from typing import Dict, Set, Optional, Any |
| import logging |
|
|
| |
# Configure the root logger at import time so INFO-level messages are emitted.
logging.basicConfig(level=logging.INFO)

# Module-scoped logger shared by the routes and the connection manager below.
logger = logging.getLogger(__name__)
|
|
# ASGI application object; every route and the WebSocket endpoint below is
# registered on it.
app = FastAPI(title="EFT Group Map API")

# Cross-origin policy: any origin, method and header is accepted.
# NOTE(review): FastAPI's CORS documentation advises listing origins
# explicitly when allow_credentials=True -- confirm the wildcard is intended
# before exposing this service beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
|
|
# Jinja2 environment rooted at ./templates (resolved against the working
# directory); used by the HTML routes to render "map.html".
templates = Jinja2Templates(directory="templates")

# Expose the files under ./static at the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
| |
def load_maps_data(path: str = "data.json") -> Dict[str, Any]:
    """Read the map catalogue from a JSON file, falling back to an empty dict.

    Loading is best-effort: a missing, unreadable or malformed file is logged
    and yields ``{}`` so that importing this module never fails because of
    the data file.

    Args:
        path: Location of the JSON file, resolved against the working
            directory when relative.

    Returns:
        The decoded JSON document, or ``{}`` when it could not be loaded.
    """
    # Same logger object as the module-level ``logger`` (identical name);
    # fetched here so the loader does not depend on any other module global.
    log = logging.getLogger(__name__)
    try:
        with open(path, "r", encoding="utf8") as file:
            return json.load(file)
    except FileNotFoundError:
        log.error("%s nicht gefunden", path)
    except (OSError, ValueError):
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors;
        # OSError covers permission problems and similar read failures.
        log.exception("Could not load %s; continuing without map data", path)
    return {}


# Map catalogue keyed by map name, loaded once at import time.
maps_data = load_maps_data()
|
|
|
|
# response_class is HTMLResponse because the handler returns a rendered
# template, not a file read from disk.
@app.get("/", response_class=HTMLResponse)
async def read_index(request: Request):
    """Render ``map.html`` with the complete map catalogue.

    Args:
        request: Incoming request; passed to the template context under the
            ``"request"`` key.

    Returns:
        The rendered template response. The ``"maps"`` context value is the
        whole ``maps_data`` object as loaded from disk.
    """
    context = {"request": request, "maps": maps_data}
    return templates.TemplateResponse("map.html", context)
|
|
|
|
# response_class is HTMLResponse because the handler returns a rendered
# template, not a file read from disk.
@app.get("/map/{normalized_name}", response_class=HTMLResponse)
async def get_map_by_normalized_name(request: Request, normalized_name: str):
    """Render ``map.html`` restricted to a single map.

    Args:
        request: Incoming request; passed to the template context under the
            ``"request"`` key.
        normalized_name: Key looked up in ``maps_data``.

    Returns:
        The rendered template response. ``"maps"`` is a one-element list
        holding the matching entry, or an empty list when the key is unknown
        or its entry is falsy (the page is still rendered, no 404 is raised).
    """
    map_entry = maps_data.get(normalized_name)
    filtered_maps = [map_entry] if map_entry else []
    context = {"request": request, "maps": filtered_maps}
    return templates.TemplateResponse("map.html", context)
|
|
|
|
| |
@app.get("/api/map/{map_name}")
async def get_map_data(map_name: str):
    """Return the stored data for one map as JSON.

    Args:
        map_name: Key looked up in ``maps_data``.

    Raises:
        HTTPException: 404 when the key is unknown or its entry is falsy.
    """
    entry = maps_data.get(map_name)
    if entry:
        return JSONResponse(content=entry)
    raise HTTPException(status_code=404, detail="Map not found")
|
|
|
|
class ConnectionManager:
    """Bookkeeping for WebSocket clients organised into named groups.

    A socket belongs to at most one group at a time. Each non-empty group has
    one leader: the socket that created the group, or a remaining member
    picked when the previous leader left. Groups are deleted as soon as their
    last member leaves.
    """

    def __init__(self):
        # group name -> sockets currently in that group
        self.groups: Dict[str, Set[WebSocket]] = {}
        # socket -> name of the group it has joined
        self.connections: Dict[WebSocket, str] = {}
        # socket -> client type supplied with its latest join request
        self.client_types: Dict[WebSocket, str] = {}
        # running counters, exposed through get_stats()
        self.message_stats = {"total_sent": 0, "total_received": 0}
        # group name -> socket currently acting as leader
        self.group_leaders: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket) -> None:
        """Accept the WebSocket handshake; the socket joins no group yet."""
        await websocket.accept()
        logger.info(f"Neue WebSocket-Verbindung: {websocket.client}")

    def disconnect(self, websocket: WebSocket) -> None:
        """Drop ``websocket`` from its group, if it has joined one.

        Safe to call repeatedly and for sockets that never joined a group.
        """
        if websocket not in self.connections:
            return
        group_name = self.connections[websocket]
        self.remove_from_group(websocket, group_name)
        logger.info(f"Client disconnected from group '{group_name}'")

    async def join_group(self, websocket: WebSocket, group_name: str, client_type: str = "web"):
        """Add a client to a group, creating the group when it does not exist.

        A socket that is in a different group leaves that group first. Joining
        the group the socket is already in only refreshes its client type;
        membership and leadership are left untouched, so a repeated join
        request cannot cost a client its leader role.

        Args:
            websocket: The client's WebSocket connection.
            group_name: Name of the group to join.
            client_type: Kind of client ("monitor" or "web"); stored as given,
                without validation.

        Returns:
            The ``"joined"`` acknowledgement payload for the client.
            ``is_first`` is True when the client is the group's leader.
        """
        if websocket in self.connections:
            previous_group = self.connections[websocket]
            if previous_group != group_name:
                self.remove_from_group(websocket, previous_group)

        if group_name not in self.groups:
            # The creator of a group becomes its first leader.
            self.groups[group_name] = set()
            self.group_leaders[group_name] = websocket
            logger.info(f"Neue Gruppe '{group_name}' erstellt von {client_type}-Client")

        self.groups[group_name].add(websocket)
        self.connections[websocket] = group_name
        self.client_types[websocket] = client_type

        is_first_client = self.group_leaders.get(group_name) == websocket
        client_info = {
            "type": "joined",
            "group": group_name,
            "client_type": client_type,
            "is_first": is_first_client,
            "message": f"Erfolgreich Gruppe '{group_name}' beigetreten als {client_type}-Client",
        }

        logger.info(f"Client ({client_type}) joined group '{group_name}' (First: {is_first_client})")
        return client_info

    def remove_from_group(self, websocket: WebSocket, group_name: str) -> None:
        """Remove ``websocket`` from ``group_name`` and forget its bookkeeping.

        When the leader leaves a group that still has members, an arbitrary
        remaining member becomes the leader. An emptied group is deleted
        together with its leader entry. The socket's entries in
        ``connections`` and ``client_types`` are always cleared, even when it
        was not a member of the named group.
        """
        members = self.groups.get(group_name)
        if members is not None and websocket in members:
            members.remove(websocket)
            if not members:
                del self.groups[group_name]
                self.group_leaders.pop(group_name, None)
                logger.info(f"Group '{group_name}' deleted (no members)")
            elif self.group_leaders.get(group_name) == websocket:
                # Set iteration order is arbitrary, and so is the successor.
                self.group_leaders[group_name] = next(iter(members))
                logger.info(f"Neuer Gruppenleiter für '{group_name}' gewählt")

        self.connections.pop(websocket, None)
        self.client_types.pop(websocket, None)

    async def broadcast_to_group(
        self, message: str, group_name: str, exclude: Optional[WebSocket] = None
    ) -> None:
        """Send ``message`` to every member of ``group_name``.

        Args:
            message: Text frame to send.
            group_name: Target group; unknown groups are ignored.
            exclude: Optional member to skip (typically the sender).

        Members whose send raises are treated as gone and disconnected once
        the loop has finished.
        """
        if group_name not in self.groups:
            return

        failed = []
        # Iterate over a snapshot: each send awaits, so other coroutines may
        # add or remove members while the broadcast is in progress.
        for connection in tuple(self.groups[group_name]):
            if connection == exclude:
                continue
            try:
                await connection.send_text(message)
            except Exception as e:
                logger.error(f"Fehler beim Senden an Client: {e}")
                failed.append(connection)
            else:
                self.message_stats["total_sent"] += 1

        for connection in failed:
            self.disconnect(connection)

    def get_stats(self) -> Dict[str, Any]:
        """Return group/connection counts merged with the message counters."""
        return {
            "active_groups": len(self.groups),
            "total_connections": len(self.connections),
            **self.message_stats,
        }

    def get_group_info(self, group_name: str) -> Dict[str, Any]:
        """Describe a group: its name, member count and per-member details.

        Unknown groups yield a member count of 0 and an empty client list.
        """
        if group_name not in self.groups:
            return {"group": group_name, "members": 0, "clients": []}

        leader = self.group_leaders.get(group_name)
        clients = [
            {
                "client_type": self.client_types.get(ws, "unknown"),
                "is_leader": leader == ws,
            }
            for ws in self.groups[group_name]
        ]
        return {"group": group_name, "members": len(clients), "clients": clients}
|
|
|
|
# Single process-wide registry of WebSocket clients and their groups; shared
# by the WebSocket endpoint and the HTTP status routes below.
manager = ConnectionManager()
|
|
|
|
async def _handle_join(websocket: WebSocket, message: Dict[str, Any]) -> None:
    """Process a ``join`` request: add the sender to a group and announce it."""
    group_name = message.get("group")
    client_type = message.get("client_type", "web")

    if not group_name:
        error_msg = {"type": "error", "message": "Missing group name in join request"}
        await websocket.send_text(json.dumps(error_msg))
        return

    # Group names are used as dict keys. Lists and objects are the only
    # unhashable values json.loads can produce, so reject them here instead
    # of letting a TypeError tear down the connection.
    if isinstance(group_name, (list, dict)):
        error_msg = {"type": "error", "message": "Invalid group name in join request"}
        await websocket.send_text(json.dumps(error_msg))
        return

    join_info = await manager.join_group(websocket, group_name, client_type)
    await websocket.send_text(json.dumps(join_info))

    # Tell the other members of the group about the newcomer.
    notification = {
        "type": "client_joined",
        "group": group_name,
        "client_type": client_type,
        "message": f"Neuer {client_type}-Client ist der Gruppe beigetreten",
    }
    await manager.broadcast_to_group(json.dumps(notification), group_name, exclude=websocket)


async def _relay_to_group(websocket: WebSocket, data: str) -> bool:
    """Forward the raw frame to the sender's whole group, sender included.

    Returns:
        False when the sender has not joined a group (nothing is sent).
    """
    if websocket not in manager.connections:
        return False
    await manager.broadcast_to_group(data, manager.connections[websocket])
    return True


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Serve one WebSocket client until it disconnects.

    Each received text frame is handled as follows:

    * JSON object with ``"type": "join"``: join the requested group.
    * JSON object with ``"type": "ping"``: reply with ``{"type": "pong"}``.
    * Any other valid JSON, including non-object values: relayed unchanged
      to the sender's group, or answered with an error if the sender has not
      joined one.
    * Invalid JSON: logged, then relayed unchanged if the sender is in a
      group, otherwise dropped silently.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            manager.message_stats["total_received"] += 1

            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                logger.warning(f"Invalid JSON received: {data}")
                await _relay_to_group(websocket, data)
                continue

            # Only JSON objects carry a "type"; scalars and arrays are valid
            # JSON too and are treated as ordinary payloads.
            message_type = message.get("type") if isinstance(message, dict) else None

            if message_type == "join":
                await _handle_join(websocket, message)
            elif message_type == "ping":
                await websocket.send_text(json.dumps({"type": "pong"}))
            elif not await _relay_to_group(websocket, data):
                error_msg = {"type": "error", "message": "Join a group before sending messages"}
                await websocket.send_text(json.dumps(error_msg))

    except WebSocketDisconnect:
        logger.info("WebSocket disconnected")
    except Exception as e:
        # Boundary handler: keep the original message and add the traceback.
        logger.exception(f"Unerwarteter Fehler: {e}")
    finally:
        # Runs on every exit path, including task cancellation, so the
        # manager never keeps a reference to a dead socket.
        manager.disconnect(websocket)
|
|
|
|
| |
@app.get("/health")
async def health_check():
    """Report a fixed "healthy" status together with the manager's counters."""
    stats = manager.get_stats()
    return {"status": "healthy", "websocket_stats": stats}
|
|
|
|
| |
@app.get("/api/group/{group_name}")
async def get_group_info(group_name: str):
    """Return member information for one WebSocket group as JSON.

    Args:
        group_name: Name of the group to describe.

    Raises:
        HTTPException: 404 when the group has no members.
    """
    info = manager.get_group_info(group_name)
    if info["members"] != 0:
        return JSONResponse(content=info)
    raise HTTPException(status_code=404, detail="Group not found")
|
|
|
|
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 8000.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info",
    )
|
|