import argparse
import datetime
import glob
import os
import re

import openai
import pandas as pd
from tqdm import tqdm

def summarize_results(results_dirs, output_csv, model, no_llm=False):
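    """Scan one or more results directories, scrape per-run statistics from
    the log files, optionally categorize failures with an LLM, and write a
    summary CSV.
    """
    # Build the API client only when it is needed, so the no_llm path does
    # not require CBORG_API_KEY to be set in the environment.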
    client = None
    if not no_llm:
        client = openai.OpenAI(
            api_key=os.environ.get('CBORG_API_KEY'),
            base_url='https://api.cborg.lbl.gov'
        )

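    # Prompt for the error-categorization LLM. Categories are returned wrapped
    # in triple asterisks (***Category***) so they can be recovered with a regex.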
    error_categorization_prompt = (
        "You are an expert at classifying error messages from machine learning workflows in high energy physics.\n\n"
        "Workflow summary:\n"
        "- A user provides an analysis task prompt.\n"
        "- A supervisor agent breaks down the task and instructs a coder agent.\n"
        "- The coder agent generates code, which is executed.\n"
        "- The supervisor reviews results and may iterate with the coder to fix issues until the task is complete.\n"
        "Below is a list of error categories:\n"
        "all data weights = 0, "
        "dummy data created, "
        "function-calling error, "
        "incorrect branch name, "
        "intermediate file not found, "
        "semantic error, "
        "other.\n"
        "Your task: For the given error description, select the most appropriate error category or categories from the list above. "
        "Base your choice on the underlying nature or root cause of the error, not on the symptoms, error messages, or observable effects. "
        "Focus on what fundamentally caused the error, such as logic mistakes, missing dependencies, data mismatches, or miscommunication, rather than how the error was reported or observed.\n"
        "Return ALL applicable category names, each wrapped with three asterisks on each side, separated by commas, like this: ***Category***\n"
        "Do not include any other text, explanation, or formatting.\n"
        "log file:\n"
    )

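    # Walk every run subdirectory under each results directory and build one
    # summary row per run.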
    results = []
    for results_dir in results_dirs:
        for name in tqdm(os.listdir(results_dir), desc=f"generating error descriptions for {results_dir}"):
            output_dir = os.path.join(results_dir, name)

            if os.path.isdir(output_dir):
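                # Run directories are named "<config>_step<N>..."; pull the
                # configuration name and step number out of the name. (config
                # is parsed here but not yet written into the summary row.)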
                config_match = re.match(r'^(.*?)_step\d+', name)
                config = config_match.group(1) if config_match else None

                step_match = re.search(r'_step(\d+)', name)
                step = int(step_match.group(1)) if step_match else None

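                # One summary row per run; fields stay None (or "Uncategorized")
                # until the corresponding entry is found in the logs.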
                result = {
                    "supervisor": None,
                    "coder": None,
                    "step": step,
                    "success": None,
                    "iterations": None,
                    "duration": None,
                    "API_calls": None,
                    "input_tokens": None,
                    "output_tokens": None,
                    "user_prompt_tokens": None,
                    "supervisor_to_coder_tokens": None,
                    "coder_output_tokens": None,
                    "feedback_to_supervisor_tokens": None,
                    "error": "Uncategorized",
                    "error_description": None,
                    "output_dir": output_dir,
                }

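                # The comprehensive log carries the run configuration and the
                # aggregate statistics scraped below; a run without one is
                # recorded as a failure.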
                log_dir = os.path.join(output_dir, "logs")
                if os.path.isdir(log_dir):
                    comp_log_files = glob.glob(os.path.join(log_dir, "*comprehensive_log.txt"))
                    comp_log_str = None
                    if comp_log_files:
                        with open(comp_log_files[0], "r") as f:
                            comp_log_str = f.read()
                    else:
                        result["success"] = False
                        result["error_description"] = "comprehensive log file not found"
                        results.append(result)
                        continue

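                    # Scrape the agent model names and the iteration count
                    # with line-oriented regexes.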
                    supervisor_match = re.search(r"Supervisor:\s*(\S+)", comp_log_str)
                    coder_match = re.search(r"Coder:\s*(\S+)", comp_log_str)
                    if supervisor_match:
                        result["supervisor"] = supervisor_match.group(1)
                    if coder_match:
                        result["coder"] = coder_match.group(1)

                    iterations_match = re.search(r"Total Iterations:\s*(\d+)", comp_log_str)
                    if iterations_match:
                        result["iterations"] = int(iterations_match.group(1))

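                    # Durations are logged as H:MM:SS with optional fractional
                    # seconds; convert to total seconds for the CSV.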
                    duration_match = re.search(r"Duration:\s*([\d:.]+)", comp_log_str)
                    if duration_match:
                        duration_str = duration_match.group(1)
                        try:
                            t = datetime.datetime.strptime(duration_str, "%H:%M:%S.%f")
                        except ValueError:
                            t = datetime.datetime.strptime(duration_str, "%H:%M:%S")
                        result["duration"] = t.hour * 3600 + t.minute * 60 + t.second + t.microsecond / 1e6

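                    # Aggregate API-call and token totals, then the per-channel
                    # token breakdown between the agents.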
                    api_calls_match = re.search(r"Total API Calls:\s*(\d+)", comp_log_str)
                    if api_calls_match:
                        result["API_calls"] = int(api_calls_match.group(1))
                    input_tokens_match = re.search(r"Total Input Tokens:\s*(\d+)", comp_log_str)
                    if input_tokens_match:
                        result["input_tokens"] = int(input_tokens_match.group(1))
                    output_tokens_match = re.search(r"Total Output Tokens:\s*(\d+)", comp_log_str)
                    if output_tokens_match:
                        result["output_tokens"] = int(output_tokens_match.group(1))

                    match = re.search(r"User Prompt Tokens:\s*(\d+)", comp_log_str)
                    if match:
                        result["user_prompt_tokens"] = int(match.group(1))
                    match = re.search(r"Supervisor to Coder Tokens:\s*(\d+)", comp_log_str)
                    if match:
                        result["supervisor_to_coder_tokens"] = int(match.group(1))
                    match = re.search(r"Coder Output Tokens:\s*(\d+)", comp_log_str)
                    if match:
                        result["coder_output_tokens"] = int(match.group(1))
                    match = re.search(r"Feedback to Supervisor Tokens:\s*(\d+)", comp_log_str)
                    if match:
                        result["feedback_to_supervisor_tokens"] = int(match.group(1))

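                    # The validation log decides success: the outcome of the
                    # last validation attempt wins.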
                    val_log_files = glob.glob(os.path.join(log_dir, "*validation.log"))
                    val_log_str = None
                    if val_log_files:
                        with open(val_log_files[0], "r") as f:
                            val_log_str = f.read()
                        matches = re.findall(r'(✅ Validation successful|❌ Validation failed)', val_log_str)
                        if not matches:
                            result["success"] = False
                        else:
                            result["success"] = matches[-1] == "✅ Validation successful"
                        if no_llm:
                            result["error"] = None if result["success"] else "Validation Error"
                        val_log_str = val_log_str.replace('\n', '').replace('\r', '')
                    else:
                        result["success"] = False
                        val_log_str = ""
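                    # Unless no_llm is set, ask the LLM to categorize the run
                    # from both logs; categories come back wrapped in triple
                    # asterisks and are recovered with a regex.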
                    if not no_llm:
                        try:
                            response = client.chat.completions.create(
                                model=model,
                                messages=[
                                    {
                                        'role': 'user',
                                        'content': error_categorization_prompt +
                                                   "\nComprehensive Log:\n" + comp_log_str +
                                                   "\nValidation Log:\n" + val_log_str
                                    }
                                ],
                            )
                            error_description = response.choices[0].message.content
                            result["error_description"] = error_description
                            result["error"] = [cat.strip() for cat in re.findall(r"\*\*\*(.*?)\*\*\*", error_description)]
                        except Exception as e:
                            print(f"OpenAI API error: {e}")
                            result["error"] = "Uncategorized"
                    else:
                        if "API call failed" in comp_log_str:
                            result["error"] = "API Call Error"
                else:
                    result["success"] = False
                    result["error"] = "job submission failure"
                results.append(result)

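    # Collect all rows into one table, sorted for stable, readable output.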
    df = pd.DataFrame(results)
    df = df.sort_values(by=["supervisor", "coder", "step", "output_dir"])
    df.to_csv(output_csv, index=False)
    print(f"Results written to {output_csv}")


def main():
    parser = argparse.ArgumentParser(description="Summarize experiment logs and errors")
    parser.add_argument("--results_dir", type=str, nargs='+', required=True, help="One or more directories containing experiment results")
    parser.add_argument("--output_csv", type=str, default="results_summary.csv", help="Path to output CSV file")
    parser.add_argument("--model", type=str, default="gpt-oss-120b", help="LLM model to use for error summarization")
    parser.add_argument("--no_llm", action="store_true", help="If set, only generate the CSV without LLM error description or categorization")
    args = parser.parse_args()

    summarize_results(
        results_dirs=args.results_dir,
        output_csv=args.output_csv,
        model=args.model,
        no_llm=args.no_llm
    )


if __name__ == "__main__":
    main()