import argparse
import datetime
import glob
import os
import re

import openai
import pandas as pd
from tqdm import tqdm


def summarize_results(results_dirs, output_csv, model, no_llm=False):
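    """Scan experiment output directories, extract run metadata from the
    comprehensive and validation logs, and write one row per run to a CSV.

    Unless ``no_llm`` is set, each run's logs are also sent to an LLM that
    produces a structured error summary (stored in ``error_description``).
    """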
    # Instantiate the LLM client only when it will actually be used, so that
    # --no_llm runs do not require CBORG_API_KEY to be set.
    client = None
    if not no_llm:
        client = openai.OpenAI(
            api_key=os.environ.get('CBORG_API_KEY'),
            base_url='https://api.cborg.lbl.gov'
        )

    error_description_prompt = (
        "You are an expert assistant. Below is a comprehensive log of a multi-step workflow from a high energy physics analysis framework.\n\n"
        "The workflow includes:\n"
        "- A user provides an analysis task prompt.\n"
        "- A supervisor agent breaks down the task and instructs a coder agent.\n"
        "- The coder agent generates code, which is executed.\n"
        "- The supervisor reviews results and may iterate with the coder to fix issues until the task is complete.\n"
        "The log contains the user prompt, supervisor/coder dialogue, code, and execution outputs for all iterations.\n\n"
        "Your task: Summarize all errors encountered during the entire workflow in clear, concise language. "
        "Do NOT repeat or quote the log, prompt, or instructions. "
        "Do NOT include code, explanations, or any text except your error summary.\n\n"
        "For each error, use the following structure:\n"
        "- Error Type: [brief description of the nature of the error]\n"
        "- Cause: [if identifiable]\n"
        "- Responsible Party: [user, supervisor, coder, or external]\n"
        "- Consequence: [result or impact]\n"
        "- Context: [any important context]\n"
        "- Workflow Response: [Did the supervisor diagnose and address it? "
        "Did the coder attempt a fix? Was the fix successful, unsuccessful, or misdiagnosed? "
        "Was the error ignored or did it persist? Summarize the recovery process and its outcome for each error.]\n"
        "List each error as a separate bullet point using this template.\n"
        "If there is a validation error, look in the validation log and use the same structure to identify the causes of the validation error.\n"
        "Limit your entire summary to 3000 characters. "
        "If no errors occurred, respond: 'No errors found.'\n\n"
    )
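
    # The per-run comprehensive and validation logs are appended to this prompt
    # before each LLM call (one call per run directory).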

    results = []
    for results_dir in results_dirs:
        for name in tqdm(os.listdir(results_dir), desc=f"generating error descriptions for {results_dir}"):
            output_dir = os.path.join(results_dir, name)

            if os.path.isdir(output_dir):
                # Run directory names look like "<config>_step<N>...".
                config_match = re.match(r'^(.*?)_step\d+', name)
                config = config_match.group(1) if config_match else None

                step_match = re.search(r'_step(\d+)', name)
                step = int(step_match.group(1)) if step_match else None

                result = {
                    "config": config,
                    "supervisor": None,
                    "coder": None,
                    "step": step,
                    "success": None,
                    "iterations": None,
                    "duration": None,
                    "API_calls": None,
                    "input_tokens": None,
                    "output_tokens": None,
                    "user_prompt_tokens": None,
                    "supervisor_to_coder_tokens": None,
                    "coder_output_tokens": None,
                    "feedback_to_supervisor_tokens": None,
                    "error": "Uncategorized",
                    "error_description": None,
                    "output_dir": output_dir,
                }

                log_dir = os.path.join(output_dir, "logs")
                if os.path.isdir(log_dir):
                    comp_log_files = glob.glob(os.path.join(log_dir, "*comprehensive_log.txt"))
                    comp_log_str = None
                    if comp_log_files:
                        with open(comp_log_files[0], "r", encoding="utf-8") as f:
                            comp_log_str = f.read()
                    else:
                        result["success"] = False
                        result["error_description"] = "comprehensive log file not found"
                        results.append(result)
                        continue

                    supervisor_match = re.search(r"Supervisor:\s*(\S+)", comp_log_str)
                    coder_match = re.search(r"Coder:\s*(\S+)", comp_log_str)
                    if supervisor_match:
                        result["supervisor"] = supervisor_match.group(1)
                    if coder_match:
                        result["coder"] = coder_match.group(1)

                    iterations_match = re.search(r"Total Iterations:\s*(\d+)", comp_log_str)
                    if iterations_match:
                        result["iterations"] = int(iterations_match.group(1))

                    # Duration is logged as H:MM:SS or H:MM:SS.ffffff; convert to seconds.
                    duration_match = re.search(r"Duration:\s*([\d:.]+)", comp_log_str)
                    if duration_match:
                        duration_str = duration_match.group(1)
                        try:
                            t = datetime.datetime.strptime(duration_str, "%H:%M:%S.%f")
                        except ValueError:
                            t = datetime.datetime.strptime(duration_str, "%H:%M:%S")
                        result["duration"] = t.hour * 3600 + t.minute * 60 + t.second + t.microsecond / 1e6

                    api_calls_match = re.search(r"Total API Calls:\s*(\d+)", comp_log_str)
                    if api_calls_match:
                        result["API_calls"] = int(api_calls_match.group(1))
                    input_tokens_match = re.search(r"Total Input Tokens:\s*(\d+)", comp_log_str)
                    if input_tokens_match:
                        result["input_tokens"] = int(input_tokens_match.group(1))
                    output_tokens_match = re.search(r"Total Output Tokens:\s*(\d+)", comp_log_str)
                    if output_tokens_match:
                        result["output_tokens"] = int(output_tokens_match.group(1))

                    # Per-stage token counts (if present in the log).
                    match = re.search(r"User Prompt Tokens:\s*(\d+)", comp_log_str)
                    if match:
                        result["user_prompt_tokens"] = int(match.group(1))
                    match = re.search(r"Supervisor to Coder Tokens:\s*(\d+)", comp_log_str)
                    if match:
                        result["supervisor_to_coder_tokens"] = int(match.group(1))
                    match = re.search(r"Coder Output Tokens:\s*(\d+)", comp_log_str)
                    if match:
                        result["coder_output_tokens"] = int(match.group(1))
                    match = re.search(r"Feedback to Supervisor Tokens:\s*(\d+)", comp_log_str)
                    if match:
                        result["feedback_to_supervisor_tokens"] = int(match.group(1))

                    # Success is determined by the last validation verdict in the log.
                    val_log_files = glob.glob(os.path.join(log_dir, "*validation.log"))
                    val_log_str = None
                    if val_log_files:
                        with open(val_log_files[0], "r", encoding="utf-8") as f:
                            val_log_str = f.read()
                        matches = re.findall(r'(✅ Validation successful|❌ Validation failed)', val_log_str)
                        if not matches:
                            result["success"] = False
                        else:
                            result["success"] = matches[-1] == "✅ Validation successful"
                        if no_llm:
                            result["error"] = None if result["success"] else "Validation Error"
                        # Flatten the log so it can be embedded in the prompt as a single block.
                        val_log_str = val_log_str.replace('\n', '').replace('\r', '')
                    else:
                        result["success"] = False
                        val_log_str = ""
                    if not no_llm:
                        try:
                            response = client.chat.completions.create(
                                model=model,
                                messages=[
                                    {
                                        'role': 'user',
                                        'content': error_description_prompt +
                                                   "\nComprehensive Log:\n" + comp_log_str +
                                                   "\nValidation Log:\n" + val_log_str
                                    }
                                ],
                                temperature=0.0
                            )
                            error_description = response.choices[-1].message.content
                            # Collapse whitespace and enforce the 3000-character limit.
                            error_description = " ".join(error_description.split())
                            result["error_description"] = error_description[:3000]
                        except Exception as e:
                            print(f"OpenAI API error: {e}")
                    elif "API call failed" in comp_log_str:
                        result["error"] = "API Call Error"
                else:
                    # No logs/ directory at all: the job never produced output.
                    result["success"] = False
                    result["error_description"] = "job submission failure"
                results.append(result)

    df = pd.DataFrame(results)
    if not df.empty:
        df = df.sort_values(by=["supervisor", "coder", "step", "output_dir"])
    df.to_csv(output_csv, index=False)
    print(f"Results written to {output_csv}")


def categorize_errors(output_csv, model):
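    """Read the summary CSV, ask the LLM to derive 5-10 root-cause error
    categories from all error descriptions, classify each row against those
    categories, and rewrite the CSV with the category list as a commented
    header.
    """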
    client = openai.OpenAI(
        api_key=os.environ.get('CBORG_API_KEY'),
        base_url='https://api.cborg.lbl.gov'
    )

    # comment='#' skips the category header written by a previous run.
    df = pd.read_csv(output_csv, comment='#')

    error_descriptions = df['error_description'].fillna("").tolist()

    create_categories_prompt = (
        "You are an expert at analyzing and organizing error messages from machine learning workflows in high energy physics.\n\n"
        "Workflow summary:\n"
        "- A user provides an analysis task prompt.\n"
        "- A supervisor agent breaks down the task and instructs a coder agent.\n"
        "- The coder agent generates code, which is executed.\n"
        "- The supervisor reviews results and may iterate with the coder to fix issues until the task is complete.\n"
        "Error descriptions below are collected from all steps and iterations of this workflow.\n\n"
        "Your task: Identify 5 to 10 distinct, meaningful categories that best capture the underlying nature or root cause of the errors in the list. "
        "Focus on grouping errors by what fundamentally caused them (such as logic mistakes, miscommunication, missing dependencies, data mismatches, etc.), "
        "rather than by their symptoms, error messages, or observable effects. "
        "Do NOT create categories based on how the error was observed or reported, but on the underlying issue that led to it.\n\n"
        "Each category should have a short, clear name and a one-sentence description that explains what kinds of errors belong in that category.\n\n"
        "Output only the categories in this format:\n"
        "1. [Category Name]: [One-sentence description]\n"
        "2. [Category Name]: [One-sentence description]\n"
        "...\n"
        "N. [Category Name]: [One-sentence description]\n\n"
        "Here are some example error categories:\n"
        "- Coding API Error: the coder incorrectly utilized common python packages (e.g. numpy, awkward, uproot, pandas)\n"
        "- User Prompt Misunderstanding: the supervisor did not properly interpret the user prompt\n\n"
        "Here are some error descriptions after running the workflow:\n"
        "```\n"
    )
    create_categories_prompt += "\n".join(error_descriptions) + "\n```"

    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{'role': 'user', 'content': create_categories_prompt}],
            temperature=0.0
        )
        error_categories = response.choices[-1].message.content.strip()
        print("Categories found by LLM:\n", error_categories)
    except Exception as e:
        print(f"LLM API error (category generation): {e}")
        return

    df['error'] = df['error'].astype(str)
    # New column for the assigned categories; stored as a comma-separated
    # string so it round-trips cleanly through CSV.
    df['error_categories'] = None

    def parse_categories(llm_output):
        # Category names are returned wrapped in triple asterisks, e.g. ***Name***.
        return [cat.strip() for cat in re.findall(r"\*\*\*(.*?)\*\*\*", llm_output)]

    for idx, error_description in tqdm(enumerate(error_descriptions), total=len(error_descriptions), desc="categorizing errors"):
        if not error_description.strip():
            continue

        categorize_errors_prompt = (
            "You are an expert at classifying error messages from machine learning workflows in high energy physics.\n\n"
            "Workflow summary:\n"
            "- A user provides an analysis task prompt.\n"
            "- A supervisor agent breaks down the task and instructs a coder agent.\n"
            "- The coder agent generates code, which is executed.\n"
            "- The supervisor reviews results and may iterate with the coder to fix issues until the task is complete.\n"
            "The error descriptions below are collected from all steps and iterations of this workflow.\n\n"
            "Below is a list of error categories, each with a short description:\n"
            f"{error_categories}\n\n"
            "Your task: For the given error description, select the most appropriate error category (or categories) from the list above. "
            "Base your choice on the underlying nature or root cause of the error, not on the symptoms, error messages, or observable effects. "
            "Focus on what fundamentally caused the error, such as logic mistakes, missing dependencies, data mismatches, or miscommunication, rather than how the error was reported or observed.\n"
            "Return ALL applicable category names, each wrapped with three asterisks on each side, separated by commas, like this: ***Category One***, ***Category Two***\n"
            "Do not include any other text, explanation, or formatting.\n"
            "Error description:\n"
            "```\n"
            f"{error_description}\n"
            "```"
        )

        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{'role': 'user', 'content': categorize_errors_prompt}],
                temperature=0.0
            )
            assignments_text = response.choices[-1].message.content.strip()
            categories = parse_categories(assignments_text)
            df.at[idx, 'error_categories'] = ", ".join(categories) if categories else "Uncategorized"
        except Exception as e:
            print(f"LLM API error (assignment) at row {idx}: {e}")
            df.at[idx, 'error'] = "LLM API error"

    # Prepend the LLM-generated categories as comment lines so a later
    # read_csv(..., comment='#') still parses the table.
    with open(output_csv, 'w', encoding='utf-8') as f:
        f.write("# LLM Generated Error Categories:\n")
        for line in error_categories.splitlines():
            f.write(f"# {line}\n")
        f.write("\n")
        df.to_csv(f, index=False)
    print(f"Saved categorized errors to {output_csv}")


def main():
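    """Command-line entry point: summarize results, then optionally run
    LLM-based error categorization on the resulting CSV.
    """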
    parser = argparse.ArgumentParser(description="Summarize experiment logs and errors")
    parser.add_argument("--results_dir", type=str, nargs='+', required=True, help="One or more directories containing experiment results")
    parser.add_argument("--output_csv", type=str, default="results_summary.csv", help="Path to output CSV file")
    parser.add_argument("--model", type=str, default="gpt-oss-120b", help="LLM model to use for error summarization")
    parser.add_argument("--no_llm", action="store_true", help="If set, only generate the CSV without LLM error description or categorization")
    args = parser.parse_args()

    summarize_results(
        results_dirs=args.results_dir,
        output_csv=args.output_csv,
        model=args.model,
        no_llm=args.no_llm
    )

    if not args.no_llm:
        categorize_errors(
            output_csv=args.output_csv,
            model=args.model
        )
    else:
        print("LLM error description and categorization skipped (--no_llm set)")


if __name__ == "__main__":
    main()
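
# Example invocations (script and directory names are illustrative):
#   python summarize_results.py --results_dir runs/expA runs/expB --output_csv results_summary.csv
#   python summarize_results.py --results_dir runs/expA --no_llm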