| """ |
| Performance Tracker - analyses training metrics history to identify |
| improvement trends, best adapters, and score progression. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import sys |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| _THIS_DIR = Path(__file__).resolve().parent |
| _PROJECT_ROOT = _THIS_DIR.parent |
| if str(_PROJECT_ROOT) not in sys.path: |
| sys.path.insert(0, str(_PROJECT_ROOT)) |
|
|
| from observatory.metrics_logger import MetricsLogger |
|
|
|
|
| class PerformanceTracker: |
| """Analyse training metrics to track improvement over time.""" |
|
|
| def __init__(self, logger: Optional[MetricsLogger] = None, log_file: Optional[str] = None): |
| self.logger = logger or MetricsLogger(log_file=log_file) |
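
    # Typical programmatic use (a sketch; the log path is illustrative):
    #
    #   tracker = PerformanceTracker(log_file="observatory_metrics.json")
    #   print(tracker.format_report())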

    def score_progression(self, adapter: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get score progression over time for an adapter (or all).

        Returns a list of dicts with timestamp, adapter, reasoning_score,
        loss, epoch, and dataset_size.
        """
        if adapter:
            entries = self.logger.get_by_adapter(adapter)
        else:
            entries = self.logger.get_all()

        entries = sorted(entries, key=lambda e: e.get("timestamp", ""))
        return [
            {
                "timestamp": e.get("timestamp"),
                "adapter": e.get("adapter"),
                "reasoning_score": e.get("reasoning_score", 0),
                "loss": e.get("loss", 0),
                "epoch": e.get("epoch", 0),
                "dataset_size": e.get("dataset_size", 0),
            }
            for e in entries
        ]

    def calculate_improvement(self, adapter: str) -> Dict[str, Any]:
        """Calculate improvement between first and last run for an adapter.

        Returns a dict with first_score, last_score, delta, percent_change,
        num_runs, first_timestamp, last_timestamp.
        """
        entries = self.logger.get_by_adapter(adapter)
        if len(entries) < 2:
            return {
                "adapter": adapter,
                "num_runs": len(entries),
                "first_score": entries[0].get("reasoning_score", 0) if entries else 0,
                "last_score": entries[-1].get("reasoning_score", 0) if entries else 0,
                "delta": 0.0,
                "percent_change": 0.0,
                "sufficient_data": False,
            }

        entries = sorted(entries, key=lambda e: e.get("timestamp", ""))
        first = entries[0]
        last = entries[-1]
        first_score = first.get("reasoning_score", 0)
        last_score = last.get("reasoning_score", 0)
        delta = last_score - first_score
        pct = (delta / first_score * 100) if first_score > 0 else 0.0

        return {
            "adapter": adapter,
            "num_runs": len(entries),
            "first_score": round(first_score, 6),
            "last_score": round(last_score, 6),
            "delta": round(delta, 6),
            "percent_change": round(pct, 2),
            "first_timestamp": first.get("timestamp"),
            "last_timestamp": last.get("timestamp"),
            "sufficient_data": True,
        }
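
    # Illustrative result for an adapter with two or more logged runs
    # (all values made up):
    #   {"adapter": "my-adapter", "num_runs": 4, "first_score": 0.41,
    #    "last_score": 0.63, "delta": 0.22, "percent_change": 53.66, ...}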

    def improvement_trends(self) -> List[Dict[str, Any]]:
        """Calculate improvement trends for all adapters."""
        adapters = self.logger.get_unique_adapters()
        trends = []
        for adapter in adapters:
            trend = self.calculate_improvement(adapter)
            trends.append(trend)
        trends.sort(key=lambda t: t.get("delta", 0), reverse=True)
        return trends

    def best_adapters(self, top_n: int = 5) -> List[Dict[str, Any]]:
        """Find the best-performing adapter versions by reasoning score.

        Returns a list of entries sorted by highest reasoning_score.
        """
        entries = self.logger.get_all()
        if not entries:
            return []

        # Keep only the highest-scoring entry per adapter.
        best: Dict[str, Dict[str, Any]] = {}
        for e in entries:
            adapter = e.get("adapter", "unknown")
            score = e.get("reasoning_score", 0)
            if adapter not in best or score > best[adapter].get("reasoning_score", 0):
                best[adapter] = e

        ranked = sorted(best.values(), key=lambda e: e.get("reasoning_score", 0), reverse=True)
        return ranked[:top_n]
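
    # Illustrative: if "lora-a" logged scores [0.50, 0.72] and "lora-b" logged
    # [0.61], best_adapters(top_n=2) returns lora-a's 0.72 entry, then lora-b's.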

    def run_to_run_deltas(self, adapter: str) -> List[Dict[str, Any]]:
        """Calculate score delta between consecutive runs of an adapter."""
        entries = self.logger.get_by_adapter(adapter)
        entries = sorted(entries, key=lambda e: e.get("timestamp", ""))

        deltas = []
        for i in range(1, len(entries)):
            prev_score = entries[i - 1].get("reasoning_score", 0)
            curr_score = entries[i].get("reasoning_score", 0)
            deltas.append({
                "run": i,
                "from_timestamp": entries[i - 1].get("timestamp"),
                "to_timestamp": entries[i].get("timestamp"),
                "score_delta": round(curr_score - prev_score, 6),
                "loss_delta": round(
                    entries[i].get("loss", 0) - entries[i - 1].get("loss", 0), 6
                ),
            })
        return deltas
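
    # Illustrative output for three logged runs (values made up):
    #   [{"run": 1, "score_delta": 0.05, "loss_delta": -0.12, ...},
    #    {"run": 2, "score_delta": -0.01, "loss_delta": -0.03, ...}]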

    def loss_progression(self, adapter: Optional[str] = None) -> List[Tuple[str, float]]:
        """Get loss values over time."""
        if adapter:
            entries = self.logger.get_by_adapter(adapter)
        else:
            entries = self.logger.get_all()
        entries = sorted(entries, key=lambda e: e.get("timestamp", ""))
        return [(e.get("timestamp", ""), e.get("loss", 0)) for e in entries]

    def format_report(self) -> str:
        """Generate a formatted text report of performance tracking."""
        lines: List[str] = []
        lines.append("=" * 74)
        lines.append(" CODETTE PERFORMANCE TRACKING REPORT")
        lines.append("=" * 74)

        entries = self.logger.get_all()
        lines.append(f" Total logged runs: {len(entries)}")
        lines.append(f" Unique adapters: {len(self.logger.get_unique_adapters())}")
        lines.append("")

        best = self.best_adapters(top_n=10)
        if best:
            lines.append("-" * 74)
            lines.append(" TOP ADAPTERS BY REASONING SCORE")
            lines.append("-" * 74)
            lines.append(f" {'Rank':<5} {'Adapter':<28} {'Score':>8} {'Loss':>8} {'Epoch':>6} {'Data':>6}")
            lines.append(f" {'----':<5} {'-------':<28} {'-----':>8} {'----':>8} {'-----':>6} {'----':>6}")
            for i, entry in enumerate(best, 1):
                name = entry.get("adapter", "?")[:27]
                score = entry.get("reasoning_score", 0)
                loss = entry.get("loss", 0)
                epoch = entry.get("epoch", 0)
                ds = entry.get("dataset_size", 0)
                lines.append(
                    f" {i:<5} {name:<28} {score:>8.4f} {loss:>8.4f} {epoch:>6} {ds:>6}"
                )
            lines.append("")

        trends = self.improvement_trends()
        if trends:
            lines.append("-" * 74)
            lines.append(" IMPROVEMENT TRENDS (first run -> last run)")
            lines.append("-" * 74)
            lines.append(
                f" {'Adapter':<28} {'First':>8} {'Last':>8} {'Delta':>8} {'Change':>8} {'Runs':>5}"
            )
            lines.append(
                f" {'-------':<28} {'-----':>8} {'----':>8} {'-----':>8} {'------':>8} {'----':>5}"
            )
            for t in trends:
                name = t["adapter"][:27]
                first = t["first_score"]
                last = t["last_score"]
                delta = t["delta"]
                pct = t["percent_change"]
                runs = t["num_runs"]
                # The "+" format flag keeps positive and negative values in
                # equal-width columns, so rows stay aligned.
                lines.append(
                    f" {name:<28} {first:>8.4f} {last:>8.4f} "
                    f"{delta:>+8.4f} {pct:>+7.1f}% {runs:>5}"
                )
            lines.append("")

        adapters = self.logger.get_unique_adapters()
        if adapters:
            lines.append("-" * 74)
            lines.append(" SCORE PROGRESSION (ASCII sparkline)")
            lines.append("-" * 74)
            for adapter in adapters[:8]:
                progression = self.score_progression(adapter)
                if not progression:
                    continue
                scores = [p["reasoning_score"] for p in progression]
                sparkline = self._sparkline(scores, width=40)
                name = adapter[:24]
                lines.append(f" {name:<25} {sparkline} [{scores[0]:.3f} -> {scores[-1]:.3f}]")
            lines.append("")

        lines.append("=" * 74)
        return "\n".join(lines)

    @staticmethod
    def _sparkline(values: List[float], width: int = 40) -> str:
        """Create an ASCII sparkline from a list of values."""
        if not values:
            return ""
        if len(values) == 1:
            return "-"

        min_v = min(values)
        max_v = max(values)
        range_v = max_v - min_v if max_v > min_v else 1.0

        chars = " _.-~^"
        n_chars = len(chars) - 1

        # Resample down by striding, or pad up by repeating the last value,
        # so the sparkline is exactly `width` characters wide.
        if len(values) > width:
            step = len(values) / width
            resampled = []
            for i in range(width):
                idx = int(i * step)
                resampled.append(values[min(idx, len(values) - 1)])
            values = resampled
        elif len(values) < width:
            values = values + [values[-1]] * (width - len(values))

        result = ""
        for v in values[:width]:
            normalised = (v - min_v) / range_v
            idx = int(normalised * n_chars)
            idx = max(0, min(idx, n_chars))
            result += chars[idx]

        return result
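
    # Behaviour sketch: _sparkline([0.1, 0.5, 0.9], width=3) normalises the
    # values onto the " _.-~^" ramp and returns " .^" (min -> " ", max -> "^").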


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Codette Performance Tracker - analyse training run history"
    )
    parser.add_argument(
        "--log-file", "-l",
        default=None,
        help="Path to observatory_metrics.json (default: auto-detect)",
    )
    parser.add_argument(
        "--adapter", "-a",
        default=None,
        help="Filter to a specific adapter name",
    )
    parser.add_argument(
        "--best", "-b",
        type=int,
        default=None,
        help="Show top N best adapters",
    )
    parser.add_argument(
        "--deltas", "-d",
        default=None,
        help="Show run-to-run deltas for a specific adapter",
    )

    args = parser.parse_args()

    tracker = PerformanceTracker(log_file=args.log_file)

    if args.best:
        best = tracker.best_adapters(top_n=args.best)
        for i, entry in enumerate(best, 1):
            print(f" {i}. {entry.get('adapter', '?')} - "
                  f"score: {entry.get('reasoning_score', 0):.4f}, "
                  f"loss: {entry.get('loss', 0):.4f}")
        return

    if args.deltas:
        deltas = tracker.run_to_run_deltas(args.deltas)
        if not deltas:
            print(f"No run-to-run data for adapter: {args.deltas}")
            return
        for d in deltas:
            # Format each delta with its own sign; score and loss can move
            # in opposite directions on the same run.
            print(f" Run {d['run']}: score {d['score_delta']:+.6f}, "
                  f"loss {d['loss_delta']:+.6f}")
        return

    if args.adapter:
        improvement = tracker.calculate_improvement(args.adapter)
        print(f" Adapter: {improvement['adapter']}")
        print(f" Runs: {improvement['num_runs']}")
        print(f" First score: {improvement['first_score']:.6f}")
        print(f" Last score: {improvement['last_score']:.6f}")
        print(f" Delta: {improvement['delta']:+.6f}")
        print(f" Change: {improvement['percent_change']:+.2f}%")
        return

    # No filters given: print the full report.
    print(tracker.format_report())


if __name__ == "__main__":
    main()
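
# Example invocations (the script path and adapter names are illustrative):
#   python performance_tracker.py                      # full report
#   python performance_tracker.py --best 5             # top 5 adapters
#   python performance_tracker.py --adapter my-lora    # improvement summary
#   python performance_tracker.py --deltas my-lora     # run-to-run deltas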