#!/usr/bin/env python3 """ ╔══════════════════════════════════════════════════════════════════════════════════════╗ ║ QUASAR RANKER — COMPLETE LOGGING SYSTEM ║ ║ ────────────────────────────────────────────────────────────────────────────────── ║ ║ Provides file-based logging with JSON export, in-memory buffer, and REST API. ║ ║ VERSION: v2.1 | 2026-03-30 ║ ╚══════════════════════════════════════════════════════════════════════════════════════╝ """ import json import logging import os import sys import time from collections import deque, defaultdict from dataclasses import dataclass # <-- MISSING IMPORT! from datetime import datetime from enum import Enum, auto from pathlib import Path from threading import Lock from typing import Dict, List, Optional, Any class LogLevel(Enum): DEBUG = auto() INFO = auto() WARNING = auto() ERROR = auto() CRITICAL = auto() class EventCategory(Enum): INITIALIZATION = 'INITIALIZATION' PROCESSING = 'PROCESSING' TERMINATION = 'TERMINATION' ERROR_OCCURRED = 'ERROR_OCCURRED' CONNECTION = 'CONNECTION' TRAINING = 'TRAINING' SIGNAL = 'SIGNAL' TRADE = 'TRADE' RANKING = 'RANKING' @dataclass class LogEntry: """Structured log entry with all metadata.""" timestamp: float level: str category: str message: str asset: Optional[str] = None metadata: Optional[Dict] = None def to_dict(self) -> dict: return { "ts": self.timestamp, "timestamp": datetime.fromtimestamp(self.timestamp).isoformat(), "level": self.level, "category": self.category, "message": self.message, "asset": self.asset, "metadata": self.metadata or {}, } def to_file_line(self) -> str: """Format for file logging (human-readable).""" dt = datetime.fromtimestamp(self.timestamp).strftime("%Y-%m-%d %H:%M:%S") asset_str = f" | {self.asset}" if self.asset else "" meta_str = f" | {json.dumps(self.metadata)}" if self.metadata else "" return f"[{dt}] | {self.level:8s} | {self.category:15s}{asset_str} | {self.message}{meta_str}" class RankerLogger: """ Complete logger with file output, in-memory buffer, and JSON export. Features: - Writes to both console and rotating log files - Maintains in-memory buffer for API queries - Per-asset and per-category indexing - JSON export for persistence - Thread-safe with locks """ def __init__( self, name: str = "QuasarAXRVI", buffer_size: int = 1000, log_dir: str = "./ranker_logs", on_event: Optional[callable] = None, ): self.name = name self.buffer_size = buffer_size self.log_dir = Path(log_dir) self.on_event = on_event # Create log directory if it doesn't exist self.log_dir.mkdir(parents=True, exist_ok=True) # In-memory buffers self._buffer: deque = deque(maxlen=buffer_size) self._by_asset: Dict[str, deque] = defaultdict(lambda: deque(maxlen=buffer_size // 2)) self._by_level: Dict[str, deque] = defaultdict(lambda: deque(maxlen=buffer_size // 2)) self._by_category: Dict[str, deque] = defaultdict(lambda: deque(maxlen=buffer_size // 2)) self._lock = Lock() # File handler for persistent logging self._setup_file_logging() # Console logging self.console_logger = logging.getLogger(name) self.console_logger.setLevel(logging.DEBUG) if not self.console_logger.handlers: ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.DEBUG) formatter = logging.Formatter('%(message)s') ch.setFormatter(formatter) self.console_logger.addHandler(ch) self.stats = { "total_events": 0, "by_level": defaultdict(int), "by_category": defaultdict(int), "by_asset": defaultdict(int), "errors": defaultdict(int), } self._log(LogLevel.INFO, EventCategory.INITIALIZATION, f"RankerLogger initialized | log_dir={log_dir} | buffer_size={buffer_size}") def _setup_file_logging(self): """Setup rotating file logging.""" log_file = self.log_dir / f"{self.name}_{datetime.now().strftime('%Y%m%d')}.log" file_handler = logging.FileHandler(log_file, encoding='utf-8') file_handler.setLevel(logging.DEBUG) file_formatter = logging.Formatter('%(message)s') file_handler.setFormatter(file_formatter) self.file_logger = logging.getLogger(f"{self.name}_file") self.file_logger.setLevel(logging.DEBUG) self.file_logger.addHandler(file_handler) self.file_logger.propagate = False def _log( self, level: LogLevel, category: EventCategory, message: str, asset: Optional[str] = None, metadata: Optional[Dict] = None, ): """Internal logging method.""" entry = LogEntry( timestamp=time.time(), level=level.name, category=category.value, message=message, asset=asset, metadata=metadata, ) # Write to file file_line = entry.to_file_line() self.file_logger.info(file_line) # Write to console (simplified) console_line = f"{entry.level:8s} | {entry.category:15s}" if asset: console_line += f" | {asset}" console_line += f" | {message}" if level == LogLevel.DEBUG: self.console_logger.debug(console_line) elif level == LogLevel.INFO: self.console_logger.info(console_line) elif level == LogLevel.WARNING: self.console_logger.warning(console_line) elif level == LogLevel.ERROR: self.console_logger.error(console_line) else: self.console_logger.critical(console_line) # Store in memory buffers with self._lock: self._buffer.append(entry) if asset: self._by_asset[asset].append(entry) self._by_level[level.name].append(entry) self._by_category[category.value].append(entry) self.stats["total_events"] += 1 self.stats["by_level"][level.name] += 1 self.stats["by_category"][category.value] += 1 if asset: self.stats["by_asset"][asset] += 1 if level == LogLevel.ERROR or level == LogLevel.CRITICAL: self.stats["errors"][category.value] += 1 # Callback for external consumers (e.g., dashboard) if self.on_event: try: self.on_event(entry.to_dict()) except Exception: pass # ── Public logging methods (called by ranker) ──────────────────────────────── def connection_event(self, component: str, status: str, details: str = ""): """Log WebSocket or Hub connection events.""" msg = f"{component} | {status}" if details: msg += f" | {details}" self._log(LogLevel.INFO if status in ["connected", "ready"] else LogLevel.WARNING, EventCategory.CONNECTION, msg) def training_update(self, step: int, loss: float, lr: float, asset_count: int = 0): """Log training progress.""" metadata = {"step": step, "loss": loss, "lr": lr, "asset_count": asset_count} self._log(LogLevel.DEBUG, EventCategory.TRAINING, f"step={step} | loss={loss:.4f} | lr={lr:.6f} | assets={asset_count}", metadata=metadata) def hub_update(self, asset: str, avn_accuracy: float, signal_confidence: float): """Log hub snapshot updates.""" metadata = {"avn_accuracy": avn_accuracy, "signal_confidence": signal_confidence} self._log(LogLevel.DEBUG, EventCategory.PROCESSING, f"hub update | acc={avn_accuracy:.3f} | conf={signal_confidence:.3f}", asset=asset, metadata=metadata) def signal(self, asset: str, direction: str, confidence: float, significance: float): """Log signal generation.""" metadata = {"direction": direction, "confidence": confidence, "significance": significance} self._log(LogLevel.INFO, EventCategory.SIGNAL, f"{direction} | conf={confidence:.3f} | sig={significance:.3f}", asset=asset, metadata=metadata) def trade_open(self, trade_id: str, asset: str, direction: str, price: float, qty: float): """Log trade opening.""" metadata = {"trade_id": trade_id, "price": price, "qty": qty, "direction": direction} self._log(LogLevel.INFO, EventCategory.TRADE, f"TRADE OPENED | ID={trade_id} | Dir={direction} | Entry={price:.4f} | Qty={qty:.6f}", asset=asset, metadata=metadata) def trade_close(self, trade_id: str, asset: str, pnl: float, return_pct: float, exit_price: Optional[float] = None): """Log trade closing. ✅ FIX v2.2: exit_price written into message text AND metadata.""" metadata = {"trade_id": trade_id, "pnl": pnl, "return_pct": return_pct} # Include exit_price in metadata for JSON export if exit_price is not None: metadata["exit_price"] = exit_price # ✅ FIX v2.2: Also embed exit_price in the pipe-delimited message so the # dashboard regex (TRADE_CLOSE_RE_WITH_EXIT) can capture it directly. # Previously exit_price lived only in the trailing JSON metadata, which the # regex never reached — causing the EXIT column to always display "—". msg = f"TRADE CLOSED | ID={trade_id} | pnl={pnl:+.4f} | return={return_pct:+.2%}" if exit_price is not None: msg += f" | exit_price={exit_price}" self._log(LogLevel.INFO, EventCategory.TRADE, msg, asset=asset, metadata=metadata) def ranking_update(self, rankings: List[Dict], top_asset: str, top_score: float): """Log ranking cycle results.""" metadata = {"top_asset": top_asset, "top_score": top_score, "num_ranked": len(rankings)} self._log(LogLevel.DEBUG, EventCategory.RANKING, f"rankings | top={top_asset} | score={top_score:.4f} | total={len(rankings)}", metadata=metadata) # ── Generic log method (backward compatibility) ───────────────────────────── def log(self, level: LogLevel, category: EventCategory, message: str, asset: Optional[str] = None): """Generic log method for custom events.""" self._log(level, category, message, asset=asset) # ── API methods for dashboard (required by hub_dashboard_service) ─────────── def get_recent(self, n: int = 50, category: Optional[str] = None) -> List[dict]: """Get most recent n log entries, optionally filtered by category.""" with self._lock: if category: entries = list(self._by_category.get(category.upper(), [])) else: entries = list(self._buffer) return [e.to_dict() for e in entries[-n:]] def get_by_asset(self, asset: str, n: int = 30) -> List[dict]: """Get recent logs for a specific asset.""" with self._lock: entries = list(self._by_asset.get(asset, [])) return [e.to_dict() for e in entries[-n:]] def get_by_level(self, level: str, n: int = 50) -> List[dict]: """Get recent logs by log level.""" with self._lock: entries = list(self._by_level.get(level.upper(), [])) return [e.to_dict() for e in entries[-n:]] def get_stats(self) -> dict: """Get logging statistics.""" with self._lock: return { "total_events": self.stats["total_events"], "by_level": dict(self.stats["by_level"]), "by_category": dict(self.stats["by_category"]), "by_asset": dict(self.stats["by_asset"]), "errors": dict(self.stats["errors"]), "buffer_size": len(self._buffer), "buffer_capacity": self.buffer_size, } def clear_buffer(self): """Clear in-memory buffer.""" with self._lock: self._buffer.clear() self._by_asset.clear() self._by_level.clear() self._by_category.clear() self.stats = { "total_events": 0, "by_level": defaultdict(int), "by_category": defaultdict(int), "by_asset": defaultdict(int), "errors": defaultdict(int), } def export_json(self, filepath: str, n: int = 500): """Export logs to JSON file.""" entries = self.get_recent(n) with open(filepath, 'w') as f: json.dump({ "export_time": datetime.now().isoformat(), "count": len(entries), "logs": entries }, f, indent=2) @staticmethod def make_trade_ws_hook(ws_send_fn): """ Factory for the on_event callback that forwards TRADE log entries to the Executo Hub via WebSocket. Usage in your executor space ranker: import asyncio, json from ranker_logging import RankerLogger async def connect_to_hub(): # ws = your websockets/websocket-client connection to the hub ranker_logger = RankerLogger( name="QuasarAXRVI_V75", on_event=RankerLogger.make_trade_ws_hook( lambda payload: asyncio.create_task(ws.send(json.dumps(payload))) ) ) Args: ws_send_fn: callable(dict) → sends a dict as JSON to the hub WS publisher. Can be a coroutine wrapper or sync function. """ def _hook(entry: dict) -> None: if entry.get("category") != "TRADE": return msg = entry.get("message", "") meta = entry.get("metadata") or {} ts = entry.get("timestamp", "") if "TRADE OPENED" in msg: payload = { "type": "trade_opened", "data": { "trade_id": meta.get("trade_id"), "asset": entry.get("asset"), "direction": meta.get("direction", "?"), "entry": meta.get("price"), "qty": meta.get("qty", 0.0), "opened_at": ts, }, } elif "TRADE CLOSED" in msg: payload = { "type": "trade_closed", "data": { "trade_id": meta.get("trade_id"), "asset": entry.get("asset"), "pnl": meta.get("pnl", 0.0), "exit_price": meta.get("exit_price"), "closed_at": ts, }, } else: return try: ws_send_fn(payload) except Exception: pass # never let a WS error crash the ranker return _hook class RankerLogBridge: """ Bridge between ranker components and the logging system. Provides convenience methods for common logging patterns. """ def __init__(self, ranker_logger: RankerLogger): self.logger = ranker_logger def capture_signal(self, asset: str, buffer, score: float): """Capture signal generation from asset buffer.""" if buffer and hasattr(buffer, 'latest_signal') and buffer.latest_signal: action = buffer.latest_signal.get("action", "HOLD") confidence = buffer.latest_signal.get("confidence", 0.0) self.logger.signal(asset, action, confidence, score) def capture_ranking(self, ranked: List, hub_snapshots: Dict): """Capture ranking results.""" if ranked: top = ranked[0] # Handle both dict and object types top_name = top.space_name if hasattr(top, 'space_name') else top.get('space_name', '') top_score = top.final_priority if hasattr(top, 'final_priority') else top.get('final_priority', 0) self.logger.ranking_update( [r.space_name if hasattr(r, 'space_name') else r.get('space_name', '') for r in ranked[:5]], top_name, top_score ) # Cache full ranked list so get_stats() can expose it self._last_rankings = [ { "space_name": r.space_name if hasattr(r, 'space_name') else r.get('space_name', ''), "score": r.final_priority if hasattr(r, 'final_priority') else r.get('final_priority', 0), "rank": r.rank if hasattr(r, 'rank') else r.get('rank', 0), "dominant_signal": r.dominant_signal if hasattr(r, 'dominant_signal') else r.get('dominant_signal', 'NEUTRAL'), "epistemic_std": r.epistemic_std if hasattr(r, 'epistemic_std') else r.get('epistemic_std', 0.0), } for r in ranked ] def log_event(self, level: LogLevel, category: EventCategory, message: str, asset: Optional[str] = None): """Generic event logging.""" self.logger.log(level, category, message, asset=asset)