| """ |
| WebSocket API for Integration Services |
| |
| This module provides WebSocket endpoints for integration services |
| including HuggingFace AI models and persistence operations. |
| """ |
|
|
| import asyncio |
| from datetime import datetime |
| from typing import Any, Dict |
| from fastapi import APIRouter, WebSocket, WebSocketDisconnect |
| import logging |
|
|
| from backend.services.ws_service_manager import ws_manager, ServiceType |
| from backend.services.hf_registry import HFRegistry |
| from backend.services.hf_client import HFClient |
| from backend.services.persistence_service import PersistenceService |
| from config import Config |
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


# Router on which the WebSocket endpoints defined in this module are registered.
router = APIRouter()
|
|
|
|
| |
| |
| |
|
|
class IntegrationStreamers:
    """Handles data streaming for integration services.

    Each ``stream_*`` coroutine polls one backing service (HuggingFace
    registry, HuggingFace client or persistence service) and returns a dict
    stamped with the current UTC time. ``None`` is returned when the service
    is unavailable, reports nothing, or raises.
    """

    def __init__(self):
        """Create the backing services, tolerating ones that fail to start.

        A service whose constructor raises is stored as ``None`` so that the
        matching ``stream_*`` methods simply return ``None``.
        """
        self.config = Config()
        self.hf_registry = self._create_service(HFRegistry, "HFRegistry")
        self.hf_client = self._create_service(HFClient, "HFClient")
        self.persistence_service = self._create_service(
            PersistenceService, "PersistenceService"
        )

    @staticmethod
    def _create_service(factory, label):
        """Instantiate ``factory``; return ``None`` and log if it raises.

        Only ``Exception`` is caught so that ``KeyboardInterrupt`` and
        ``SystemExit`` are not swallowed during start-up, and the failure
        reason is included in the warning instead of being discarded.
        """
        try:
            return factory()
        except Exception as e:
            logger.warning("%s not available: %s", label, e)
            return None

    @staticmethod
    def _utc_timestamp():
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as ``datetime.utcnow().isoformat()`` (no
        UTC offset suffix) without calling the deprecated ``utcnow()``.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    async def stream_hf_registry_status(self):
        """Stream HuggingFace registry status.

        Returns:
            A dict with the model/dataset counts and lists, the last refresh
            value and a timestamp, or ``None`` if the registry is
            unavailable, returns a falsy status, or raises.
        """
        if not self.hf_registry:
            return None

        try:
            status = self.hf_registry.get_status()
            if not status:
                return None
            return {
                "total_models": status.get("total_models", 0),
                "total_datasets": status.get("total_datasets", 0),
                "available_models": status.get("available_models", []),
                "available_datasets": status.get("available_datasets", []),
                "last_refresh": status.get("last_refresh"),
                "timestamp": self._utc_timestamp(),
            }
        except Exception as e:
            logger.error(f"Error streaming HF registry status: {e}")
            return None

    async def stream_hf_model_usage(self):
        """Stream HuggingFace model usage statistics.

        Returns:
            A dict with request counters, average latency, per-model usage
            and a timestamp, or ``None`` if the client is unavailable,
            returns falsy usage data, or raises.
        """
        if not self.hf_client:
            return None

        try:
            usage = self.hf_client.get_usage_stats()
            if not usage:
                return None
            return {
                "total_requests": usage.get("total_requests", 0),
                "successful_requests": usage.get("successful_requests", 0),
                "failed_requests": usage.get("failed_requests", 0),
                "average_latency": usage.get("average_latency"),
                "model_usage": usage.get("model_usage", {}),
                "timestamp": self._utc_timestamp(),
            }
        except Exception as e:
            logger.error(f"Error streaming HF model usage: {e}")
            return None

    async def stream_sentiment_results(self):
        """Stream real-time sentiment analysis results.

        Returns:
            ``{"sentiment_results": ..., "timestamp": ...}`` built from
            ``hf_client.get_recent_results()``, or ``None`` if the client is
            unavailable, has no results, or raises.
        """
        if not self.hf_client:
            return None

        try:
            results = self.hf_client.get_recent_results()
            if not results:
                return None
            return {
                "sentiment_results": results,
                "timestamp": self._utc_timestamp(),
            }
        except Exception as e:
            logger.error(f"Error streaming sentiment results: {e}")
            return None

    async def stream_model_events(self):
        """Stream model loading and unloading events.

        Returns:
            ``{"model_events": ..., "timestamp": ...}`` built from
            ``hf_registry.get_recent_events()``, or ``None`` if the registry
            is unavailable, has no events, or raises.
        """
        if not self.hf_registry:
            return None

        try:
            events = self.hf_registry.get_recent_events()
            if not events:
                return None
            return {
                "model_events": events,
                "timestamp": self._utc_timestamp(),
            }
        except Exception as e:
            logger.error(f"Error streaming model events: {e}")
            return None

    async def stream_persistence_status(self):
        """Stream persistence service status.

        Returns:
            A dict with storage location, record count, storage size, last
            save, active writer count and a timestamp, or ``None`` if the
            service is unavailable, returns a falsy status, or raises.
        """
        if not self.persistence_service:
            return None

        try:
            status = self.persistence_service.get_status()
            if not status:
                return None
            return {
                "storage_location": status.get("storage_location"),
                "total_records": status.get("total_records", 0),
                "storage_size": status.get("storage_size"),
                "last_save": status.get("last_save"),
                "active_writers": status.get("active_writers", 0),
                "timestamp": self._utc_timestamp(),
            }
        except Exception as e:
            logger.error(f"Error streaming persistence status: {e}")
            return None

    async def stream_save_events(self):
        """Stream data save events.

        Returns:
            ``{"save_events": ..., "timestamp": ...}`` built from
            ``persistence_service.get_recent_saves()``, or ``None`` if the
            service is unavailable, has no events, or raises.
        """
        if not self.persistence_service:
            return None

        try:
            events = self.persistence_service.get_recent_saves()
            if not events:
                return None
            return {
                "save_events": events,
                "timestamp": self._utc_timestamp(),
            }
        except Exception as e:
            logger.error(f"Error streaming save events: {e}")
            return None

    async def stream_export_progress(self):
        """Stream export operation progress.

        Returns:
            ``{"export_operations": ..., "timestamp": ...}`` built from
            ``persistence_service.get_export_progress()``, or ``None`` if
            the service is unavailable, reports nothing, or raises.
        """
        if not self.persistence_service:
            return None

        try:
            progress = self.persistence_service.get_export_progress()
            if not progress:
                return None
            return {
                "export_operations": progress,
                "timestamp": self._utc_timestamp(),
            }
        except Exception as e:
            logger.error(f"Error streaming export progress: {e}")
            return None

    async def stream_backup_events(self):
        """Stream backup creation events.

        Returns:
            ``{"backup_events": ..., "timestamp": ...}`` built from
            ``persistence_service.get_recent_backups()``, or ``None`` if the
            service is unavailable, has no backups, or raises.
        """
        if not self.persistence_service:
            return None

        try:
            backups = self.persistence_service.get_recent_backups()
            if not backups:
                return None
            return {
                "backup_events": backups,
                "timestamp": self._utc_timestamp(),
            }
        except Exception as e:
            logger.error(f"Error streaming backup events: {e}")
            return None
|
|
|
|
| |
# Shared instance created at import time; start_integration_streams() passes
# its stream_* methods to ws_manager.
integration_streamers = IntegrationStreamers()
|
|
|
|
| |
| |
| |
|
|
async def start_integration_streams():
    """Start all integration stream tasks and wait until they have ended.

    One task is created per service via ``ws_manager.start_service_stream``.
    Each task gets a done-callback that logs a failure as soon as the task
    ends with an exception. ``gather(..., return_exceptions=True)`` would
    otherwise swallow it, and would only return once every stream is done.
    """
    logger.info("Starting integration WebSocket streams")

    # (service, payload producer, interval). The interval is forwarded
    # unchanged to ws_manager; presumably seconds, confirm against
    # start_service_stream.
    stream_specs = [
        (
            ServiceType.HUGGINGFACE,
            integration_streamers.stream_hf_registry_status,
            60.0,
        ),
        (
            ServiceType.PERSISTENCE,
            integration_streamers.stream_persistence_status,
            30.0,
        ),
    ]

    tasks = []
    for service_type, producer, interval in stream_specs:
        task = asyncio.create_task(
            ws_manager.start_service_stream(
                service_type,
                producer,
                interval=interval,
            )
        )
        # Bind service_type as a default so each callback keeps its own value.
        task.add_done_callback(
            lambda finished, service_type=service_type: _log_stream_exit(
                service_type, finished
            )
        )
        tasks.append(task)

    await asyncio.gather(*tasks, return_exceptions=True)


def _log_stream_exit(service_type, task):
    """Log an integration stream task that finished with an exception."""
    if task.cancelled():
        # Cancellation is a normal shutdown path; nothing to report.
        return
    error = task.exception()
    if error is not None:
        logger.error(f"Integration stream for {service_type} failed: {error}")
|
|
|
|
| |
| |
| |
|
|
@router.websocket("/ws/integration")
async def websocket_integration_endpoint(websocket: WebSocket):
    """
    Single WebSocket entry point covering every integration service.

    Connect to: ws://host:port/ws/integration

    Once connected, the client selects its feeds by sending:
        {
            "action": "subscribe",
            "service": "huggingface" | "persistence" | "all"
        }

    and drops a feed with:
        {
            "action": "unsubscribe",
            "service": "service_name"
        }

    Every received JSON message is handed to
    ``ws_manager.handle_client_message``. When the loop ends for any reason,
    ``ws_manager.disconnect`` is called for this client.
    """
    connection = await ws_manager.connect(websocket)

    async def relay_client_messages():
        # Runs until receive_json() or the message handler raises.
        while True:
            incoming = await websocket.receive_json()
            await ws_manager.handle_client_message(connection, incoming)

    try:
        await relay_client_messages()
    except WebSocketDisconnect:
        logger.info(f"Integration client disconnected: {connection.client_id}")
    except Exception as e:
        logger.error(f"Integration WebSocket error: {e}")
    finally:
        await ws_manager.disconnect(connection.client_id)
|
|
|
|
@router.websocket("/ws/huggingface")
async def websocket_huggingface(websocket: WebSocket):
    """
    Dedicated WebSocket endpoint for HuggingFace services.

    Connects through ``ws_manager.connect``, auto-subscribes the connection
    to ``ServiceType.HUGGINGFACE``, then passes every JSON message from the
    client to ``ws_manager.handle_client_message`` until the client
    disconnects or an error occurs. ``ws_manager.disconnect`` is always
    called on the way out.
    """
    connection = await ws_manager.connect(websocket)

    try:
        # Subscribe inside the try block: if subscribing fails, the finally
        # clause still runs and the connection is not left behind in
        # ws_manager.
        connection.subscribe(ServiceType.HUGGINGFACE)

        while True:
            data = await websocket.receive_json()
            await ws_manager.handle_client_message(connection, data)
    except WebSocketDisconnect:
        logger.info(f"HuggingFace client disconnected: {connection.client_id}")
    except Exception as e:
        logger.error(f"HuggingFace WebSocket error: {e}")
    finally:
        await ws_manager.disconnect(connection.client_id)
|
|
|
|
@router.websocket("/ws/persistence")
async def websocket_persistence(websocket: WebSocket):
    """
    Dedicated WebSocket endpoint for the persistence service.

    Connects through ``ws_manager.connect``, auto-subscribes the connection
    to ``ServiceType.PERSISTENCE``, then passes every JSON message from the
    client to ``ws_manager.handle_client_message`` until the client
    disconnects or an error occurs. ``ws_manager.disconnect`` is always
    called on the way out.
    """
    connection = await ws_manager.connect(websocket)

    try:
        # Subscribe inside the try block: if subscribing fails, the finally
        # clause still runs and the connection is not left behind in
        # ws_manager.
        connection.subscribe(ServiceType.PERSISTENCE)

        while True:
            data = await websocket.receive_json()
            await ws_manager.handle_client_message(connection, data)
    except WebSocketDisconnect:
        logger.info(f"Persistence client disconnected: {connection.client_id}")
    except Exception as e:
        logger.error(f"Persistence WebSocket error: {e}")
    finally:
        await ws_manager.disconnect(connection.client_id)
|
|
|
|
@router.websocket("/ws/ai")
async def websocket_ai(websocket: WebSocket):
    """
    Dedicated WebSocket endpoint for AI/ML operations (alias for HuggingFace).

    Connects through ``ws_manager.connect``, auto-subscribes the connection
    to ``ServiceType.HUGGINGFACE``, then passes every JSON message from the
    client to ``ws_manager.handle_client_message`` until the client
    disconnects or an error occurs. ``ws_manager.disconnect`` is always
    called on the way out.
    """
    connection = await ws_manager.connect(websocket)

    try:
        # Subscribe inside the try block: if subscribing fails, the finally
        # clause still runs and the connection is not left behind in
        # ws_manager.
        connection.subscribe(ServiceType.HUGGINGFACE)

        while True:
            data = await websocket.receive_json()
            await ws_manager.handle_client_message(connection, data)
    except WebSocketDisconnect:
        logger.info(f"AI client disconnected: {connection.client_id}")
    except Exception as e:
        logger.error(f"AI WebSocket error: {e}")
    finally:
        await ws_manager.disconnect(connection.client_id)
|
|