| """ |
| Failure Analyzer - examines evaluation logs to find patterns in |
| low-scoring responses, cluster failures by topic, and recommend |
| dataset improvements. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import re |
| import sys |
| from collections import Counter, defaultdict |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Set, Tuple |
|
|
# Make the project root (one directory above this file) importable so the
# analyzer can be run directly as a script from any working directory.
_THIS_DIR = Path(__file__).resolve().parent
_PROJECT_ROOT = _THIS_DIR.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))
|
|
|
|
| |
| |
| |
|
|
| _STOP_WORDS: Set[str] = { |
| "the", "a", "an", "is", "are", "was", "were", "be", "been", "being", |
| "have", "has", "had", "do", "does", "did", "will", "would", "shall", |
| "should", "may", "might", "must", "can", "could", "to", "of", "in", |
| "for", "on", "with", "at", "by", "from", "as", "into", "through", |
| "during", "before", "after", "above", "below", "between", "out", |
| "off", "over", "under", "again", "further", "then", "once", "here", |
| "there", "when", "where", "why", "how", "all", "both", "each", |
| "few", "more", "most", "other", "some", "such", "no", "nor", "not", |
| "only", "own", "same", "so", "than", "too", "very", "just", "don", |
| "now", "and", "but", "or", "if", "while", "that", "this", "what", |
| "which", "who", "whom", "it", "its", "they", "them", "their", |
| "he", "she", "him", "her", "his", "we", "us", "our", "you", "your", |
| "i", "me", "my", "about", "up", |
| } |
|
|
|
|
def _extract_keywords(text: str, top_n: int = 8) -> List[str]:
    """Return up to *top_n* of the most frequent meaningful words in *text*.

    Tokens are lowercase alphabetic runs of three or more letters; stop
    words are discarded before counting. Ties keep first-seen order
    (Counter.most_common is stable in CPython).
    """
    tokens = re.findall(r"[a-z]{3,}", text.lower())
    tally = Counter(t for t in tokens if t not in _STOP_WORDS)
    return [word for word, _count in tally.most_common(top_n)]
|
|
|
|
| def _jaccard(set_a: Set[str], set_b: Set[str]) -> float: |
| """Jaccard similarity between two sets.""" |
| if not set_a and not set_b: |
| return 1.0 |
| union = set_a | set_b |
| if not union: |
| return 0.0 |
| return len(set_a & set_b) / len(union) |
|
|
|
|
| |
| |
| |
|
|
class FailureAnalyzer:
    """Analyze evaluation results to identify failure patterns.

    A prompt counts as a *failure* when its score for a dimension falls
    below ``failure_threshold``, and as *weak* when the score lies in
    ``[failure_threshold, weak_threshold)``.
    """

    # Class-level default thresholds; the constructor references these so
    # the values cannot drift apart, and subclasses can override them.
    FAILURE_THRESHOLD = 0.4
    WEAK_THRESHOLD = 0.55

    def __init__(
        self,
        failure_threshold: float = FAILURE_THRESHOLD,
        weak_threshold: float = WEAK_THRESHOLD,
    ):
        """Store the score thresholds used by all analysis methods.

        Fix: the defaults previously repeated the literals 0.4 / 0.55,
        duplicating FAILURE_THRESHOLD / WEAK_THRESHOLD; they now reference
        the class constants.
        """
        self.failure_threshold = failure_threshold
        self.weak_threshold = weak_threshold

    @staticmethod
    def load_results(filepath: str) -> Dict[str, Any]:
        """Load benchmark results JSON produced by BenchmarkRunner.

        Raises OSError if the file cannot be read and
        json.JSONDecodeError if the contents are not valid JSON.
        """
        with open(filepath, "r", encoding="utf-8") as f:
            return json.load(f)

    def find_failures(
        self,
        results: Dict[str, Any],
        dimension: str = "overall",
    ) -> List[Dict[str, Any]]:
        """Return entries whose *dimension* score is below failure threshold.

        Each returned dict has keys "prompt", "score" (the failing
        dimension's value) and "all_scores". The list is sorted worst-first.
        """
        failures = []
        for entry in results.get("all_scores", []):
            score = entry.get("scores", {}).get(dimension)
            # Entries missing this dimension are skipped rather than
            # treated as failures.
            if score is not None and score < self.failure_threshold:
                failures.append({
                    "prompt": entry["prompt"],
                    "score": score,
                    "all_scores": entry["scores"],
                })
        failures.sort(key=lambda x: x["score"])
        return failures

    def find_weak_areas(
        self,
        results: Dict[str, Any],
    ) -> Dict[str, float]:
        """Identify which scoring dimensions are weakest across all prompts.

        Returns dict of dimension -> average score (rounded to 4 places),
        sorted ascending so the weakest dimension comes first.
        """
        dimension_totals: Dict[str, float] = defaultdict(float)
        dimension_counts: Dict[str, int] = defaultdict(int)

        for entry in results.get("all_scores", []):
            for k, v in entry.get("scores", {}).items():
                # Fix: accept ints as well as floats — JSON decodes whole
                # numbers such as 0 or 1 to int, which the previous
                # isinstance(v, float) check silently dropped, skewing the
                # averages. bool is excluded (it subclasses int).
                if (
                    isinstance(v, (int, float))
                    and not isinstance(v, bool)
                    and k not in ("word_count", "sentence_count")
                ):
                    dimension_totals[k] += v
                    dimension_counts[k] += 1

        # Every key in dimension_totals has a count >= 1, so the division
        # is always safe.
        averages = {
            k: round(total / dimension_counts[k], 4)
            for k, total in dimension_totals.items()
        }

        return dict(sorted(averages.items(), key=lambda x: x[1]))

    def failure_rate_by_category(
        self,
        results: Dict[str, Any],
        dimension: str = "overall",
    ) -> Dict[str, Dict[str, Any]]:
        """Calculate failure rates per category.

        Returns category -> stats dict, sorted by descending failure rate.
        Categories with no details are omitted. A detail entry missing the
        dimension defaults to 1.0, i.e. it counts as passing.
        """
        rates: Dict[str, Dict[str, Any]] = {}

        for cat, data in results.get("categories", {}).items():
            details = data.get("details", [])
            total = len(details)
            if total == 0:
                continue  # nothing to report; also avoids division by zero
            failures = sum(
                1 for d in details
                if d.get("scores", {}).get(dimension, 1.0) < self.failure_threshold
            )
            weak = sum(
                1 for d in details
                if self.failure_threshold
                <= d.get("scores", {}).get(dimension, 1.0)
                < self.weak_threshold
            )
            rates[cat] = {
                "total": total,
                "failures": failures,
                "weak": weak,
                "failure_rate": round(failures / total, 4),
                "weak_rate": round(weak / total, 4),
                "avg_score": data.get("average_scores", {}).get(dimension, 0),
            }

        return dict(sorted(rates.items(), key=lambda x: -x[1]["failure_rate"]))

    def cluster_failures_by_topic(
        self,
        failures: List[Dict[str, Any]],
        similarity_threshold: float = 0.25,
    ) -> List[Dict[str, Any]]:
        """Cluster failure prompts by keyword overlap.

        Uses a simple greedy clustering: each prompt is assigned to the
        first cluster whose accumulated keywords have Jaccard similarity
        at or above *similarity_threshold*; otherwise it seeds a new
        cluster. NOTE: results depend on input order, and cluster keyword
        sets grow as prompts are absorbed.
        """
        clusters: List[Dict[str, Any]] = []

        for failure in failures:
            prompt = failure["prompt"]
            keywords = set(_extract_keywords(prompt))

            matched = False
            for cluster in clusters:
                if _jaccard(keywords, cluster["keywords"]) >= similarity_threshold:
                    cluster["prompts"].append(failure)
                    # Fold this prompt's keywords into the cluster centroid.
                    cluster["keywords"] |= keywords
                    matched = True
                    break

            if not matched:
                clusters.append({
                    "keywords": keywords,
                    "prompts": [failure],
                })

        # Summarize each cluster; largest clusters first.
        result = []
        for i, c in enumerate(clusters):
            avg_score = sum(p["score"] for p in c["prompts"]) / len(c["prompts"])
            result.append({
                "cluster_id": i,
                "topic_keywords": sorted(c["keywords"])[:10],
                "num_failures": len(c["prompts"]),
                "avg_score": round(avg_score, 4),
                "sample_prompts": [p["prompt"] for p in c["prompts"][:5]],
            })

        result.sort(key=lambda x: -x["num_failures"])
        return result

    def identify_weakest_dimensions(
        self,
        results: Dict[str, Any],
        top_n: int = 3,
    ) -> List[Tuple[str, float]]:
        """Return the top_n weakest scoring dimensions (excluding "overall")."""
        averages = self.find_weak_areas(results)
        items = [(k, v) for k, v in averages.items() if k != "overall"]
        return items[:top_n]

    def generate_recommendations(
        self,
        results: Dict[str, Any],
    ) -> List[str]:
        """Generate actionable recommendations for dataset improvement.

        Combines four signals: weakest dimensions, per-category failure
        rates, topic clusters among failures, and the overall score. Always
        returns at least one message.
        """
        recommendations: List[str] = []

        # 1) Dimensions with low average scores.
        weakest = self.identify_weakest_dimensions(results, top_n=3)
        for dim, score in weakest:
            if score < self.failure_threshold:
                recommendations.append(
                    f"CRITICAL: Dimension '{dim}' averages {score:.3f} (below failure threshold). "
                    f"Add training examples that emphasise {dim} explicitly."
                )
            elif score < self.weak_threshold:
                recommendations.append(
                    f"IMPROVE: Dimension '{dim}' averages {score:.3f} (weak). "
                    f"Augment dataset with responses demonstrating strong {dim}."
                )

        # 2) Categories with a high failure rate (> 30%).
        cat_rates = self.failure_rate_by_category(results)
        for cat, info in cat_rates.items():
            if info["failure_rate"] > 0.3:
                recommendations.append(
                    f"CATEGORY '{cat}': {info['failure_rate']:.0%} failure rate. "
                    f"Add more diverse training examples for {cat} topics."
                )

        # 3) Topic clusters among the failing prompts (largest three).
        failures = self.find_failures(results)
        if failures:
            clusters = self.cluster_failures_by_topic(failures)
            for cluster in clusters[:3]:
                kw = ", ".join(cluster["topic_keywords"][:5])
                recommendations.append(
                    f"TOPIC CLUSTER: {cluster['num_failures']} failures around "
                    f"[{kw}]. Create targeted training data for these concepts."
                )

        # 4) Overall score banding.
        overall = results.get("overall", {})
        overall_score = overall.get("overall", 0)
        if overall_score < 0.5:
            recommendations.append(
                "GENERAL: Overall score is very low. Consider increasing dataset size "
                "and diversity before next training run."
            )
        elif overall_score < 0.65:
            recommendations.append(
                "GENERAL: Overall score is moderate. Focus on the weakest categories "
                "and dimensions for the next dataset iteration."
            )

        if not recommendations:
            recommendations.append(
                "No critical issues detected. Continue monitoring with additional benchmarks."
            )

        return recommendations

    def format_report(self, results: Dict[str, Any]) -> str:
        """Generate a full failure analysis report as a printable string."""
        lines: List[str] = []
        lines.append("=" * 70)
        lines.append(" FAILURE ANALYSIS REPORT")
        lines.append("=" * 70)

        # Section: weakest dimensions (up to six, weakest first).
        lines.append("")
        lines.append("-" * 70)
        lines.append(" WEAKEST SCORING DIMENSIONS")
        lines.append("-" * 70)
        weak_areas = self.find_weak_areas(results)
        for dim, score in list(weak_areas.items())[:6]:
            status = "FAIL" if score < self.failure_threshold else (
                "WEAK" if score < self.weak_threshold else "OK "
            )
            lines.append(f" [{status}] {dim:<22s} {score:.4f}")

        # Section: per-category failure rates.
        lines.append("")
        lines.append("-" * 70)
        lines.append(" FAILURE RATES BY CATEGORY")
        lines.append("-" * 70)
        cat_rates = self.failure_rate_by_category(results)
        for cat, info in cat_rates.items():
            lines.append(
                f" {cat:<18s} fail: {info['failure_rate']:>5.1%} "
                f"weak: {info['weak_rate']:>5.1%} "
                f"avg: {info['avg_score']:.4f}"
            )

        # Section: failure clusters (only when failures exist).
        failures = self.find_failures(results)
        if failures:
            lines.append("")
            lines.append("-" * 70)
            lines.append(f" FAILURE CLUSTERS ({len(failures)} total failures)")
            lines.append("-" * 70)
            clusters = self.cluster_failures_by_topic(failures)
            for c in clusters[:5]:
                kw = ", ".join(c["topic_keywords"][:6])
                lines.append(f" Cluster {c['cluster_id']}: "
                             f"{c['num_failures']} failures, "
                             f"avg score {c['avg_score']:.4f}")
                lines.append(f" Topics: {kw}")
                for p in c["sample_prompts"][:2]:
                    lines.append(f" - {p[:70]}...")

        # Section: recommendations.
        lines.append("")
        lines.append("-" * 70)
        lines.append(" RECOMMENDATIONS")
        lines.append("-" * 70)
        recs = self.generate_recommendations(results)
        for i, rec in enumerate(recs, 1):
            lines.append(f" {i}. {rec}")

        lines.append("")
        lines.append("=" * 70)
        return "\n".join(lines)
|
|
|
|
| |
| |
| |
|
|
def main() -> None:
    """Command-line entry point: parse options, analyze, print the report."""
    import argparse

    arg_parser = argparse.ArgumentParser(
        description="Codette Failure Analyzer - identify patterns in evaluation failures"
    )
    arg_parser.add_argument(
        "--results", "-r", required=True,
        help="Path to benchmark results JSON",
    )
    arg_parser.add_argument(
        "--failure-threshold", "-f", type=float, default=0.4,
        help="Score threshold for failure (default: 0.4)",
    )
    arg_parser.add_argument(
        "--weak-threshold", "-w", type=float, default=0.55,
        help="Score threshold for weak (default: 0.55)",
    )
    opts = arg_parser.parse_args()

    analyzer = FailureAnalyzer(
        failure_threshold=opts.failure_threshold,
        weak_threshold=opts.weak_threshold,
    )
    print(analyzer.format_report(FailureAnalyzer.load_results(opts.results)))


if __name__ == "__main__":
    main()
|
|