# (capture artifact neutralized: the original paste carried stray UI text
#  "Spaces: / Running / Running" here, which is not Python and broke parsing)
#!/usr/bin/env python3
"""
========================================================================
  QUASAR RANKER - COMPLETE LOGGING SYSTEM
------------------------------------------------------------------------
  Provides file-based logging with JSON export, in-memory buffer,
  and REST API.
  VERSION: v2.1 | 2026-03-30
========================================================================
"""
import json
import logging
import os
import sys
import time
from collections import deque, defaultdict
from dataclasses import dataclass  # used by the LogEntry record below
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from threading import Lock
from typing import Dict, List, Optional, Any
class LogLevel(Enum):
    """Severity levels for ranker log entries.

    Mirrors the stdlib logging hierarchy; RankerLogger._log maps each
    member onto the matching console-logger method. Values come from
    auto() — only the member *names* are stored on entries and used as
    buffer keys, so comparisons elsewhere use the enum members or .name.
    """
    DEBUG = auto()
    INFO = auto()
    WARNING = auto()
    ERROR = auto()
    CRITICAL = auto()
class EventCategory(Enum):
    """High-level taxonomy of ranker events.

    The string values are stored verbatim on LogEntry.category and used
    as keys for the per-category in-memory buffers and stats counters,
    so they must stay stable (dashboards filter on them, e.g. "TRADE").
    """
    INITIALIZATION = 'INITIALIZATION'
    PROCESSING = 'PROCESSING'
    TERMINATION = 'TERMINATION'
    ERROR_OCCURRED = 'ERROR_OCCURRED'
    CONNECTION = 'CONNECTION'
    TRAINING = 'TRAINING'
    SIGNAL = 'SIGNAL'
    TRADE = 'TRADE'
    RANKING = 'RANKING'
@dataclass
class LogEntry:
    """Structured log entry with all metadata.

    FIX: the ``@dataclass`` decorator was missing. The class declared its
    fields as bare annotations, so ``LogEntry(timestamp=..., level=..., ...)``
    (as called by RankerLogger._log) raised TypeError — annotations alone do
    not generate an __init__. The ``from dataclasses import dataclass`` at
    the top of the file existed precisely for this class.

    Attributes:
        timestamp: epoch seconds (time.time()).
        level: LogLevel member name, e.g. "INFO".
        category: EventCategory value, e.g. "TRADE".
        message: human-readable message text.
        asset: optional asset symbol the event relates to.
        metadata: optional structured payload for JSON export.
    """
    timestamp: float
    level: str
    category: str
    message: str
    asset: Optional[str] = None
    metadata: Optional[Dict] = None

    def to_dict(self) -> dict:
        """Serialize for the dashboard API / JSON export.

        Returns both the raw epoch ("ts") and a local-time ISO string
        ("timestamp"); metadata is normalized to a dict (never None).
        """
        return {
            "ts": self.timestamp,
            "timestamp": datetime.fromtimestamp(self.timestamp).isoformat(),
            "level": self.level,
            "category": self.category,
            "message": self.message,
            "asset": self.asset,
            "metadata": self.metadata or {},
        }

    def to_file_line(self) -> str:
        """Format for file logging (human-readable, pipe-delimited)."""
        dt = datetime.fromtimestamp(self.timestamp).strftime("%Y-%m-%d %H:%M:%S")
        # Asset and metadata segments are omitted entirely when absent so
        # the column layout stays compact for asset-less events.
        asset_str = f" | {self.asset}" if self.asset else ""
        meta_str = f" | {json.dumps(self.metadata)}" if self.metadata else ""
        return f"[{dt}] | {self.level:8s} | {self.category:15s}{asset_str} | {self.message}{meta_str}"
class RankerLogger:
    """
    Complete logger with file output, in-memory buffer, and JSON export.

    Features:
    - Writes to both console and daily log files
    - Maintains in-memory ring buffers for API queries
    - Per-asset, per-level and per-category indexing
    - JSON export for persistence
    - Thread-safe buffer/stats access via a Lock
    """

    def __init__(
        self,
        name: str = "QuasarAXRVI",
        buffer_size: int = 1000,
        log_dir: str = "./ranker_logs",
        on_event: Optional[callable] = None,
    ):
        """
        Args:
            name: logger name; also prefixes the daily log file name.
            buffer_size: capacity of the main in-memory ring buffer
                (each per-asset/level/category index holds half of this).
            log_dir: directory for log files (created if missing).
            on_event: optional callback receiving each entry's dict form
                (e.g. a dashboard forwarder). Exceptions raised by the
                callback are deliberately swallowed so logging can never
                crash the ranker.
        """
        self.name = name
        self.buffer_size = buffer_size
        self.log_dir = Path(log_dir)
        self.on_event = on_event

        # Create log directory if it doesn't exist.
        self.log_dir.mkdir(parents=True, exist_ok=True)

        # In-memory ring buffers: deque(maxlen=...) evicts oldest entries
        # automatically, bounding memory regardless of uptime.
        self._buffer: deque = deque(maxlen=buffer_size)
        self._by_asset: Dict[str, deque] = defaultdict(lambda: deque(maxlen=buffer_size // 2))
        self._by_level: Dict[str, deque] = defaultdict(lambda: deque(maxlen=buffer_size // 2))
        self._by_category: Dict[str, deque] = defaultdict(lambda: deque(maxlen=buffer_size // 2))
        self._lock = Lock()

        # File handler for persistent logging.
        self._setup_file_logging()

        # Console logging — guarded so repeated construction with the same
        # name does not stack duplicate StreamHandlers on the shared logger.
        self.console_logger = logging.getLogger(name)
        self.console_logger.setLevel(logging.DEBUG)
        if not self.console_logger.handlers:
            ch = logging.StreamHandler(sys.stdout)
            ch.setLevel(logging.DEBUG)
            ch.setFormatter(logging.Formatter('%(message)s'))
            self.console_logger.addHandler(ch)

        # Aggregate counters exposed via get_stats().
        self.stats = {
            "total_events": 0,
            "by_level": defaultdict(int),
            "by_category": defaultdict(int),
            "by_asset": defaultdict(int),
            "errors": defaultdict(int),
        }

        self._log(LogLevel.INFO, EventCategory.INITIALIZATION,
                  f"RankerLogger initialized | log_dir={log_dir} | buffer_size={buffer_size}")

    def _setup_file_logging(self):
        """Attach a daily file handler to a dedicated, non-propagating logger.

        FIX: logging.getLogger(f"{name}_file") returns a process-global
        singleton, so the previous unconditional addHandler() stacked a new
        FileHandler every time a RankerLogger with the same name was
        constructed, duplicating every line in the log file. The handler is
        now only attached once (mirroring the console-logger guard).
        NOTE: this is a plain (daily-named) FileHandler, not a size-rotating
        one — the file name carries the date, nothing rotates within a day.
        """
        log_file = self.log_dir / f"{self.name}_{datetime.now().strftime('%Y%m%d')}.log"
        self.file_logger = logging.getLogger(f"{self.name}_file")
        self.file_logger.setLevel(logging.DEBUG)
        self.file_logger.propagate = False  # keep file lines off the root logger
        if not self.file_logger.handlers:
            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(logging.Formatter('%(message)s'))
            self.file_logger.addHandler(file_handler)

    def _log(
        self,
        level: LogLevel,
        category: EventCategory,
        message: str,
        asset: Optional[str] = None,
        metadata: Optional[Dict] = None,
    ):
        """Internal logging method: build the entry, emit to file + console,
        index it in memory, update stats, and fire the on_event callback."""
        entry = LogEntry(
            timestamp=time.time(),
            level=level.name,
            category=category.value,
            message=message,
            asset=asset,
            metadata=metadata,
        )

        # Write to file (full human-readable line).
        self.file_logger.info(entry.to_file_line())

        # Write to console (simplified layout, no timestamp/metadata).
        console_line = f"{entry.level:8s} | {entry.category:15s}"
        if asset:
            console_line += f" | {asset}"
        console_line += f" | {message}"
        emit = {
            LogLevel.DEBUG: self.console_logger.debug,
            LogLevel.INFO: self.console_logger.info,
            LogLevel.WARNING: self.console_logger.warning,
            LogLevel.ERROR: self.console_logger.error,
        }.get(level, self.console_logger.critical)
        emit(console_line)

        # Store in memory buffers and bump counters atomically.
        with self._lock:
            self._buffer.append(entry)
            if asset:
                self._by_asset[asset].append(entry)
            self._by_level[level.name].append(entry)
            self._by_category[category.value].append(entry)
            self.stats["total_events"] += 1
            self.stats["by_level"][level.name] += 1
            self.stats["by_category"][category.value] += 1
            if asset:
                self.stats["by_asset"][asset] += 1
            if level in (LogLevel.ERROR, LogLevel.CRITICAL):
                self.stats["errors"][category.value] += 1

        # Callback for external consumers (e.g., dashboard). Best-effort by
        # design: a broken consumer must never take down the ranker.
        if self.on_event:
            try:
                self.on_event(entry.to_dict())
            except Exception:
                pass

    # -- Public logging methods (called by ranker) ---------------------------

    def connection_event(self, component: str, status: str, details: str = ""):
        """Log WebSocket or Hub connection events.

        "connected"/"ready" statuses log at INFO; anything else at WARNING.
        """
        msg = f"{component} | {status}"
        if details:
            msg += f" | {details}"
        self._log(LogLevel.INFO if status in ["connected", "ready"] else LogLevel.WARNING,
                  EventCategory.CONNECTION, msg)

    def training_update(self, step: int, loss: float, lr: float, asset_count: int = 0):
        """Log training progress (DEBUG level; metadata carries raw values)."""
        metadata = {"step": step, "loss": loss, "lr": lr, "asset_count": asset_count}
        self._log(LogLevel.DEBUG, EventCategory.TRAINING,
                  f"step={step} | loss={loss:.4f} | lr={lr:.6f} | assets={asset_count}",
                  metadata=metadata)

    def hub_update(self, asset: str, avn_accuracy: float, signal_confidence: float):
        """Log hub snapshot updates for one asset."""
        metadata = {"avn_accuracy": avn_accuracy, "signal_confidence": signal_confidence}
        self._log(LogLevel.DEBUG, EventCategory.PROCESSING,
                  f"hub update | acc={avn_accuracy:.3f} | conf={signal_confidence:.3f}",
                  asset=asset, metadata=metadata)

    def signal(self, asset: str, direction: str, confidence: float, significance: float):
        """Log signal generation for an asset."""
        metadata = {"direction": direction, "confidence": confidence, "significance": significance}
        self._log(LogLevel.INFO, EventCategory.SIGNAL,
                  f"{direction} | conf={confidence:.3f} | sig={significance:.3f}",
                  asset=asset, metadata=metadata)

    def trade_open(self, trade_id: str, asset: str, direction: str, price: float, qty: float):
        """Log trade opening."""
        metadata = {"trade_id": trade_id, "price": price, "qty": qty, "direction": direction}
        self._log(LogLevel.INFO, EventCategory.TRADE,
                  f"TRADE OPENED | ID={trade_id} | Dir={direction} | Entry={price:.4f} | Qty={qty:.6f}",
                  asset=asset, metadata=metadata)

    def trade_close(self, trade_id: str, asset: str, pnl: float, return_pct: float, exit_price: Optional[float] = None):
        """Log trade closing. FIX v2.2: exit_price written into message text AND metadata."""
        metadata = {"trade_id": trade_id, "pnl": pnl, "return_pct": return_pct}
        # Include exit_price in metadata for JSON export.
        if exit_price is not None:
            metadata["exit_price"] = exit_price
        # FIX v2.2: Also embed exit_price in the pipe-delimited message so the
        # dashboard regex (TRADE_CLOSE_RE_WITH_EXIT) can capture it directly.
        # Previously exit_price lived only in the trailing JSON metadata, which
        # the regex never reached — the EXIT column always displayed a dash.
        msg = f"TRADE CLOSED | ID={trade_id} | pnl={pnl:+.4f} | return={return_pct:+.2%}"
        if exit_price is not None:
            msg += f" | exit_price={exit_price}"
        self._log(LogLevel.INFO, EventCategory.TRADE, msg, asset=asset, metadata=metadata)

    def ranking_update(self, rankings: List[Dict], top_asset: str, top_score: float):
        """Log ranking cycle results (only len(rankings) is consumed here)."""
        metadata = {"top_asset": top_asset, "top_score": top_score, "num_ranked": len(rankings)}
        self._log(LogLevel.DEBUG, EventCategory.RANKING,
                  f"rankings | top={top_asset} | score={top_score:.4f} | total={len(rankings)}",
                  metadata=metadata)

    # -- Generic log method (backward compatibility) -------------------------

    def log(self, level: LogLevel, category: EventCategory, message: str, asset: Optional[str] = None):
        """Generic log method for custom events."""
        self._log(level, category, message, asset=asset)

    # -- API methods for dashboard (required by hub_dashboard_service) -------

    def get_recent(self, n: int = 50, category: Optional[str] = None) -> List[dict]:
        """Get most recent n log entries, optionally filtered by category
        (case-insensitive; unknown categories return [])."""
        with self._lock:
            if category:
                entries = list(self._by_category.get(category.upper(), []))
            else:
                entries = list(self._buffer)
        return [e.to_dict() for e in entries[-n:]]

    def get_by_asset(self, asset: str, n: int = 30) -> List[dict]:
        """Get recent logs for a specific asset (unknown asset returns [])."""
        with self._lock:
            entries = list(self._by_asset.get(asset, []))
        return [e.to_dict() for e in entries[-n:]]

    def get_by_level(self, level: str, n: int = 50) -> List[dict]:
        """Get recent logs by log level name (case-insensitive)."""
        with self._lock:
            entries = list(self._by_level.get(level.upper(), []))
        return [e.to_dict() for e in entries[-n:]]

    def get_stats(self) -> dict:
        """Get logging statistics as a plain-dict snapshot (safe to mutate)."""
        with self._lock:
            return {
                "total_events": self.stats["total_events"],
                "by_level": dict(self.stats["by_level"]),
                "by_category": dict(self.stats["by_category"]),
                "by_asset": dict(self.stats["by_asset"]),
                "errors": dict(self.stats["errors"]),
                "buffer_size": len(self._buffer),
                "buffer_capacity": self.buffer_size,
            }

    def clear_buffer(self):
        """Clear all in-memory buffers and reset statistics (file logs are
        untouched)."""
        with self._lock:
            self._buffer.clear()
            self._by_asset.clear()
            self._by_level.clear()
            self._by_category.clear()
            self.stats = {
                "total_events": 0,
                "by_level": defaultdict(int),
                "by_category": defaultdict(int),
                "by_asset": defaultdict(int),
                "errors": defaultdict(int),
            }

    def export_json(self, filepath: str, n: int = 500):
        """Export the most recent n logs to a JSON file."""
        entries = self.get_recent(n)
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump({
                "export_time": datetime.now().isoformat(),
                "count": len(entries),
                "logs": entries
            }, f, indent=2)
def make_trade_ws_hook(ws_send_fn):
    """
    Factory for the on_event callback that forwards TRADE log entries to the
    Executor Hub via WebSocket.

    FIX: the usage example previously called this as
    ``RankerLogger.make_trade_ws_hook(...)`` — it is a module-level function,
    not a method of RankerLogger; the example below reflects that.

    Usage in your executor space ranker::

        import asyncio, json
        from ranker_logging import RankerLogger, make_trade_ws_hook

        async def connect_to_hub():
            # ws = your websockets/websocket-client connection to the hub
            ranker_logger = RankerLogger(
                name="QuasarAXRVI_V75",
                on_event=make_trade_ws_hook(
                    lambda payload: asyncio.create_task(ws.send(json.dumps(payload)))
                ),
            )

    Args:
        ws_send_fn: callable(dict) — sends a dict as JSON to the hub WS
            publisher. Can be a coroutine wrapper or sync function.

    Returns:
        A callable suitable for ``RankerLogger(on_event=...)``. It ignores
        every entry that is not a TRADE open/close, and swallows send errors
        so a broken WS connection can never crash the ranker.
    """
    def _hook(entry: dict) -> None:
        # Only TRADE-category entries are forwarded.
        if entry.get("category") != "TRADE":
            return
        msg = entry.get("message", "")
        meta = entry.get("metadata") or {}
        ts = entry.get("timestamp", "")
        if "TRADE OPENED" in msg:
            payload = {
                "type": "trade_opened",
                "data": {
                    "trade_id": meta.get("trade_id"),
                    "asset": entry.get("asset"),
                    "direction": meta.get("direction", "?"),
                    "entry": meta.get("price"),
                    "qty": meta.get("qty", 0.0),
                    "opened_at": ts,
                },
            }
        elif "TRADE CLOSED" in msg:
            payload = {
                "type": "trade_closed",
                "data": {
                    "trade_id": meta.get("trade_id"),
                    "asset": entry.get("asset"),
                    "pnl": meta.get("pnl", 0.0),
                    "exit_price": meta.get("exit_price"),
                    "closed_at": ts,
                },
            }
        else:
            # TRADE entries that are neither opens nor closes are dropped.
            return
        try:
            ws_send_fn(payload)
        except Exception:
            pass  # never let a WS error crash the ranker
    return _hook
class RankerLogBridge:
    """
    Bridge between ranker components and the logging system.

    Provides convenience methods for common logging patterns and caches the
    most recent full ranking snapshot in ``_last_rankings`` for external
    readers.
    """

    def __init__(self, ranker_logger: RankerLogger):
        self.logger = ranker_logger
        # FIX: previously only created inside capture_ranking(), so any
        # reader touching the cached snapshot before the first ranking
        # cycle hit AttributeError. Initialize eagerly.
        self._last_rankings: List[Dict] = []

    @staticmethod
    def _field(record, name: str, default=None):
        """Read *name* from a ranking record that may be either an object
        with attributes or a plain dict (both shapes occur upstream)."""
        if hasattr(record, name):
            return getattr(record, name)
        return record.get(name, default)

    def capture_signal(self, asset: str, buffer, score: float):
        """Capture signal generation from an asset buffer.

        No-op when the buffer is falsy or carries no ``latest_signal``.
        """
        if buffer and hasattr(buffer, 'latest_signal') and buffer.latest_signal:
            action = buffer.latest_signal.get("action", "HOLD")
            confidence = buffer.latest_signal.get("confidence", 0.0)
            self.logger.signal(asset, action, confidence, score)

    def capture_ranking(self, ranked: List, hub_snapshots: Dict):
        """Capture ranking results and refresh the cached snapshot.

        ``hub_snapshots`` is currently unused here; kept for interface
        compatibility with callers.
        """
        if not ranked:
            return
        top = ranked[0]
        top_name = self._field(top, 'space_name', '')
        top_score = self._field(top, 'final_priority', 0)
        # NOTE: ranking_update is annotated List[Dict] but only consumes
        # len(); the top-5 names are passed here as before.
        self.logger.ranking_update(
            [self._field(r, 'space_name', '') for r in ranked[:5]],
            top_name,
            top_score,
        )
        # Cache the full ranked list for external readers of the snapshot.
        self._last_rankings = [
            {
                "space_name": self._field(r, 'space_name', ''),
                "score": self._field(r, 'final_priority', 0),
                "rank": self._field(r, 'rank', 0),
                "dominant_signal": self._field(r, 'dominant_signal', 'NEUTRAL'),
                "epistemic_std": self._field(r, 'epistemic_std', 0.0),
            }
            for r in ranked
        ]

    def log_event(self, level: LogLevel, category: EventCategory, message: str, asset: Optional[str] = None):
        """Generic event logging passthrough."""
        self.logger.log(level, category, message, asset=asset)