| import logging |
| from typing import Dict, List, Optional, Any |
| from fastapi import WebSocket, WebSocketDisconnect, APIRouter |
| from pydantic import BaseModel |
| import json |
| import time |
|
|
| |
# Module-level logger, named after this module so its output can be filtered.
logger = logging.getLogger(__name__)
|
|
| |
class ConnectionStatus(BaseModel):
    """Schema for the WebSocket connection status of a single user."""

    # Identifier of the user the status refers to.
    user_id: str
    # Whether the user is reported as having an active connection.
    active: bool
    # Number of connections reported for the user.
    connection_count: int
    # Timestamp as a float; optional and None when omitted.
    last_activity: Optional[float] = None
|
|
class UserConnection(BaseModel):
    """Schema pairing a user with that user's number of connections."""

    # Identifier of the user.
    user_id: str
    # Number of connections reported for the user.
    connection_count: int
|
|
class AllConnectionsStatus(BaseModel):
    """Schema summarising the connections of all users."""

    # Number of distinct users in the summary.
    total_users: int
    # Number of connections across all users.
    total_connections: int
    # One entry per user with that user's connection count.
    users: List[UserConnection]
|
|
| |
# Router for the WebSocket-related routes, tagged "WebSockets".
# NOTE(review): the routes registered on this router use paths that already
# start with "/ws/...", so with this prefix the effective URLs are
# "/ws/ws/..." - confirm that this is intended.
router = APIRouter(prefix="/ws", tags=["WebSockets"])
|
|
class ConnectionManager:
    """Registry of open WebSocket connections, grouped by user ID.

    A user can have several sockets registered at the same time; messages
    addressed to a user are delivered to every one of them.
    """

    def __init__(self):
        # Maps user_id -> list of sockets currently registered for that user.
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        """Accept the WebSocket handshake and register the socket.

        Args:
            websocket: The socket to accept and track.
            user_id: User the socket belongs to.
        """
        await websocket.accept()
        sockets = self.active_connections.setdefault(user_id, [])
        sockets.append(websocket)
        logger.info(
            "New WebSocket connection for user %s. Total connections: %d",
            user_id,
            len(sockets),
        )

    def disconnect(self, websocket: WebSocket, user_id: str):
        """Remove a socket from the registry.

        Unknown users or sockets are ignored, so the call is safe to repeat.
        The user's entry is dropped once their last socket is removed.

        Args:
            websocket: The socket to forget.
            user_id: User the socket was registered under.
        """
        sockets = self.active_connections.get(user_id)
        if sockets is None:
            return
        if websocket in sockets:
            sockets.remove(websocket)
        if not sockets:
            del self.active_connections[user_id]
            logger.info("WebSocket disconnected for user %s", user_id)

    async def send_message(self, message: Dict[str, Any], user_id: str):
        """Send a JSON-encoded message to every socket of one user.

        Delivery is best-effort: nothing is raised to the caller. Sockets
        whose send fails are removed from the registry.

        Args:
            message: JSON-serialisable payload.
            user_id: Recipient; a user without connections is a no-op.
        """
        sockets = self.active_connections.get(user_id)
        if not sockets:
            return

        # Serialise once, outside the per-socket error handling: a payload
        # that cannot be encoded is a caller error and must not be mistaken
        # for a broken socket (which would deregister every connection).
        try:
            payload = json.dumps(message)
        except (TypeError, ValueError) as exc:
            logger.error(
                "Cannot serialise WebSocket message for user %s: %s", user_id, exc
            )
            return

        failed = []
        # Iterate over a snapshot: other tasks may connect or disconnect
        # sockets for this user while we are suspended in ``await``.
        for websocket in list(sockets):
            try:
                await websocket.send_text(payload)
            except Exception as exc:
                logger.error("Error sending message to WebSocket: %s", exc)
                failed.append(websocket)

        for websocket in failed:
            self.disconnect(websocket, user_id)

    def get_connection_status(self, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Report connection status for one user or for everybody.

        Args:
            user_id: User to inspect. When falsy (``None`` or ``""``) a
                summary over all users is returned instead.

        Returns:
            For a single user: ``user_id``, ``active``, ``connection_count``
            and ``last_activity``. For the summary: ``total_users``,
            ``total_connections`` and a ``users`` list of
            ``{"user_id", "connection_count"}`` entries.
        """
        if user_id:
            active = user_id in self.active_connections
            return {
                "user_id": user_id,
                "active": active,
                "connection_count": len(self.active_connections.get(user_id, [])),
                # NOTE: activity is not tracked per user; for an active user
                # this is simply the time of this call.
                "last_activity": time.time() if active else None,
            }

        users = [
            {"user_id": uid, "connection_count": len(connections)}
            for uid, connections in self.active_connections.items()
        ]
        return {
            "total_users": len(users),
            "total_connections": sum(entry["connection_count"] for entry in users),
            "users": users,
        }
|
|
|
|
| |
# Single shared ConnectionManager instance used by the routes and helpers
# in this module.
manager = ConnectionManager()
|
|
| |
@router.get("/ws/test/{user_id}")
async def test_websocket_send(user_id: str):
    """Manually push a test message to a user's WebSocket connections.

    Intended as a debugging aid for WebSocket connectivity.

    Args:
        user_id: User who should receive the test message.

    Returns:
        A dict with ``success`` and a human-readable ``message``;
        ``success`` is False when the user has no active connection.
    """
    logger.info(f"Attempting to send test message to user: {user_id}")

    # Guard clause: nothing to do when the user has no open socket.
    if not manager.get_connection_status(user_id)["active"]:
        logger.warning(f"No active WebSocket connection for user: {user_id}")
        return {"success": False, "message": f"No active WebSocket connection for user: {user_id}"}

    payload = {
        "type": "test_message",
        "message": "This is a test WebSocket message",
        "timestamp": int(time.time()),
    }
    await manager.send_message(payload, user_id)

    logger.info(f"Test message sent to user: {user_id}")
    return {"success": True, "message": f"Test message sent to user: {user_id}"}
|
|
@router.websocket("/ws/pdf/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    """WebSocket endpoint used to push PDF-processing progress updates.

    After the handshake a ``connection_established`` message is sent. The
    handler then keeps reading text frames: ``"heartbeat"`` frames are
    ignored, and any other frame is answered with an ``echo`` message sent
    through the manager, i.e. to the connections registered for ``user_id``.

    Args:
        websocket: The incoming WebSocket connection.
        user_id: Identifier the connection is registered under.
    """
    # Local import: the file-level fastapi import does not include
    # WebSocketState.
    from fastapi.websockets import WebSocketState

    logger.info(f"WebSocket connection request received for user: {user_id}")

    try:
        await manager.connect(websocket, user_id)
        logger.info(f"WebSocket connection accepted for user: {user_id}")

        await manager.send_message({
            "type": "connection_established",
            "message": "WebSocket connection established successfully",
            "user_id": user_id,
            "timestamp": int(time.time())
        }, user_id)

        try:
            while True:
                data = await websocket.receive_text()
                logger.debug(f"Received from client: {data}")

                # Heartbeats only keep the connection alive; do not echo them.
                if data != "heartbeat":
                    await manager.send_message({
                        "type": "echo",
                        "message": f"Received: {data}",
                        "timestamp": int(time.time())
                    }, user_id)
        except WebSocketDisconnect:
            logger.info(f"WebSocket disconnected for user: {user_id}")
        except Exception as e:
            logger.error(f"WebSocket error: {str(e)}")
        finally:
            # Always deregister, including when the task is cancelled, so
            # no stale socket is left in the manager.
            manager.disconnect(websocket, user_id)
    except Exception as e:
        logger.error(f"Failed to establish WebSocket connection: {str(e)}")
        manager.disconnect(websocket, user_id)

        # client_state/application_state are WebSocketState enum members, so
        # they must be compared with the enum (a comparison with a bare int
        # is always unequal). Only close a socket that is not closed yet.
        closed = WebSocketState.DISCONNECTED
        if websocket.client_state != closed and websocket.application_state != closed:
            # A close frame carries at most 123 bytes of reason text.
            reason = f"Server error: {e}".encode("utf-8")[:123].decode("utf-8", "ignore")
            await websocket.close(code=1011, reason=reason)
|
|
| import logging |
| from typing import Dict, List, Optional, Any |
| from fastapi import WebSocket, WebSocketDisconnect, APIRouter |
| from pydantic import BaseModel |
| import json |
| import time |
|
|
| |
# Module-level logger, named after this module so its output can be filtered.
logger = logging.getLogger(__name__)
|
|
| |
class ConnectionStatus(BaseModel):
    """Response schema for the WebSocket connection status of one user."""

    # Identifier of the user the status refers to.
    user_id: str
    # Whether the user is reported as having an active connection.
    active: bool
    # Number of connections reported for the user.
    connection_count: int
    # Timestamp as a float; optional and None when omitted.
    last_activity: Optional[float] = None
|
|
class UserConnection(BaseModel):
    """Schema pairing a user with that user's number of connections."""

    # Identifier of the user.
    user_id: str
    # Number of connections reported for the user.
    connection_count: int
|
|
class AllConnectionsStatus(BaseModel):
    """Response schema summarising the connections of all users."""

    # Number of distinct users in the summary.
    total_users: int
    # Number of connections across all users.
    total_connections: int
    # One entry per user with that user's connection count.
    users: List[UserConnection]
|
|
| |
# Router for the WebSocket-related routes, tagged "WebSockets". No prefix is
# set here: each route declares its complete "/ws/..." path itself.
router = APIRouter(prefix="", tags=["WebSockets"])
|
|
class ConnectionManager:
    """Registry of open WebSocket connections, grouped by user ID.

    A user can have several sockets registered at the same time; messages
    addressed to a user are delivered to every one of them.
    """

    def __init__(self):
        # Maps user_id -> list of sockets currently registered for that user.
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        """Accept the WebSocket handshake and register the socket.

        Args:
            websocket: The socket to accept and track.
            user_id: User the socket belongs to.
        """
        await websocket.accept()
        sockets = self.active_connections.setdefault(user_id, [])
        sockets.append(websocket)
        logger.info(
            "New WebSocket connection for user %s. Total connections: %d",
            user_id,
            len(sockets),
        )

    def disconnect(self, websocket: WebSocket, user_id: str):
        """Remove a socket from the registry.

        Unknown users or sockets are ignored, so the call is safe to repeat.
        The user's entry is dropped once their last socket is removed.

        Args:
            websocket: The socket to forget.
            user_id: User the socket was registered under.
        """
        sockets = self.active_connections.get(user_id)
        if sockets is None:
            return
        if websocket in sockets:
            sockets.remove(websocket)
        if not sockets:
            del self.active_connections[user_id]
            logger.info("WebSocket disconnected for user %s", user_id)

    async def send_message(self, message: Dict[str, Any], user_id: str):
        """Send a JSON-encoded message to every socket of one user.

        Delivery is best-effort: nothing is raised to the caller. Sockets
        whose send fails are removed from the registry.

        Args:
            message: JSON-serialisable payload.
            user_id: Recipient; a user without connections is a no-op.
        """
        sockets = self.active_connections.get(user_id)
        if not sockets:
            return

        # Serialise once, outside the per-socket error handling: a payload
        # that cannot be encoded is a caller error and must not be mistaken
        # for a broken socket (which would deregister every connection).
        try:
            payload = json.dumps(message)
        except (TypeError, ValueError) as exc:
            logger.error(
                "Cannot serialise WebSocket message for user %s: %s", user_id, exc
            )
            return

        failed = []
        # Iterate over a snapshot: other tasks may connect or disconnect
        # sockets for this user while we are suspended in ``await``.
        for websocket in list(sockets):
            try:
                await websocket.send_text(payload)
            except Exception as exc:
                logger.error("Error sending message to WebSocket: %s", exc)
                failed.append(websocket)

        for websocket in failed:
            self.disconnect(websocket, user_id)

    def get_connection_status(self, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Report connection status for one user or for everybody.

        Args:
            user_id: User to inspect. When falsy (``None`` or ``""``) a
                summary over all users is returned instead.

        Returns:
            For a single user: ``user_id``, ``active``, ``connection_count``
            and ``last_activity``. For the summary: ``total_users``,
            ``total_connections`` and a ``users`` list of
            ``{"user_id", "connection_count"}`` entries.
        """
        if user_id:
            active = user_id in self.active_connections
            return {
                "user_id": user_id,
                "active": active,
                "connection_count": len(self.active_connections.get(user_id, [])),
                # NOTE: activity is not tracked per user; for an active user
                # this is simply the time of this call.
                "last_activity": time.time() if active else None,
            }

        users = [
            {"user_id": uid, "connection_count": len(connections)}
            for uid, connections in self.active_connections.items()
        ]
        return {
            "total_users": len(users),
            "total_connections": sum(entry["connection_count"] for entry in users),
            "users": users,
        }
|
|
|
|
| |
# Single shared ConnectionManager instance used by the routes and the
# notification helpers in this module.
manager = ConnectionManager()
|
|
@router.websocket("/ws/pdf/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    """WebSocket endpoint used to push PDF-processing progress updates.

    The connection is registered with the manager and kept open until the
    client disconnects or an error occurs. Incoming text frames are read
    and discarded.

    Args:
        websocket: The incoming WebSocket connection.
        user_id: Identifier the connection is registered under.
    """
    await manager.connect(websocket, user_id)
    try:
        while True:
            # The received text is not used; the read keeps the handler
            # alive and surfaces the disconnect.
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass  # Normal client disconnect; cleanup happens in ``finally``.
    except Exception as e:
        logger.error(f"WebSocket error: {str(e)}")
    finally:
        # Deregister on every exit path, including task cancellation, so no
        # stale socket is left in the manager.
        manager.disconnect(websocket, user_id)
|
|
| |
# OpenAPI documentation for the 200 response of the "/ws/status" route.
_ALL_CONNECTIONS_RESPONSES = {
    200: {
        "description": "Successful response",
        "content": {
            "application/json": {
                "example": {
                    "total_users": 2,
                    "total_connections": 3,
                    "users": [
                        {"user_id": "user1", "connection_count": 2},
                        {"user_id": "user2", "connection_count": 1},
                    ],
                },
            },
        },
    },
}


@router.get(
    "/ws/status",
    response_model=AllConnectionsStatus,
    responses=_ALL_CONNECTIONS_RESPONSES,
)
async def get_all_websocket_connections():
    """
    Get information about all current WebSocket connections.

    This endpoint returns:
    - The total number of connected users
    - The total number of WebSocket connections
    - The list of users together with each user's connection count
    """
    summary = manager.get_connection_status()
    return summary
|
|
# OpenAPI documentation for the 200 response of "/ws/status/{user_id}".
_USER_STATUS_EXAMPLES = {
    "active_connection": {
        "summary": "Active connection",
        "value": {
            "user_id": "user123",
            "active": True,
            "connection_count": 2,
            "last_activity": 1634567890.123,
        },
    },
    "no_connection": {
        "summary": "No active connection",
        "value": {
            "user_id": "user456",
            "active": False,
            "connection_count": 0,
            "last_activity": None,
        },
    },
}
_USER_STATUS_RESPONSES = {
    200: {
        "description": "Successful response for active connection",
        "content": {"application/json": {"examples": _USER_STATUS_EXAMPLES}},
    },
}


@router.get(
    "/ws/status/{user_id}",
    response_model=ConnectionStatus,
    responses=_USER_STATUS_RESPONSES,
)
async def get_user_websocket_status(user_id: str):
    """
    Get information about the WebSocket connection of a specific user.

    Parameters:
    - **user_id**: ID of the user to check

    Returns:
    - Information about the connection status, including:
      - active: Whether the user is currently connected
      - connection_count: Current number of connections
      - last_activity: Time of the most recent activity
    """
    user_status = manager.get_connection_status(user_id)
    return user_status
|
|
| |
|
|
async def send_pdf_upload_started(user_id: str, filename: str, document_id: str):
    """Notify a user that a PDF upload has started.

    Args:
        user_id: User whose connections receive the event.
        filename: Name of the file being uploaded.
        document_id: Identifier of the document.
    """
    event = {
        "type": "pdf_upload_started",
        "document_id": document_id,
        "filename": filename,
        "timestamp": int(time.time()),
    }
    await manager.send_message(event, user_id)
|
|
async def send_pdf_upload_progress(user_id: str, document_id: str, step: str, progress: float, message: str):
    """Notify a user about the progress of a PDF upload.

    Args:
        user_id: User whose connections receive the event.
        document_id: Identifier of the document.
        step: Name of the current processing step.
        progress: Progress value forwarded as-is to the client.
        message: Human-readable progress message.
    """
    event = {
        "type": "pdf_upload_progress",
        "document_id": document_id,
        "step": step,
        "progress": progress,
        "message": message,
        "timestamp": int(time.time()),
    }
    await manager.send_message(event, user_id)
|
|
async def send_pdf_upload_completed(user_id: str, document_id: str, filename: str, chunks: int):
    """Notify a user that a PDF upload has completed.

    Args:
        user_id: User whose connections receive the event.
        document_id: Identifier of the document.
        filename: Name of the uploaded file.
        chunks: Chunk count reported to the client.
    """
    event = {
        "type": "pdf_upload_completed",
        "document_id": document_id,
        "filename": filename,
        "chunks": chunks,
        "timestamp": int(time.time()),
    }
    await manager.send_message(event, user_id)
|
|
async def send_pdf_upload_failed(user_id: str, document_id: str, filename: str, error: str):
    """Notify a user that a PDF upload has failed.

    Args:
        user_id: User whose connections receive the event.
        document_id: Identifier of the document.
        filename: Name of the file whose upload failed.
        error: Error description forwarded to the client.
    """
    event = {
        "type": "pdf_upload_failed",
        "document_id": document_id,
        "filename": filename,
        "error": error,
        "timestamp": int(time.time()),
    }
    await manager.send_message(event, user_id)
|
|
async def send_pdf_delete_started(user_id: str, namespace: str):
    """Notify a user that a PDF deletion has started.

    Args:
        user_id: User whose connections receive the event.
        namespace: Namespace the deletion applies to.
    """
    event = {
        "type": "pdf_delete_started",
        "namespace": namespace,
        "timestamp": int(time.time()),
    }
    await manager.send_message(event, user_id)
|
|
async def send_pdf_delete_completed(user_id: str, namespace: str, deleted_count: int = 0):
    """Notify a user that a PDF deletion has completed.

    Args:
        user_id: User whose connections receive the event.
        namespace: Namespace the deletion applied to.
        deleted_count: Number of deleted items reported to the client
            (defaults to 0).
    """
    event = {
        "type": "pdf_delete_completed",
        "namespace": namespace,
        "deleted_count": deleted_count,
        "timestamp": int(time.time()),
    }
    await manager.send_message(event, user_id)
|
|
async def send_pdf_delete_failed(user_id: str, namespace: str, error: str):
    """Notify a user that a PDF deletion has failed.

    Args:
        user_id: User whose connections receive the event.
        namespace: Namespace the deletion applied to.
        error: Error description forwarded to the client.
    """
    event = {
        "type": "pdf_delete_failed",
        "namespace": namespace,
        "error": error,
        "timestamp": int(time.time()),
    }
    await manager.send_message(event, user_id)