Quasar-Executo / ranker_logging.py
KarlQuant's picture
Update ranker_logging.py
b7c4d87 verified
#!/usr/bin/env python3
"""
╔══════════════════════════════════════════════════════════════════════════════════════╗
β•‘ QUASAR RANKER β€” COMPLETE LOGGING SYSTEM β•‘
β•‘ ────────────────────────────────────────────────────────────────────────────────── β•‘
β•‘ Provides file-based logging with JSON export, in-memory buffer, and REST API. β•‘
β•‘ VERSION: v2.1 | 2026-03-30 β•‘
β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
"""
import json
import logging
import os
import sys
import time
from collections import deque, defaultdict
from dataclasses import dataclass # <-- MISSING IMPORT!
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from threading import Lock
from typing import Dict, List, Optional, Any
class LogLevel(Enum):
DEBUG = auto()
INFO = auto()
WARNING = auto()
ERROR = auto()
CRITICAL = auto()
class EventCategory(Enum):
INITIALIZATION = 'INITIALIZATION'
PROCESSING = 'PROCESSING'
TERMINATION = 'TERMINATION'
ERROR_OCCURRED = 'ERROR_OCCURRED'
CONNECTION = 'CONNECTION'
TRAINING = 'TRAINING'
SIGNAL = 'SIGNAL'
TRADE = 'TRADE'
RANKING = 'RANKING'
@dataclass
class LogEntry:
"""Structured log entry with all metadata."""
timestamp: float
level: str
category: str
message: str
asset: Optional[str] = None
metadata: Optional[Dict] = None
def to_dict(self) -> dict:
return {
"ts": self.timestamp,
"timestamp": datetime.fromtimestamp(self.timestamp).isoformat(),
"level": self.level,
"category": self.category,
"message": self.message,
"asset": self.asset,
"metadata": self.metadata or {},
}
def to_file_line(self) -> str:
"""Format for file logging (human-readable)."""
dt = datetime.fromtimestamp(self.timestamp).strftime("%Y-%m-%d %H:%M:%S")
asset_str = f" | {self.asset}" if self.asset else ""
meta_str = f" | {json.dumps(self.metadata)}" if self.metadata else ""
return f"[{dt}] | {self.level:8s} | {self.category:15s}{asset_str} | {self.message}{meta_str}"
class RankerLogger:
"""
Complete logger with file output, in-memory buffer, and JSON export.
Features:
- Writes to both console and rotating log files
- Maintains in-memory buffer for API queries
- Per-asset and per-category indexing
- JSON export for persistence
- Thread-safe with locks
"""
def __init__(
self,
name: str = "QuasarAXRVI",
buffer_size: int = 1000,
log_dir: str = "./ranker_logs",
on_event: Optional[callable] = None,
):
self.name = name
self.buffer_size = buffer_size
self.log_dir = Path(log_dir)
self.on_event = on_event
# Create log directory if it doesn't exist
self.log_dir.mkdir(parents=True, exist_ok=True)
# In-memory buffers
self._buffer: deque = deque(maxlen=buffer_size)
self._by_asset: Dict[str, deque] = defaultdict(lambda: deque(maxlen=buffer_size // 2))
self._by_level: Dict[str, deque] = defaultdict(lambda: deque(maxlen=buffer_size // 2))
self._by_category: Dict[str, deque] = defaultdict(lambda: deque(maxlen=buffer_size // 2))
self._lock = Lock()
# File handler for persistent logging
self._setup_file_logging()
# Console logging
self.console_logger = logging.getLogger(name)
self.console_logger.setLevel(logging.DEBUG)
if not self.console_logger.handlers:
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(message)s')
ch.setFormatter(formatter)
self.console_logger.addHandler(ch)
self.stats = {
"total_events": 0,
"by_level": defaultdict(int),
"by_category": defaultdict(int),
"by_asset": defaultdict(int),
"errors": defaultdict(int),
}
self._log(LogLevel.INFO, EventCategory.INITIALIZATION,
f"RankerLogger initialized | log_dir={log_dir} | buffer_size={buffer_size}")
def _setup_file_logging(self):
"""Setup rotating file logging."""
log_file = self.log_dir / f"{self.name}_{datetime.now().strftime('%Y%m%d')}.log"
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter('%(message)s')
file_handler.setFormatter(file_formatter)
self.file_logger = logging.getLogger(f"{self.name}_file")
self.file_logger.setLevel(logging.DEBUG)
self.file_logger.addHandler(file_handler)
self.file_logger.propagate = False
def _log(
self,
level: LogLevel,
category: EventCategory,
message: str,
asset: Optional[str] = None,
metadata: Optional[Dict] = None,
):
"""Internal logging method."""
entry = LogEntry(
timestamp=time.time(),
level=level.name,
category=category.value,
message=message,
asset=asset,
metadata=metadata,
)
# Write to file
file_line = entry.to_file_line()
self.file_logger.info(file_line)
# Write to console (simplified)
console_line = f"{entry.level:8s} | {entry.category:15s}"
if asset:
console_line += f" | {asset}"
console_line += f" | {message}"
if level == LogLevel.DEBUG:
self.console_logger.debug(console_line)
elif level == LogLevel.INFO:
self.console_logger.info(console_line)
elif level == LogLevel.WARNING:
self.console_logger.warning(console_line)
elif level == LogLevel.ERROR:
self.console_logger.error(console_line)
else:
self.console_logger.critical(console_line)
# Store in memory buffers
with self._lock:
self._buffer.append(entry)
if asset:
self._by_asset[asset].append(entry)
self._by_level[level.name].append(entry)
self._by_category[category.value].append(entry)
self.stats["total_events"] += 1
self.stats["by_level"][level.name] += 1
self.stats["by_category"][category.value] += 1
if asset:
self.stats["by_asset"][asset] += 1
if level == LogLevel.ERROR or level == LogLevel.CRITICAL:
self.stats["errors"][category.value] += 1
# Callback for external consumers (e.g., dashboard)
if self.on_event:
try:
self.on_event(entry.to_dict())
except Exception:
pass
# ── Public logging methods (called by ranker) ────────────────────────────────
def connection_event(self, component: str, status: str, details: str = ""):
"""Log WebSocket or Hub connection events."""
msg = f"{component} | {status}"
if details:
msg += f" | {details}"
self._log(LogLevel.INFO if status in ["connected", "ready"] else LogLevel.WARNING,
EventCategory.CONNECTION, msg)
def training_update(self, step: int, loss: float, lr: float, asset_count: int = 0):
"""Log training progress."""
metadata = {"step": step, "loss": loss, "lr": lr, "asset_count": asset_count}
self._log(LogLevel.DEBUG, EventCategory.TRAINING,
f"step={step} | loss={loss:.4f} | lr={lr:.6f} | assets={asset_count}",
metadata=metadata)
def hub_update(self, asset: str, avn_accuracy: float, signal_confidence: float):
"""Log hub snapshot updates."""
metadata = {"avn_accuracy": avn_accuracy, "signal_confidence": signal_confidence}
self._log(LogLevel.DEBUG, EventCategory.PROCESSING,
f"hub update | acc={avn_accuracy:.3f} | conf={signal_confidence:.3f}",
asset=asset, metadata=metadata)
def signal(self, asset: str, direction: str, confidence: float, significance: float):
"""Log signal generation."""
metadata = {"direction": direction, "confidence": confidence, "significance": significance}
self._log(LogLevel.INFO, EventCategory.SIGNAL,
f"{direction} | conf={confidence:.3f} | sig={significance:.3f}",
asset=asset, metadata=metadata)
def trade_open(self, trade_id: str, asset: str, direction: str, price: float, qty: float):
"""Log trade opening."""
metadata = {"trade_id": trade_id, "price": price, "qty": qty, "direction": direction}
self._log(LogLevel.INFO, EventCategory.TRADE,
f"TRADE OPENED | ID={trade_id} | Dir={direction} | Entry={price:.4f} | Qty={qty:.6f}",
asset=asset, metadata=metadata)
def trade_close(self, trade_id: str, asset: str, pnl: float, return_pct: float, exit_price: Optional[float] = None):
"""Log trade closing. βœ… FIX v2.2: exit_price written into message text AND metadata."""
metadata = {"trade_id": trade_id, "pnl": pnl, "return_pct": return_pct}
# Include exit_price in metadata for JSON export
if exit_price is not None:
metadata["exit_price"] = exit_price
# βœ… FIX v2.2: Also embed exit_price in the pipe-delimited message so the
# dashboard regex (TRADE_CLOSE_RE_WITH_EXIT) can capture it directly.
# Previously exit_price lived only in the trailing JSON metadata, which the
# regex never reached β€” causing the EXIT column to always display "β€”".
msg = f"TRADE CLOSED | ID={trade_id} | pnl={pnl:+.4f} | return={return_pct:+.2%}"
if exit_price is not None:
msg += f" | exit_price={exit_price}"
self._log(LogLevel.INFO, EventCategory.TRADE, msg, asset=asset, metadata=metadata)
def ranking_update(self, rankings: List[Dict], top_asset: str, top_score: float):
"""Log ranking cycle results."""
metadata = {"top_asset": top_asset, "top_score": top_score, "num_ranked": len(rankings)}
self._log(LogLevel.DEBUG, EventCategory.RANKING,
f"rankings | top={top_asset} | score={top_score:.4f} | total={len(rankings)}",
metadata=metadata)
# ── Generic log method (backward compatibility) ─────────────────────────────
def log(self, level: LogLevel, category: EventCategory, message: str, asset: Optional[str] = None):
"""Generic log method for custom events."""
self._log(level, category, message, asset=asset)
# ── API methods for dashboard (required by hub_dashboard_service) ───────────
def get_recent(self, n: int = 50, category: Optional[str] = None) -> List[dict]:
"""Get most recent n log entries, optionally filtered by category."""
with self._lock:
if category:
entries = list(self._by_category.get(category.upper(), []))
else:
entries = list(self._buffer)
return [e.to_dict() for e in entries[-n:]]
def get_by_asset(self, asset: str, n: int = 30) -> List[dict]:
"""Get recent logs for a specific asset."""
with self._lock:
entries = list(self._by_asset.get(asset, []))
return [e.to_dict() for e in entries[-n:]]
def get_by_level(self, level: str, n: int = 50) -> List[dict]:
"""Get recent logs by log level."""
with self._lock:
entries = list(self._by_level.get(level.upper(), []))
return [e.to_dict() for e in entries[-n:]]
def get_stats(self) -> dict:
"""Get logging statistics."""
with self._lock:
return {
"total_events": self.stats["total_events"],
"by_level": dict(self.stats["by_level"]),
"by_category": dict(self.stats["by_category"]),
"by_asset": dict(self.stats["by_asset"]),
"errors": dict(self.stats["errors"]),
"buffer_size": len(self._buffer),
"buffer_capacity": self.buffer_size,
}
def clear_buffer(self):
"""Clear in-memory buffer."""
with self._lock:
self._buffer.clear()
self._by_asset.clear()
self._by_level.clear()
self._by_category.clear()
self.stats = {
"total_events": 0,
"by_level": defaultdict(int),
"by_category": defaultdict(int),
"by_asset": defaultdict(int),
"errors": defaultdict(int),
}
def export_json(self, filepath: str, n: int = 500):
"""Export logs to JSON file."""
entries = self.get_recent(n)
with open(filepath, 'w') as f:
json.dump({
"export_time": datetime.now().isoformat(),
"count": len(entries),
"logs": entries
}, f, indent=2)
@staticmethod
def make_trade_ws_hook(ws_send_fn):
"""
Factory for the on_event callback that forwards TRADE log entries to the
Executo Hub via WebSocket.
Usage in your executor space ranker:
import asyncio, json
from ranker_logging import RankerLogger
async def connect_to_hub():
# ws = your websockets/websocket-client connection to the hub
ranker_logger = RankerLogger(
name="QuasarAXRVI_V75",
on_event=RankerLogger.make_trade_ws_hook(
lambda payload: asyncio.create_task(ws.send(json.dumps(payload)))
)
)
Args:
ws_send_fn: callable(dict) β†’ sends a dict as JSON to the hub WS publisher.
Can be a coroutine wrapper or sync function.
"""
def _hook(entry: dict) -> None:
if entry.get("category") != "TRADE":
return
msg = entry.get("message", "")
meta = entry.get("metadata") or {}
ts = entry.get("timestamp", "")
if "TRADE OPENED" in msg:
payload = {
"type": "trade_opened",
"data": {
"trade_id": meta.get("trade_id"),
"asset": entry.get("asset"),
"direction": meta.get("direction", "?"),
"entry": meta.get("price"),
"qty": meta.get("qty", 0.0),
"opened_at": ts,
},
}
elif "TRADE CLOSED" in msg:
payload = {
"type": "trade_closed",
"data": {
"trade_id": meta.get("trade_id"),
"asset": entry.get("asset"),
"pnl": meta.get("pnl", 0.0),
"exit_price": meta.get("exit_price"),
"closed_at": ts,
},
}
else:
return
try:
ws_send_fn(payload)
except Exception:
pass # never let a WS error crash the ranker
return _hook
class RankerLogBridge:
"""
Bridge between ranker components and the logging system.
Provides convenience methods for common logging patterns.
"""
def __init__(self, ranker_logger: RankerLogger):
self.logger = ranker_logger
def capture_signal(self, asset: str, buffer, score: float):
"""Capture signal generation from asset buffer."""
if buffer and hasattr(buffer, 'latest_signal') and buffer.latest_signal:
action = buffer.latest_signal.get("action", "HOLD")
confidence = buffer.latest_signal.get("confidence", 0.0)
self.logger.signal(asset, action, confidence, score)
def capture_ranking(self, ranked: List, hub_snapshots: Dict):
"""Capture ranking results."""
if ranked:
top = ranked[0]
# Handle both dict and object types
top_name = top.space_name if hasattr(top, 'space_name') else top.get('space_name', '')
top_score = top.final_priority if hasattr(top, 'final_priority') else top.get('final_priority', 0)
self.logger.ranking_update(
[r.space_name if hasattr(r, 'space_name') else r.get('space_name', '') for r in ranked[:5]],
top_name,
top_score
)
# Cache full ranked list so get_stats() can expose it
self._last_rankings = [
{
"space_name": r.space_name if hasattr(r, 'space_name') else r.get('space_name', ''),
"score": r.final_priority if hasattr(r, 'final_priority') else r.get('final_priority', 0),
"rank": r.rank if hasattr(r, 'rank') else r.get('rank', 0),
"dominant_signal": r.dominant_signal if hasattr(r, 'dominant_signal') else r.get('dominant_signal', 'NEUTRAL'),
"epistemic_std": r.epistemic_std if hasattr(r, 'epistemic_std') else r.get('epistemic_std', 0.0),
}
for r in ranked
]
def log_event(self, level: LogLevel, category: EventCategory, message: str, asset: Optional[str] = None):
"""Generic event logging."""
self.logger.log(level, category, message, asset=asset)