Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| β K1RL QUASAR β CENTRAL WEBSOCKET HUB v2.2-ranker-logs β | |
| β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β | |
| β β | |
| β Architecture role: INGEST β NORMALIZE β BROADCAST β | |
| β β | |
| β β’ Accepts publisher connections from Asset Spaces (/ws/publish/{space_name}) β | |
| β β’ Accepts subscriber connections from Ranker Space (/ws/subscribe) β | |
| β β’ ONE-WAY: Publisher β Hub β Subscriber β | |
| β β’ Hub NEVER writes back to publishers β | |
| β β’ Hub stores latest snapshot per asset (NO history) β | |
| β β | |
| β RANKER LOGS API (FIX v2.2 β moved here from hub_dashboard_service port 8052): β | |
| β GET /api/ranker/logs/recent β recent log entries (?limit=N&category=X) β | |
| β GET /api/ranker/logs/stats β log statistics β | |
| β GET /api/ranker/logs/asset/X β logs for asset X β | |
| β GET /api/ranker/logs/level/X β logs by level β | |
| β GET /api/ranker/logs/export β download JSON β | |
| β GET /api/ranker/logs/debug β file discovery diagnostics β | |
| β β | |
| β TRADE API (served natively β no patch script needed): β | |
| β GET /api/trades β full open + closed state + stats β | |
| β GET /api/trades/open β open trades only β | |
| β GET /api/trades/closed β recent closed trades + stats (?limit=N) β | |
| β GET /api/health β service health including trade counts β | |
| β β | |
| β VERSION: v2.2-ranker-logs | 2026-04-04 β | |
| ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| """ | |
import asyncio
import copy
import glob
import json
import logging
import os
import re
import threading
import time
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set

import uvicorn
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
# βββ Logging ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Process-wide root logging config; every logger in this file inherits the format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Module-level logger used by all sections below.
logger = logging.getLogger("QuasarHub")
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 1 β STRICT DATA MODEL | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Canonical field whitelists for the strict data model.
# NOTE(review): neither frozenset is referenced anywhere in this part of the
# file (_validate_and_normalize hard-codes the same field names) β confirm they
# are used further down or consider retiring them.
_ALLOWED_TRAINING_FIELDS: frozenset = frozenset({
    "training_steps",
    "actor_loss",
    "critic_loss",
    "avn_loss",
    "avn_accuracy",
})
_ALLOWED_VOTING_FIELDS: frozenset = frozenset({
    "dominant_signal",
    "buy_count",
    "sell_count",
    "last_price",
    "signal_source",
})
| def _empty_snapshot(space_name: str) -> dict: | |
| return { | |
| "space_name": space_name, | |
| "last_updated": 0.0, | |
| "training": { | |
| "training_steps": 0, | |
| "actor_loss": 0.0, | |
| "critic_loss": 0.0, | |
| "avn_loss": 0.0, | |
| "avn_accuracy": 0.0, | |
| }, | |
| "voting": { | |
| "dominant_signal": "NEUTRAL", | |
| "buy_count": 0, | |
| "sell_count": 0, | |
| "last_price": 0.0, | |
| "signal_source": "LOG", | |
| }, | |
| } | |
| def _validate_and_normalize(space_name: str, raw: dict) -> Optional[dict]: | |
| training_raw = raw.get("training", {}) | |
| voting_raw = raw.get("voting", {}) | |
| if not isinstance(training_raw, dict): | |
| training_raw = {} | |
| if not isinstance(voting_raw, dict): | |
| voting_raw = {} | |
| if not training_raw and not voting_raw: | |
| return None | |
| def _float(v, default: float = 0.0) -> float: | |
| try: return float(v) | |
| except: return default | |
| def _int(v, default: int = 0) -> int: | |
| try: return int(v) | |
| except: return default | |
| training: dict = {} | |
| if training_raw: | |
| training = { | |
| "training_steps": _int(training_raw.get("training_steps", 0)), | |
| "actor_loss": _float(training_raw.get("actor_loss", 0.0)), | |
| "critic_loss": _float(training_raw.get("critic_loss", 0.0)), | |
| "avn_loss": _float(training_raw.get("avn_loss", 0.0)), | |
| "avn_accuracy": max(0.0, min(1.0, _float(training_raw.get("avn_accuracy", 0.0)))), | |
| } | |
| voting: dict = {} | |
| if voting_raw: | |
| raw_signal = voting_raw.get("dominant_signal", "NEUTRAL") | |
| if not isinstance(raw_signal, str): | |
| raw_signal = "NEUTRAL" | |
| raw_source = voting_raw.get("signal_source", "LOG") | |
| if not isinstance(raw_source, str): | |
| raw_source = "LOG" | |
| voting = { | |
| "dominant_signal": raw_signal.upper() if raw_signal.upper() in {"BUY", "SELL", "NEUTRAL"} else "NEUTRAL", | |
| "buy_count": _int(voting_raw.get("buy_count", 0)), | |
| "sell_count": _int(voting_raw.get("sell_count", 0)), | |
| "last_price": _float(voting_raw.get("last_price", 0.0)), | |
| "signal_source": raw_source, | |
| } | |
| return { | |
| "space_name": space_name, | |
| "training": training, | |
| "voting": voting, | |
| } | |
# Max points retained per space for sparkline history (env-tunable, default 200).
_METRIC_HISTORY_LEN: int = int(os.environ.get("QUASAR_METRIC_HISTORY", "200"))
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 2 β CONNECTION MANAGER | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class ConnectionManager:
    """
    Tracks publisher/subscriber sockets and the latest snapshot per space.

    Concurrency model: all methods run on the single asyncio event loop;
    ``self._lock`` guards multi-step snapshot mutations.  ``asyncio.Lock`` is
    NOT reentrant, so ``_broadcast_update`` (which may re-acquire the lock to
    prune dead subscribers) must never be awaited while the lock is held β
    ``ingest`` deep-copies the snapshot inside the lock and broadcasts only
    after releasing it.
    """
    # All training field names the hub will accept (including common ranker aliases)
    _TRAINING_KEYS: frozenset = frozenset({
        "actor_loss", "critic_loss", "avn_loss", "avn_accuracy", "training_steps",
        "a_loss", "c_loss", "loss_actor", "loss_critic", "loss_avn",
        "acc", "accuracy", "step", "steps", "n_steps",
    })
    # Maps ranker alias β canonical training field name.
    _TRAINING_ALIAS: dict = {
        "a_loss": "actor_loss", "loss_actor": "actor_loss",
        "c_loss": "critic_loss", "loss_critic": "critic_loss",
        "loss_avn": "avn_loss",
        "acc": "avn_accuracy", "accuracy": "avn_accuracy",
        "step": "training_steps", "steps": "training_steps", "n_steps": "training_steps",
    }
    _VOTING_KEYS: frozenset = frozenset({
        "dominant_signal", "buy_count", "sell_count", "last_price", "signal_source",
        "signal", "buy", "sell",
    })

    def __init__(self):
        # String annotations keep the class importable without fastapi installed.
        self._publishers: Dict[str, "WebSocket"] = {}
        self._subscribers: Set["WebSocket"] = set()
        self._snapshots: Dict[str, dict] = {}
        self._history: Dict[str, deque] = {}  # rolling per-space metric history
        self._lock = asyncio.Lock()
        self._total_ingested: int = 0
        self._msg_counts: Dict[str, Dict[str, int]] = {}  # {space: {msg_type: count}}

    async def register_publisher(self, space_name: str, ws: "WebSocket") -> None:
        """Accept a publisher socket and ensure a snapshot slot exists for it."""
        await ws.accept()
        async with self._lock:
            self._publishers[space_name] = ws
            if space_name not in self._snapshots:
                self._snapshots[space_name] = _empty_snapshot(space_name)
        logger.info(f"π‘ Publisher connected: {space_name} (total={len(self._publishers)})")

    async def unregister_publisher(self, space_name: str) -> None:
        """Drop a publisher socket; its last snapshot is intentionally retained."""
        async with self._lock:
            self._publishers.pop(space_name, None)
        logger.info(f"π‘ Publisher disconnected: {space_name}")

    async def register_subscriber(self, ws: "WebSocket") -> None:
        """Accept a subscriber socket that will receive metrics_update broadcasts."""
        await ws.accept()
        async with self._lock:
            self._subscribers.add(ws)
        logger.info(f"π Subscriber connected (total={len(self._subscribers)})")

    async def unregister_subscriber(self, ws: "WebSocket") -> None:
        """Remove a subscriber socket (idempotent via set.discard)."""
        async with self._lock:
            self._subscribers.discard(ws)
        logger.info(f"π Subscriber disconnected (total={len(self._subscribers)})")

    async def ingest(self, space_name: str, raw_payload: dict) -> None:
        """Validate a payload, merge it into the snapshot, record history, broadcast."""
        normalized = _validate_and_normalize(space_name, raw_payload)
        if normalized is None:
            logger.debug(f"[{space_name}] Payload dropped (no valid fields)")
            return
        async with self._lock:
            snap = self._snapshots.setdefault(space_name, _empty_snapshot(space_name))
            snap["last_updated"] = time.time()
            if normalized["training"]:
                snap["training"].update(normalized["training"])
            if normalized["voting"]:
                snap["voting"].update(normalized["voting"])
            self._total_ingested += 1
            # Deep copy so the broadcast (after lock release) cannot race later mutations.
            snap_copy = copy.deepcopy(snap)
            # ββ Rolling metric history (for sparkline charts in dashboard) ββββββββ
            # Only record a point when training fields arrive AND at least one
            # loss/accuracy field is non-zero (avoids flooding history with empty
            # default-value points before training metrics connect).
            training = snap["training"]
            if normalized["training"] and any(
                training.get(k, 0) != 0
                for k in ("actor_loss", "critic_loss", "avn_loss", "avn_accuracy")
            ):
                if space_name not in self._history:
                    self._history[space_name] = deque(maxlen=_METRIC_HISTORY_LEN)
                self._history[space_name].append({
                    "ts": snap["last_updated"],
                    "actor_loss": training.get("actor_loss", 0.0),
                    "critic_loss": training.get("critic_loss", 0.0),
                    "avn_loss": training.get("avn_loss", 0.0),
                    "avn_accuracy": training.get("avn_accuracy", 0.0),
                    "training_steps": training.get("training_steps", 0),
                })
        # FIX: broadcast strictly AFTER releasing the lock β _broadcast_update may
        # re-acquire self._lock to prune dead subscribers, and asyncio.Lock is not
        # reentrant (awaiting it while held would deadlock the event loop).
        await self._broadcast_update(space_name, snap_copy)

    async def _broadcast_update(self, space_name: str, snapshot: dict) -> None:
        """Fan a metrics_update message out to all subscribers; prune dead sockets."""
        if not self._subscribers:
            return
        message = json.dumps({
            "type": "metrics_update",
            "space_name": space_name,
            "snapshot": snapshot,
            "hub_timestamp": time.time(),
        })
        dead: list = []
        for ws in list(self._subscribers):
            try:
                await ws.send_text(message)
            except Exception:
                dead.append(ws)
        if dead:
            async with self._lock:
                for ws in dead:
                    self._subscribers.discard(ws)

    async def send_initial_state(self, ws: "WebSocket") -> None:
        """Send the full snapshot map to a newly connected subscriber."""
        async with self._lock:
            snapshots_copy = dict(self._snapshots)
        message = json.dumps({
            "type": "initial_state",
            "snapshots": snapshots_copy,
            "hub_timestamp": time.time(),
        })
        await ws.send_text(message)

    def get_snapshot(self, space_name: str) -> Optional[dict]:
        """Latest snapshot for one space, or None if the space is unknown."""
        return self._snapshots.get(space_name)

    def get_all_snapshots(self) -> dict:
        """Shallow copy of the snapshot map (inner dicts are still shared)."""
        return dict(self._snapshots)

    def record_msg(self, space_name: str, msg_type: str) -> None:
        """Increment per-space message type counter (non-blocking, called from publisher loop)."""
        counts = self._msg_counts.setdefault(space_name, {})
        counts[msg_type] = counts.get(msg_type, 0) + 1

    def get_msg_counts(self) -> dict:
        """Per-space message-type counters as plain nested dicts."""
        return {s: dict(c) for s, c in self._msg_counts.items()}

    def get_metric_history(self) -> dict:
        """Return a plain dict of {space_name: [point, β¦]} for all spaces with history."""
        return {name: list(dq) for name, dq in self._history.items()}

    def get_health(self) -> dict:
        """Per-publisher staleness plus subscriber count, for /api/health."""
        now = time.time()
        return {
            "publishers": {
                name: {
                    "last_updated": self._snapshots.get(name, {}).get("last_updated", 0),
                    "stale_seconds": round(now - self._snapshots.get(name, {}).get("last_updated", now), 1),
                }
                for name in self._publishers
            },
            "subscriber_count": len(self._subscribers),
        }
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 3 β HUB TRADE STORE (in-memory, fed by WebSocket messages) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # | |
| # ROOT CAUSE FIX: The previous TradeLogParser read from *.log files on THIS container | |
| # (/app/ranker_logs). Those files NEVER exist on the Executo Hub space β they are | |
| # written by ranker processes running in the individual executor spaces (V75, V50, β¦), | |
| # each in their own separate container with their own filesystem. | |
| # | |
| # Fix: replace file-based parsing with an in-memory store that is populated when | |
| # executor spaces send WebSocket trade events to this hub. | |
| # | |
| # Executor spaces must send: | |
| # {"type": "trade_opened", "data": {trade_id, asset, direction, entry, qty, opened_at}} | |
| # {"type": "trade_closed", "data": {trade_id, asset, pnl, exit_price, closed_at}} | |
| # | |
| # See ranker_logging.py β the on_event callback already fires for every log entry. | |
| # Wire it in your executor space's ranker like this: | |
| # | |
| # def _trade_ws_hook(entry: dict): | |
| # cat = entry.get("category", "") | |
| # msg = entry.get("message", "") | |
| # if cat != "TRADE": | |
| # return | |
| # meta = entry.get("metadata") or {} | |
| # if "TRADE OPENED" in msg: | |
| # asyncio.create_task(ws.send_text(json.dumps({ | |
| # "type": "trade_opened", | |
| # "data": { | |
| # "trade_id": meta.get("trade_id"), | |
| # "asset": entry.get("asset"), | |
| # "direction": meta.get("direction", "?"), | |
| # "entry": meta.get("price"), | |
| # "qty": meta.get("qty", 0.0), | |
| # "opened_at": entry.get("timestamp", ""), | |
| # }, | |
| # }))) | |
| # elif "TRADE CLOSED" in msg: | |
| # asyncio.create_task(ws.send_text(json.dumps({ | |
| # "type": "trade_closed", | |
| # "data": { | |
| # "trade_id": meta.get("trade_id"), | |
| # "asset": entry.get("asset"), | |
| # "pnl": meta.get("pnl", 0.0), | |
| # "exit_price": meta.get("exit_price"), | |
| # "closed_at": entry.get("timestamp", ""), | |
| # }, | |
| # }))) | |
| # | |
| # ranker_logger = RankerLogger(..., on_event=_trade_ws_hook) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class HubTradeStore:
    """
    In-memory trade store populated by WebSocket trade-event messages from
    executor spaces.  Thread-safe.  Replaces the broken file-based TradeLogParser.

    open_trade / close_trade are called from the publisher WS loop;
    get_state backs the /api/trades* endpoints.
    """
    _MAX_CLOSED = 500  # hard cap on retained closed-trade records (newest-first)

    def __init__(self) -> None:
        self._open: Dict[str, dict] = {}   # trade_id β open-trade record
        self._closed: List[dict] = []      # newest-first, capped at _MAX_CLOSED
        self._stats = {
            "total_opened": 0,
            "total_closed": 0,
            "total_pnl": 0.0,
            "win_count": 0,
            "loss_count": 0,
        }
        self._lock = threading.Lock()

    @staticmethod
    def _utc_now_iso() -> str:
        """Seconds-resolution UTC timestamp ("YYYY-MM-DDTHH:MM:SS")."""
        # FIX: datetime.utcnow() is deprecated (naive datetime); use an aware
        # UTC now instead β the [:19] prefix is byte-identical to the old output.
        return datetime.now(timezone.utc).isoformat()[:19]

    def open_trade(self, space_name: str, data: dict) -> None:
        """Record a newly opened trade (from a trade_opened WS message)."""
        trade_id = data.get("trade_id") or f"{space_name}_{int(time.time())}"
        direction = str(data.get("direction", "?")).upper()
        entry_px = data.get("entry") or data.get("price") or 0.0
        with self._lock:
            self._open[trade_id] = {
                "trade_id": trade_id,
                "asset": data.get("asset", space_name),
                "direction": direction,
                "entry": float(entry_px),
                "qty": float(data.get("qty", 0.0)),
                "opened_at": data.get("opened_at", self._utc_now_iso()),
                "status": "OPEN",
            }
            self._stats["total_opened"] += 1
        logger.info(
            f"[HubTradeStore] OPEN {trade_id} | {direction} @ {entry_px} "
            f"(from {space_name})"
        )

    def close_trade(self, space_name: str, data: dict) -> None:
        """Record a closed trade (from a trade_closed WS message) and update stats."""
        trade_id = data.get("trade_id")
        pnl = float(data.get("pnl", 0.0))
        exit_price = data.get("exit_price")
        with self._lock:
            # Pull the matching open record (if any) so fields missing from the
            # close message can be backfilled from the open message.
            trade = self._open.pop(trade_id, {}) if trade_id else {}
            closed_rec = {
                "trade_id": trade_id or "UNKNOWN",
                "asset": data.get("asset") or trade.get("asset", space_name),
                "direction": str(data.get("direction") or trade.get("direction", "?")).upper(),
                "entry": data.get("entry") or trade.get("entry", 0.0),
                "exit_price": float(exit_price) if exit_price is not None else None,
                "qty": data.get("qty") or trade.get("qty", 0.0),
                "pnl": pnl,
                "closed_at": data.get("closed_at", self._utc_now_iso()),
                "status": "CLOSED",
            }
            self._closed.insert(0, closed_rec)  # newest-first
            if len(self._closed) > self._MAX_CLOSED:
                self._closed = self._closed[:self._MAX_CLOSED]
            self._stats["total_closed"] += 1
            self._stats["total_pnl"] += pnl
            # Break-even (pnl == 0) counts as a win β matches win_rate semantics.
            if pnl >= 0:
                self._stats["win_count"] += 1
            else:
                self._stats["loss_count"] += 1
        logger.info(
            f"[HubTradeStore] CLOSE {trade_id} | pnl={pnl:+.4f} "
            f"(from {space_name})"
        )

    def get_state(self) -> dict:
        """Snapshot for the REST API: open list, newest 100 closed, stats + win_rate."""
        with self._lock:
            open_list = list(self._open.values())
            closed_list = list(self._closed[:100])  # newest 100 for dashboard
            stats = dict(self._stats)
        # win_rate is derived on the copied stats dict, outside the lock.
        total_closed = stats["total_closed"]
        stats["win_rate"] = (
            round(stats["win_count"] / total_closed * 100, 1)
            if total_closed > 0 else 0.0
        )
        return {
            "open": open_list,
            "closed": closed_list,
            "stats": stats,
        }
# ββ Bootstrap βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Directory scanned by api_health and FileBasedLoggerAdapter for *.log files.
_LOG_DIR = os.environ.get("RANKER_LOG_DIR", "/app/ranker_logs")
# Module-level singleton: all publisher WS loops and /api/trades* handlers share it.
_hub_trades = HubTradeStore()
logger.info("β HubTradeStore initialised β awaiting trade_opened/trade_closed WS messages")
# ββ AXRVI live rankings store βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Populated by POST /api/axrvi/rankings from the Executo ranker after every
# rank_and_gate() cycle (~every 5s). Falls back to hub-snapshot scoring when stale.
# NOTE(review): the POST handler that writes these globals is not visible in this
# part of the file β confirm it exists further down.
_axrvi_rankings: List[dict] = []
_axrvi_rankings_ts: float = 0.0
_AXRVI_RANKINGS_TTL: float = 30.0  # seconds before falling back to snapshot scoring
# ββ Top-3 WebSocket client registry βββββββββββββββββββββββββββββββββββββββββββββββββββ
# top3_client.py connects here and receives top3_rankings broadcasts whenever the
# Executo ranker POSTs new rankings via POST /api/axrvi/rankings.
_top3_clients: Set[WebSocket] = set()
_top3_clients_lock = asyncio.Lock()
async def _broadcast_top3_rankings(rankings: List[dict]) -> None:
    """
    Broadcast a top3_rankings message to all /ws/top3 subscribers.
    Called immediately after /api/axrvi/rankings receives a fresh ranking list.
    Dead connections are pruned automatically.
    """
    if not _top3_clients:
        return
    msg = json.dumps({
        "type": "top3_rankings",
        "rankings": rankings,
        "total_assets": len(rankings),
        "hub_timestamp": time.time(),
    })
    dead: list = []
    # Snapshot the client set under the lock, then send outside it so one slow
    # socket cannot block registration/removal of other clients.
    async with _top3_clients_lock:
        clients = list(_top3_clients)
    for ws in clients:
        try:
            await ws.send_text(msg)
        except Exception:
            dead.append(ws)
    if dead:
        async with _top3_clients_lock:
            for ws in dead:
                _top3_clients.discard(ws)
        logger.debug(f"[top3] Pruned {len(dead)} dead client(s)")
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 4 β FASTAPI APPLICATION | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# FastAPI app served by uvicorn on the single public port.
app = FastAPI(
    title="K1RL QUASAR Hub",
    description="Central WebSocket hub β ingest, normalize, broadcast (one-way)",
    version="2.2.0",
)
# Wide-open CORS β presumably deliberate so browser dashboards on other origins
# can hit the REST API; confirm this is acceptable for deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Module-level singleton shared by every WS endpoint and REST handler below.
manager = ConnectionManager()
async def _on_startup():
    """Nothing to start β HubTradeStore is in-memory, populated by WS messages."""
    # NOTE(review): this coroutine is not decorated/registered as a FastAPI
    # startup hook anywhere in this part of the file β confirm it is attached
    # further down (e.g. app.on_event("startup")), otherwise it never runs.
    logger.info("π HubTradeStore ready (no background scanner needed)")
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 5 β WEBSOCKET ENDPOINTS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def ws_publisher_endpoint(websocket: WebSocket, space_name: str):
    """
    Publisher socket for one asset space (/ws/publish/{space_name}).

    ONE-WAY ingest: messages are parsed, counted per type, and routed; the hub
    never writes back.  Routing by "type":
        metrics       β combined {"training": {...}, "voting": {...}} payload
        training      β training-only ("data" wrapper or flat top-level fields)
        voting        β voting-only ("data" wrapper or flat top-level fields)
        trade_opened  β recorded in the in-memory HubTradeStore
        trade_closed  β recorded in the in-memory HubTradeStore
        heartbeat/identify/ping β ignored
        anything else β best-effort rescue of known training/voting fields
    """
    def _canon_training(fields: dict) -> dict:
        # Map ranker aliases (a_loss, acc, step, ...) onto canonical field names
        # and drop unknown keys.  FIX: the alias map was previously applied only
        # to flat top-level fields β a {"type":"training","data":{"a_loss":...}}
        # payload reached validation with alias names and was stored as zeros.
        return {
            manager._TRAINING_ALIAS.get(k, k): v
            for k, v in fields.items()
            if k in manager._TRAINING_KEYS and k != "type"
        }

    await manager.register_publisher(space_name, websocket)
    try:
        while True:
            raw_text = await websocket.receive_text()
            try:
                data = json.loads(raw_text)
            except json.JSONDecodeError:
                logger.warning(f"[{space_name}] Malformed JSON β skipped")
                continue
            msg_type = data.get("type", "")
            # ββ Track per-space message type counts (for /api/debug/hub) βββββββββ
            manager.record_msg(space_name, msg_type)
            # ββ Route by type ββββββββββββββββββββββββββββββββββββββββββββββββββββ
            if msg_type == "metrics":
                # Combined payload: top-level "training" and "voting" dicts
                await manager.ingest(space_name, {
                    "training": data.get("training", {}),
                    "voting": data.get("voting", {}),
                })
            elif msg_type == "training":
                # Bug A fix: try "data" wrapper first, then fall back to top-level
                # fields.  Some rankers send {"type":"training","data":{...}},
                # others send {"type":"training","actor_loss":..., ...} directly.
                wrapped = data.get("data")
                training_raw = _canon_training(wrapped if wrapped else data)
                if training_raw:
                    logger.info(
                        f"[{space_name}] β training msg | "
                        f"keys={list(training_raw.keys())} | "
                        f"actor_loss={training_raw.get('actor_loss', training_raw.get('a_loss', 'β'))}"
                    )
                await manager.ingest(space_name, {"training": training_raw, "voting": {}})
            elif msg_type == "voting":
                wrapped = data.get("data")
                voting_raw = wrapped if wrapped else {
                    k: v for k, v in data.items()
                    if k in manager._VOTING_KEYS and k != "type"
                }
                await manager.ingest(space_name, {"training": {}, "voting": voting_raw})
            elif msg_type in ("heartbeat", "identify", "ping"):
                pass  # liveness chatter β nothing to store
            elif msg_type == "trade_opened":
                # Executor space opened a trade β add to the hub's in-memory store.
                # data = {trade_id, asset, direction, entry, qty, opened_at}
                _hub_trades.open_trade(space_name, data.get("data", data))
            elif msg_type == "trade_closed":
                # Executor space closed a trade β update the hub's in-memory store.
                # data = {trade_id, asset, pnl, exit_price, closed_at}
                _hub_trades.close_trade(space_name, data.get("data", data))
            else:
                # Bug B fix: don't silently swallow. Try to rescue training/voting
                # fields that live at the top level of an unrecognised message type.
                rescued_training = {
                    manager._TRAINING_ALIAS.get(k, k): v
                    for k, v in data.items()
                    if k in manager._TRAINING_KEYS
                }
                rescued_voting = {
                    k: v for k, v in data.items()
                    if k in manager._VOTING_KEYS
                }
                if rescued_training or rescued_voting:
                    logger.warning(
                        f"[{space_name}] β Unknown type='{msg_type}' β "
                        f"auto-rescued: training_keys={list(rescued_training.keys())} "
                        f"voting_keys={list(rescued_voting.keys())}"
                    )
                    await manager.ingest(space_name, {
                        "training": rescued_training,
                        "voting": rescued_voting,
                    })
                else:
                    logger.warning(
                        f"[{space_name}] β Unknown type='{msg_type}' with no "
                        f"extractable fields β dropped. Full keys: {list(data.keys())}"
                    )
    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.error(f"[{space_name}] Publisher error: {e}")
    finally:
        await manager.unregister_publisher(space_name)
async def ws_subscriber_endpoint(websocket: WebSocket):
    """Subscriber socket: push the initial state, then hold the connection open.

    Inbound text is consumed purely as keep-alive; the hub only ever sends.
    """
    await manager.register_subscriber(websocket)
    await manager.send_initial_state(websocket)
    try:
        while True:
            await websocket.receive_text()  # keep-alive only; content ignored
    except WebSocketDisconnect:
        pass
    except Exception as exc:
        logger.error(f"Subscriber error: {exc}")
    finally:
        await manager.unregister_subscriber(websocket)
async def ws_top3_endpoint(websocket: WebSocket):
    """
    /ws/top3 β consumed by top3_client.py (MT5 bridge).
    Sends a top3_rankings message immediately on connect (replay of the latest
    known ranking so the client does not have to wait for the next ranker cycle),
    then keeps the socket open to receive subsequent broadcasts triggered by
    POST /api/axrvi/rankings.
    Message format:
        {"type": "top3_rankings", "rankings": [...], "total_assets": N, "hub_timestamp": T}
    """
    await websocket.accept()
    # Register under the lock so concurrent broadcasts see a consistent set.
    async with _top3_clients_lock:
        _top3_clients.add(websocket)
    logger.info(f"π top3 client connected (total={len(_top3_clients)})")
    # ββ Replay latest rankings immediately so client doesn't wait up to 5 s βββ
    # _axrvi_rankings is read without the lock: single event loop, read-only here.
    if _axrvi_rankings:
        try:
            await websocket.send_text(json.dumps({
                "type": "top3_rankings",
                "rankings": _axrvi_rankings,
                "total_assets": len(_axrvi_rankings),
                "hub_timestamp": _axrvi_rankings_ts,
            }))
        except Exception:
            # Best-effort replay: a failed send is recovered by the next
            # broadcast cycle, so the error is deliberately swallowed.
            pass
    try:
        while True:
            await websocket.receive_text()  # keep-alive β client sends nothing
    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.error(f"[top3] Client error: {e}")
    finally:
        async with _top3_clients_lock:
            _top3_clients.discard(websocket)
        logger.info(f"π top3 client disconnected (remaining={len(_top3_clients)})")
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 6 β REST API (READ-ONLY) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def get_rankings():
    """Return the latest snapshot for every known space plus a UTC timestamp."""
    snapshots = manager.get_all_snapshots()
    now_iso = datetime.utcnow().isoformat() + "Z"
    return {"snapshots": snapshots, "timestamp": now_iso}
async def get_space_metrics(space_name: str):
    """Return the stored snapshot for one space, or an error payload if unknown."""
    snapshot = manager.get_snapshot(space_name)
    if snapshot is not None:
        return snapshot
    return {"error": f"Unknown space: {space_name}"}
async def get_health():
    """Liveness payload: status, UTC timestamp, and per-publisher staleness info."""
    payload = {
        "status": "ok",
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }
    payload.update(manager.get_health())
    return payload
async def api_debug_hub():
    """
    Diagnostic endpoint β exposes exactly what the hub has received and stored.
    Returns per-space:
        msg_counts β how many messages of each type arrived
        snapshot β current stored training + voting values
        history_len β number of history points recorded
    Use this to confirm whether training messages are arriving and being stored.
    If msg_counts shows training=0 for a space, the asset space is NOT sending
    training messages. If training > 0 but snapshot.training shows zeros, there
    is a field-name or format mismatch.
    """
    snapshots = manager.get_all_snapshots()
    msg_counts = manager.get_msg_counts()
    # Reads manager internals directly (read-only) β acceptable for a debug view.
    history_len = {name: len(dq) for name, dq in manager._history.items()}
    spaces = {}
    # Union of both key sets: a space may have counted messages before producing
    # its first valid snapshot, or vice versa.
    for name in set(list(snapshots.keys()) + list(msg_counts.keys())):
        snap = snapshots.get(name, {})
        spaces[name] = {
            "msg_counts": msg_counts.get(name, {}),
            "history_len": history_len.get(name, 0),
            "training": snap.get("training", {}),
            "voting": snap.get("voting", {}),
            "last_updated": snap.get("last_updated", 0),
            # Defaults to "now" for unknown spaces so stale_s reads 0.0, not huge.
            "stale_s": round(time.time() - snap.get("last_updated", time.time()), 1),
        }
    return JSONResponse({
        "spaces": spaces,
        "total_ingested": manager._total_ingested,
        "publisher_count": len(manager._publishers),
        "subscriber_count": len(manager._subscribers),
        "timestamp": datetime.utcnow().isoformat() + "Z",
    })
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 7 β TRADE API (native β replaces patch_websocket_hub.py) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def api_trades():
    """Full trade state: open trades, recent closed trades, summary stats."""
    state = _hub_trades.get_state()
    return JSONResponse(state)
async def api_trades_open():
    """Open trades only."""
    open_trades = _hub_trades.get_state()["open"]
    return JSONResponse({"open": open_trades})
async def api_trades_closed(limit: int = 50):
    """Recent closed trades (newest first) plus cumulative stats."""
    state = _hub_trades.get_state()
    payload = {
        "closed": state["closed"][:limit],
        "stats": state["stats"],
    }
    return JSONResponse(payload)
async def api_health():
    """Service health β includes live trade counts and log-file inventory."""
    state = _hub_trades.get_state()
    # NOTE(review): _START_TIME is not defined anywhere in this part of the file β
    # confirm it is set at module import/startup further down, otherwise this
    # endpoint raises NameError at request time.
    return JSONResponse({
        "service": "websocket_hub",
        "version": "v2.2-ranker-logs",
        "status": "running",
        # Counts *.log files in _LOG_DIR on THIS container (may be 0 on the hub).
        "log_files": len(glob.glob(os.path.join(_LOG_DIR, "*.log"))),
        "trade_open": len(state["open"]),
        "trade_closed": len(state["closed"]),
        "uptime_seconds": round(time.time() - _START_TIME, 0),
    })
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 7b β RANKER LOGS API (FIX: moved here so routes live on port 7860) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # | |
| # ROOT CAUSE of HTTP 404 on /api/ranker/logs/*: | |
| # - hub_dashboard_service.py (Flask) runs on port 8052 β NOT publicly accessible on HF Spaces | |
| # - websocket_hub.py (FastAPI/uvicorn) runs on port 7860 β the ONLY public port | |
| # - The browser fetches /api/ranker/logs/recent β hits port 7860 β no route β 404 | |
| # | |
| # FIX: FileBasedLoggerAdapter + all /api/ranker/logs/* routes added directly here. | |
| # The ranker writes logs to ./ranker_logs (= /app/ranker_logs). This adapter reads | |
| # those files directly β no dependency on hub_dashboard_service or in-memory ranker. | |
| _TRAINING_RE_HUB = re.compile( | |
| r'step=(\d+)\s*\|\s*loss=([\d.]+)\s*\|\s*lr=([\d.eE+\-]+)\s*\|\s*assets=(\d+)' | |
| ) | |
| _JSON_BLOB_RE_HUB = re.compile(r'(\{.*\})\s*$') | |
| def _enrich_training_entry(entry: dict) -> dict: | |
| """Attach parsed `data` dict to TRAINING entries so dashboard KPI cards populate.""" | |
| if entry.get("category", "").upper() != "TRAINING": | |
| return entry | |
| if entry.get("data"): | |
| return entry | |
| msg = entry.get("message", "") | |
| m = _TRAINING_RE_HUB.search(msg) | |
| if m: | |
| entry["data"] = { | |
| "step": int(m.group(1)), | |
| "loss": float(m.group(2)), | |
| "lr": float(m.group(3)), | |
| "asset_count": int(m.group(4)), | |
| } | |
| return entry | |
| jm = _JSON_BLOB_RE_HUB.search(msg) | |
| if jm: | |
| try: | |
| blob = json.loads(jm.group(1)) | |
| if "step" in blob: | |
| entry["data"] = { | |
| "step": blob.get("step", 0), | |
| "loss": blob.get("loss", 0.0), | |
| "lr": blob.get("lr", 0.0), | |
| "asset_count": blob.get("asset_count", blob.get("assets", 0)), | |
| } | |
| except (ValueError, KeyError): | |
| pass | |
| return entry | |
class FileBasedLoggerAdapter:
    """
    Reads ranker log files from disk and exposes the RankerLogger interface
    expected by the /api/ranker/logs/* endpoints.
    No in-memory ranker process required.
    """
    # Log-line field extractors.  Expected raw shape (assumed from the patterns β
    # confirm against ranker_logging.py output):
    #   [YYYY-MM-DD HH:MM:SS] | LEVEL | CATEGORY | ... | ASSET | ...
    _CAT_RE = re.compile(r'\|\s*(INFO|DEBUG|WARNING|ERROR|CRITICAL)\s*\|\s*([A-Z_]+)\s*\|')
    _ASSET_RE = re.compile(r'\|\s*(?:TRADE|SIGNAL)\s*\|\s*(\w+)\s*\|')
    _TS_RE = re.compile(r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]')
    def __init__(self, log_dir: str):
        # Primary directory to scan; _find_files also probes fallback locations.
        self._log_dir = log_dir
        # NOTE(review): this lock is created but not used by any method visible
        # here β confirm whether later methods need it or it can be removed.
        self._lock = threading.RLock()
| def _find_files(self) -> list: | |
| candidate_dirs = [ | |
| self._log_dir, | |
| "/app/ranker_logs", | |
| str(Path(__file__).parent / "ranker_logs"), | |
| "./ranker_logs", | |
| "/home/user/ranker_logs", | |
| "/tmp/ranker_logs", | |
| ] | |
| all_files: list = [] | |
| seen: set = set() | |
| for d in candidate_dirs: | |
| for f in sorted(glob.glob(str(Path(d) / "*.log*"))): | |
| if f not in seen: | |
| seen.add(f) | |
| all_files.append(f) | |
| return all_files | |
| def _read_lines(self, n_tail: int = 500) -> list: | |
| files = self._find_files() | |
| raw: list = [] | |
| for fpath in files[-3:]: | |
| try: | |
| with open(fpath, "r", encoding="utf-8", errors="replace") as f: | |
| raw.extend(f.readlines()[-n_tail:]) | |
| except OSError: | |
| pass | |
| raw.reverse() # newest first | |
| return raw | |
| def _line_to_entry(self, line: str) -> Optional[dict]: | |
| ts_m = self._TS_RE.search(line) | |
| if not ts_m: | |
| return None | |
| cat_m = self._CAT_RE.search(line) | |
| level = cat_m.group(1) if cat_m else "INFO" | |
| cat = cat_m.group(2).strip() if cat_m else "" | |
| ast_m = self._ASSET_RE.search(line) | |
| asset = ast_m.group(1) if ast_m else None | |
| return { | |
| "timestamp": ts_m.group(1), | |
| "level": level, | |
| "category": cat, | |
| "message": line.strip(), | |
| "asset": asset, | |
| "data": None, | |
| } | |
| def get_recent(self, n: int = 50, category: Optional[str] = None) -> list: | |
| entries: list = [] | |
| for line in self._read_lines(n_tail=max(n * 3, 200)): | |
| e = self._line_to_entry(line) | |
| if e is None: | |
| continue | |
| if category and category.upper() not in line.upper(): | |
| continue | |
| entries.append(e) | |
| if len(entries) >= n: | |
| break | |
| return entries | |
| def get_by_asset(self, asset: str, n: int = 30) -> list: | |
| entries: list = [] | |
| for line in self._read_lines(n_tail=500): | |
| if asset.upper() not in line.upper(): | |
| continue | |
| e = self._line_to_entry(line) | |
| if e: | |
| entries.append(e) | |
| if len(entries) >= n: | |
| break | |
| return entries | |
| def get_by_level(self, level: str, n: int = 50) -> list: | |
| entries: list = [] | |
| for line in self._read_lines(n_tail=500): | |
| e = self._line_to_entry(line) | |
| if e and e["level"].upper() == level.upper(): | |
| entries.append(e) | |
| if len(entries) >= n: | |
| break | |
| return entries | |
| def get_stats(self) -> dict: | |
| by_category: dict = {} | |
| by_level: dict = {} | |
| by_asset: dict = {} | |
| errors: dict = {} | |
| total: int = 0 | |
| for line in self._read_lines(n_tail=2000): | |
| e = self._line_to_entry(line) | |
| if not e: | |
| continue | |
| total += 1 | |
| by_level[e["level"]] = by_level.get(e["level"], 0) + 1 | |
| by_category[e["category"]] = by_category.get(e["category"], 0) + 1 | |
| if e["asset"]: | |
| by_asset[e["asset"]] = by_asset.get(e["asset"], 0) + 1 | |
| if e["level"] in ("ERROR", "CRITICAL"): | |
| errors[e["category"]] = errors.get(e["category"], 0) + 1 | |
| return { | |
| "total_events": total, | |
| "by_level": by_level, | |
| "by_category": by_category, | |
| "by_asset": by_asset, | |
| "errors": errors, | |
| "buffer_size": total, | |
| "buffer_capacity": total, | |
| } | |
| def export_json(self, filepath: str, n: int = 500) -> None: | |
| entries = self.get_recent(n) | |
| with open(filepath, "w") as f: | |
| json.dump({ | |
| "export_time": datetime.utcnow().isoformat(), | |
| "count": len(entries), | |
| "logs": entries, | |
| }, f, indent=2) | |
| def clear_buffer(self) -> None: | |
| pass # file-based β nothing to clear | |
# Singleton adapter β reads from the same /app/ranker_logs the ranker writes to
# Shared by every /api/ranker/logs/* endpoint below; file-backed, so the only
# per-process state is the configured directory (_LOG_DIR).
_log_adapter = FileBasedLoggerAdapter(log_dir=_LOG_DIR)
async def api_ranker_logs_recent(limit: int = 50, category: Optional[str] = None):
    """GET /api/ranker/logs/recent?limit=80&category=TRAINING"""
    try:
        raw = _log_adapter.get_recent(n=limit, category=category)
        enriched = [_enrich_training_entry(entry) for entry in raw]
        payload = {
            "logs": enriched,
            "count": len(enriched),
            "stats": _log_adapter.get_stats(),
        }
        return JSONResponse(payload)
    except Exception as exc:
        logger.exception(f"[api_ranker_logs_recent] {exc}")
        # Deliberately return 200 with an empty log list so the dashboard
        # keeps rendering instead of breaking on a transient read error.
        return JSONResponse({"logs": [], "count": 0, "error": str(exc)}, status_code=200)
async def api_ranker_logs_stats():
    """GET /api/ranker/logs/stats"""
    try:
        stats = _log_adapter.get_stats()
        return JSONResponse(stats)
    except Exception as exc:
        return JSONResponse({"error": str(exc)}, status_code=500)
async def api_ranker_logs_asset(asset: str, limit: int = 30):
    """GET /api/ranker/logs/asset/V75?limit=30"""
    try:
        matched = _log_adapter.get_by_asset(asset, n=limit)
        return JSONResponse({"asset": asset, "logs": matched, "count": len(matched)})
    except Exception as exc:
        # Graceful degradation: empty result plus the error, still HTTP 200.
        return JSONResponse({"asset": asset, "logs": [], "count": 0, "error": str(exc)})
async def api_ranker_logs_level(level: str, limit: int = 50):
    """GET /api/ranker/logs/level/ERROR?limit=50"""
    normalized = level.upper()
    try:
        matched = _log_adapter.get_by_level(level, n=limit)
        return JSONResponse({"level": normalized, "logs": matched, "count": len(matched)})
    except Exception as exc:
        # Graceful degradation: empty result plus the error, still HTTP 200.
        return JSONResponse({"level": normalized, "logs": [], "count": 0, "error": str(exc)})
async def api_ranker_logs_export(limit: int = 500):
    """GET /api/ranker/logs/export β download JSON

    Writes a snapshot of the most recent *limit* entries to /tmp and streams
    it back as a file download.
    """
    try:
        export_path = "/tmp/ranker_logs_export.json"
        _log_adapter.export_json(export_path, n=limit)
        # FIX: use the module-level FileResponse (already imported — see
        # serve_dashboard); the previous function-local re-import was redundant.
        return FileResponse(
            export_path,
            media_type="application/json",
            filename="ranker_logs_export.json",
        )
    except Exception as exc:
        return JSONResponse({"error": str(exc)}, status_code=500)
async def api_ranker_logs_clear():
    """POST /api/ranker/logs/clear"""
    try:
        # File-backed adapter: clear_buffer() is a no-op, but the endpoint
        # keeps interface parity with the in-memory logger variant.
        _log_adapter.clear_buffer()
    except Exception as exc:
        return JSONResponse({"error": str(exc)}, status_code=500)
    return JSONResponse({"status": "cleared"})
async def api_ranker_logs_debug():
    """GET /api/ranker/logs/debug β show which log files are found"""
    discovered = _log_adapter._find_files()
    payload = {
        "log_dir": _LOG_DIR,
        "files_found": discovered,
        "file_count": len(discovered),
        "stats": _log_adapter.get_stats(),
    }
    return JSONResponse(payload)
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# SECTION 8 β DASHBOARD UI ROUTES
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Path of the dashboard page served at "/". Overridable via the DASHBOARD_HTML
# env var; defaults to hub_dashboard.html next to this module.
_HTML_PATH = Path(os.environ.get(
    "DASHBOARD_HTML",
    Path(__file__).parent / "hub_dashboard.html",
))
def _compute_rankings() -> List[dict]:
    """
    Build the rankings list served by /api/state.
    Priority order:
      1. Live AXRVI rankings pushed by the Executo ranker via
         POST /api/axrvi/rankings (within the last 30 s).
         These contain real softmax-Shreve priorities from AXRVINet.
      2. Fallback: hub-snapshot vote-ratio scoring used before the
         ranker connects or if the push is stale (e.g. ranker restart).
    """
    # FIX: removed the superfluous `global` declaration — this function only
    # READS _axrvi_rankings/_axrvi_rankings_ts, so `global` had no effect.
    # ββ Path 1: fresh AXRVI rankings ββββββββββββββββββββββββββββββββββββββββ
    if _axrvi_rankings and (time.time() - _axrvi_rankings_ts) < _AXRVI_RANKINGS_TTL:
        snapshots = manager.get_all_snapshots()
        merged: List[dict] = []
        for r in _axrvi_rankings:
            name = r.get("space_name", "")
            snap = snapshots.get(name, {})
            training = snap.get("training", {})
            voting = snap.get("voting", {})
            # Prefer live hub-snapshot vote counts; fall back to the values
            # the ranker pushed with its ranking payload.
            buy = voting.get("buy_count", r.get("buy_count", 0))
            sell = voting.get("sell_count", r.get("sell_count", 0))
            merged.append({
                # Core AXRVI fields β these are the live ranker values
                "rank": r.get("rank", 0),
                "space_name": name,
                "score": r.get("score", 0.0),
                "final_priority": r.get("final_priority", r.get("score", 0.0)),
                "signal_confidence": r.get("signal_confidence", 0.0),
                "dominant_signal": r.get("dominant_signal", "NEUTRAL"),
                "avn_accuracy": r.get("avn_accuracy", 0.0),
                "epistemic_std": r.get("epistemic_std", 0.0),
                "training_steps": r.get("training_steps", training.get("training_steps", 0)),
                # Hub-snapshot fields merged in (latest available)
                "actor_loss": training.get("actor_loss", 0.0),
                "critic_loss": training.get("critic_loss", 0.0),
                "avn_loss": training.get("avn_loss", 0.0),
                "buy_count": buy,
                "sell_count": sell,
                "last_updated": snap.get("last_updated", _axrvi_rankings_ts),
            })
        return merged
    # ββ Path 2: fallback hub-snapshot scoring ββββββββββββββββββββββββββββββββ
    ranked: List[dict] = []
    for name, snap in manager.get_all_snapshots().items():
        training = snap.get("training", {})
        voting = snap.get("voting", {})
        buy = voting.get("buy_count", 0)
        sell = voting.get("sell_count", 0)
        total = buy + sell
        # Vote-ratio confidence: share of the dominant side (0.0 when no votes).
        sig_conf = (max(buy, sell) / total) if total > 0 else 0.0
        avn_acc = training.get("avn_accuracy", 0.0)
        score = round(sig_conf - avn_acc, 6)
        ranked.append({
            "rank": 0,  # assigned after the sort below
            "space_name": name,
            "score": score,
            "final_priority": score,
            "signal_confidence": round(sig_conf, 6),
            "avn_accuracy": round(avn_acc, 6),
            "dominant_signal": voting.get("dominant_signal", "NEUTRAL"),
            "buy_count": buy,
            "sell_count": sell,
            "training_steps": training.get("training_steps", 0),
            "actor_loss": training.get("actor_loss", 0.0),
            "critic_loss": training.get("critic_loss", 0.0),
            "avn_loss": training.get("avn_loss", 0.0),
            "last_updated": snap.get("last_updated", 0.0),
            "epistemic_std": 0.0,  # fallback path has no AVN uncertainty estimate
        })
    ranked.sort(key=lambda r: r["score"], reverse=True)
    for i, r in enumerate(ranked):
        r["rank"] = i + 1
    return ranked
async def serve_dashboard():
    """Serve hub_dashboard.html, or a JSON status page when the file is absent."""
    if not _HTML_PATH.exists():
        # Guard clause: no HTML uploaded yet — return a discoverable JSON
        # summary instead of a 404 so the Space still looks alive.
        return JSONResponse(
            status_code=200,
            content={
                "service": "K1RL QUASAR Hub",
                "status": "running",
                "note": "hub_dashboard.html not found β upload it to the Space",
                "expected": str(_HTML_PATH),
                "endpoints": [
                    "/rankings", "/health",
                    "/api/state", "/api/trades", "/api/trades/open",
                    "/api/trades/closed", "/api/health",
                    "/ws/publish/{space}", "/ws/subscribe",
                ],
            },
        )
    return FileResponse(str(_HTML_PATH), media_type="text/html")
# Strong references to in-flight broadcast tasks. asyncio.create_task only
# keeps a weak reference, so without this set a fire-and-forget task could be
# garbage-collected before it completes (per the asyncio docs).
_axrvi_bg_tasks: set = set()
async def receive_axrvi_rankings(request: Request):
    """
    Called by the Executo ranker after every rank_and_gate() cycle (~5 s).
    Stores the live AXRVI-scored ranking list so _compute_rankings() can serve
    it from /api/state instead of the stale hub-snapshot vote-ratio fallback.
    Expected body:
        {"rankings": [{"space_name": "V75", "score": 0.24, "rank": 1, ...}, ...]}
    Returns 400 on malformed input, otherwise {"ok": True, "count": N, "ts": ...}.
    """
    global _axrvi_rankings, _axrvi_rankings_ts
    try:
        body = await request.json()
    except Exception as e:
        return JSONResponse({"ok": False, "error": f"Bad JSON: {e}"}, status_code=400)
    # FIX: a non-dict JSON body (e.g. a bare list) previously crashed on
    # body.get() and surfaced as a 500; reject it explicitly instead.
    if not isinstance(body, dict):
        return JSONResponse({"ok": False, "error": "body must be a JSON object"}, status_code=400)
    rankings = body.get("rankings", [])
    if not isinstance(rankings, list):
        return JSONResponse({"ok": False, "error": "rankings must be a list"}, status_code=400)
    _axrvi_rankings = rankings
    _axrvi_rankings_ts = time.time()
    logger.debug(
        f"[AXRVI Rankings] Received {len(rankings)} assets | "
        f"top={rankings[0].get('space_name','?')} score={rankings[0].get('score',0):.4f}"
        if rankings else "[AXRVI Rankings] Received empty list"
    )
    # Broadcast to all connected top3_client.py instances immediately
    if rankings:
        # FIX: hold a strong reference until the task finishes, then drop it.
        task = asyncio.create_task(_broadcast_top3_rankings(rankings))
        _axrvi_bg_tasks.add(task)
        task.add_done_callback(_axrvi_bg_tasks.discard)
    return JSONResponse({"ok": True, "count": len(rankings), "ts": _axrvi_rankings_ts})
async def api_state():
    """Full dashboard state polled by hub_dashboard.html every 2 s.

    Returns rankings (live AXRVI or fallback), per-metric history, and a
    health summary including staleness of the newest publisher snapshot.
    """
    rankings = _compute_rankings()
    # FIX: the original called manager.get_all_snapshots() three times and
    # computed the max(last_updated) scan twice per request; hoist both.
    snapshots = manager.get_all_snapshots()
    last_ts = max(
        (s.get("last_updated", 0) for s in snapshots.values()),
        default=0.0,
    )
    # Matches original semantics: with no snapshots, "ago" reads ~0.0
    # (old code subtracted a just-taken time.time() default from now).
    last_ago = round(time.time() - last_ts, 1) if snapshots else 0.0
    return JSONResponse({
        "rankings": rankings,
        "metric_history": manager.get_metric_history(),
        "health": {
            "hub_connected": True,
            "spaces_connected": len(snapshots),
            "messages_rx": manager._total_ingested,
            "last_update_ts": last_ts,
            "last_update_ago": last_ago,
            "uptime_seconds": round(time.time() - _START_TIME, 0),
            "reconnect_count": 0,
        },
        # NOTE(review): datetime.utcnow() is deprecated in 3.12; kept here to
        # preserve the exact "Z"-suffixed timestamp format the dashboard parses.
        "timestamp": datetime.utcnow().isoformat() + "Z",
    })
# Wall-clock timestamp captured at import time; /api/state reports
# uptime_seconds relative to this.
_START_TIME = time.time()
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# SECTION 9 β ENTRY POINT
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
if __name__ == "__main__":
    # Direct launch (HF Spaces sets PORT; 7860 is the Spaces default).
    serve_port = int(os.environ.get("PORT", 7860))
    logger.info(f"π QUASAR Hub starting on port {serve_port}")
    uvicorn.run(app, host="0.0.0.0", port=serve_port, log_level="info")