| import os |
| import re |
| import sys |
| import openai |
| import subprocess |
| import argparse |
| import numpy as np |
| import yaml |
| from datetime import datetime |
|
|
|
|
# Directory bookkeeping: when launched from inside a 'results' folder the
# project root is one level up; otherwise the working directory is the root.
base_dir = os.getcwd()

if 'results' in base_dir:
    project_root = os.path.dirname(base_dir)
else:
    project_root = base_dir
|
|
|
|
# Command-line interface; input/output file lists come from the snakemake rule.
parser = argparse.ArgumentParser(description='supervisor/coder')
parser.add_argument('--prompt', help='prompt name')
parser.add_argument('--out_dir', help='output directory')
parser.add_argument('--input-files', nargs='*', help='List of input files from snakemake')
parser.add_argument('--output-files', nargs='*', help='List of output files from snakemake')
parser.add_argument('--config', help='Path to config file', default=os.path.join(project_root, 'config.yml'))
parser.add_argument('--max_iterations', type=int, default=3, help='Maximum coder iterations for faster runs')
args = parser.parse_args()
|
|
| |
# Load the YAML config and extract the two required model names, exiting with
# a clear message on any failure so the pipeline fails fast.
config_path = args.config
if os.path.exists(config_path):
    try:
        with open(config_path, 'r') as f:
            # BUG FIX: yaml.safe_load returns None for an empty file, which
            # previously raised AttributeError on .get() and surfaced as a
            # misleading "Could not read" error. Normalize to an empty dict.
            config = yaml.safe_load(f) or {}
        supervisor = config.get('supervisor')
        if not supervisor:
            print(f"ERROR: 'supervisor' not found in {config_path}")
            sys.exit(1)
        coder = config.get('coder')
        if not coder:
            print(f"ERROR: 'coder' not found in {config_path}")
            sys.exit(1)
    except Exception as e:
        # Broad on purpose: covers OSError, yaml.YAMLError, and a config that
        # parses to a non-mapping type. SystemExit is not an Exception, so the
        # sys.exit(1) calls above are not swallowed here.
        print(f"ERROR: Could not read {config_path}: {e}")
        sys.exit(1)
else:
    print(f"ERROR: config file not found at {config_path}")
    sys.exit(1)

# Optional sampling temperature forwarded to every chat completion call;
# None lets the API apply its own default.
temperature = config.get('temperature', None)
|
|
def get_code(output):
    """Extract the body of the first ```python fenced block from *output*.

    Returns *output* unchanged when no fenced python block is present
    (e.g. the model replied with bare code).
    """
    fenced = re.search(r'```python\s*(.*?)\s*```', output, re.DOTALL)
    return fenced.group(1).strip() if fenced else output
|
|
name = args.prompt
out_dir = args.out_dir
os.makedirs(out_dir, exist_ok=True)

# Per-run output layout: logs, generated code, and prompt/response pairs.
log_dir = os.path.join(out_dir, 'logs')
code_dir = os.path.join(out_dir, 'generated_code')
prompt_pair_dir = os.path.join(out_dir, 'prompt_pairs')
for _dir in (log_dir, code_dir, prompt_pair_dir):
    os.makedirs(_dir, exist_ok=True)

# The task prompt normally lives under prompts_temp/; fall back to logs/
# when an earlier stage left it there instead.
prompt_filepath = os.path.join(out_dir, f'prompts_temp/{name}.txt')
if not os.path.exists(prompt_filepath):
    prompt_filepath = os.path.join(out_dir, f'logs/{name}.txt')
with open(prompt_filepath, 'r') as file:
    user_prompt = file.read()

# Instruction templates for the supervisor's first call and its follow-ups.
first_supervisor_instructions_filepath = os.path.join(out_dir, 'prompts_temp/supervisor_first_call.txt')
with open(first_supervisor_instructions_filepath, 'r') as file:
    first_supervisor_instructions = file.read()

supervisor_instructions_filepath = os.path.join(out_dir, 'prompts_temp/supervisor_call.txt')
with open(supervisor_instructions_filepath, 'r') as file:
    supervisor_instructions = file.read()

# Tell the supervisor which concrete files the generated code must consume
# and produce (paths supplied by the snakemake rule).
file_context = ''
if args.input_files:
    file_context += '\nInput files:\n' + '\n'.join(args.input_files)
if args.output_files:
    file_context += '\nOutput files:\n' + '\n'.join(args.output_files)
supervisor_prompt = first_supervisor_instructions + file_context + '\n\n' + user_prompt
|
|
# OpenAI-compatible client pointed at the CBORG-hosted endpoint; the API key
# is taken from the environment (may be None if unset — the API call will
# then fail with an auth error handled below).
client = openai.OpenAI(
    api_key=os.environ.get('CBORG_API_KEY'),
    base_url='https://api.cborg.lbl.gov',
)

# Loop bookkeeping plus token accounting broken down by message direction.
done_outer = False
count_outer = 0
total_calls = 0
input_tokens = 0
output_tokens = 0
tokens = {
    "User Prompt": 0,
    "Supervisor to Coder": 0,
    "Coder Outputs": 0,
    "Feedback to Supervisor": 0,
}
|
|
| |
# Single human-readable transcript of the entire supervisor/coder session.
comprehensive_log_path = os.path.join(log_dir, f'{name}_comprehensive_log.txt')
start_time = datetime.now()

# Write the session header once; everything afterwards is appended.
with open(comprehensive_log_path, 'w') as comp_log:
    header = [
        "=" * 100 + "\n",
        "π― COMPREHENSIVE SUPERVISOR-CODER LOG\n",
        "=" * 100 + "\n\n",
        f"π Task: {name}\n",
        f"π€ Supervisor: {supervisor}\n",
        f"π€ Coder: {coder}\n",
        f"π Start Time: {start_time}\n",
        f"π Working Directory: {os.getcwd()}\n",
        f"π Output Directory: {out_dir}\n\n",
        "π ORIGINAL USER PROMPT\n",
        "-" * 50 + "\n",
    ]
    comp_log.writelines(header)
    comp_log.write(user_prompt)
    comp_log.write("\n\n")
    comp_log.write("π PROCESS START\n")
    comp_log.write("-" * 50 + "\n\n")
|
|
| |
def log_comprehensive(message, section="", level="INFO", plain=False):
    """Append *message* to the comprehensive session log.

    section: optional banner written before the message.
    level:   severity tag used in the timestamped line format.
    plain:   when True, write the message verbatim (no timestamp/level prefix).
    Messages longer than 50k characters are truncated to keep the log usable.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(comprehensive_log_path, 'a') as comp_log:
        if section:
            comp_log.write(f"\n\n{'='*25} {section.upper()} {'='*25}\n\n")

        limit = 50000
        if len(message) > limit:
            # f-string evaluates before reassignment, so len(message) reports
            # the original, untruncated length.
            message = message[:limit] + f"\n\n[TRUNCATED: message was {len(message)} characters long]"

        if plain:
            comp_log.write(f"{message}\n")
        else:
            comp_log.write(f"[{timestamp}] {level}: {message}\n")
|
|
log_comprehensive("Starting supervisor-coder interaction", "π INITIALIZATION")

print("=== SUPERVISOR-CODER MAIN LOOP ===")
log_comprehensive("=== MAIN LOOP STARTED ===", "π LOOP START")
print(f"π Calling supervisor {supervisor} (initial)")
log_comprehensive(f"Calling supervisor {supervisor} with initial prompt", "π€ SUPERVISOR CALL")
try:
    # First supervisor call: turn the user prompt (+ file context) into a
    # concrete prompt for the coder model.
    response = client.chat.completions.create(
        model = supervisor,
        messages = [
            {
                'role': 'user',
                'content': supervisor_prompt
            }
        ],
        temperature = temperature
    )
    tokens['User Prompt'] += response.usage.prompt_tokens
    input_tokens += response.usage.prompt_tokens
    output_tokens += response.usage.completion_tokens
    supervisor_response = response.choices[-1].message.content

    print(f"Initial supervisor prompt tokens: {tokens['User Prompt']}")
    log_comprehensive(f"Initial supervisor prompt tokens: {tokens['User Prompt']}", "TOKEN STATS")

    log_comprehensive(
        f"Supervisor response received ({response.usage.prompt_tokens} input tokens, {response.usage.completion_tokens} output tokens)",
        "SUPERVISOR RESPONSE"
    )
    log_comprehensive(supervisor_response, plain=True)

    log_comprehensive(file_context, section="", plain=True)
except Exception as e:
    # Any API failure is fatal: rate limits are transient, but retrying here
    # would burn cluster time, and auth/network errors will not self-heal.
    log_comprehensive(f"Supervisor API call failed: {e}", "β API ERROR", "ERROR")
    print(f"OpenAI API error: {e}")

    if "429" in str(e) or "rate" in str(e).lower() or "quota" in str(e).lower():
        print("Rate limit or quota exceeded. This is a temporary issue.")
        print("Stopping job to avoid wasting resources on temporary failures.")
        log_comprehensive("Terminating due to rate limit/quota issue", "π TERMINATION", "ERROR")
        sys.exit(1)
    else:
        print("Permanent API error (auth, network, etc.). Stopping job.")
        log_comprehensive("Terminating due to permanent API error", "π TERMINATION", "ERROR")
        sys.exit(1)

# The supervisor is expected to append a 'Call record:' section summarizing
# its reasoning; split it off so only the actual prompt goes to the coder.
if 'Call record:' in supervisor_response:
    coder_prompt, record = supervisor_response.split('Call record:', 1)
else:
    coder_prompt = supervisor_response
    record = "Supervisor did not provide a record."

# Persist the running record of supervisor calls; it is re-read every
# iteration and fed back to the supervisor as context.
supervisor_record = os.path.join(log_dir, 'supervisor_record.txt')
with open(supervisor_record, 'w') as f:
    f.write('Call 1 record: ' + record)
    # BUG FIX: the warning used to be concatenated directly onto the record
    # text with no separator, polluting the record that gets re-read and fed
    # back to the supervisor. Write it on its own line instead.
    f.write(f'\nWARNING: running supervisor/coder with supervisor {supervisor} and coder {coder}')
|
| |
# ---------------------------------------------------------------------------
# Inner refinement loop: coder writes code -> code is executed -> supervisor
# reviews the result and either declares success or issues a revised coder
# prompt. Capped at --max_iterations round trips.
# ---------------------------------------------------------------------------
done_inner = False
count_inner = 0
# Most recent prompt sent to the coder; persisted to the prompt-pair file
# after the loop ends.
old_coder_prompt = "Default coder prompt"
while not done_inner and count_inner < args.max_iterations:
    count_inner += 1
    print(f"π€ Calling coder {coder} (attempt {count_inner}/{args.max_iterations})")
    log_comprehensive(f"Calling coder {coder} (attempt {count_inner}/{args.max_iterations})", f"π€ CODER CALL #{count_inner}")

    try:
        # Ask the coder model to produce code for the current prompt.
        print("DEBUG: Making coder API call...")
        response = client.chat.completions.create(
            model = coder,
            messages = [
                {
                    'role': 'user',
                    'content': coder_prompt
                }
            ],
            temperature = temperature
        )
        print("DEBUG: Coder API call completed successfully")
        input_tokens += response.usage.prompt_tokens
        output_tokens += response.usage.completion_tokens
        tokens['Supervisor to Coder'] += response.usage.prompt_tokens
        tokens['Coder Outputs'] += response.usage.completion_tokens
        # NOTE(review): assigning the coder's reply to supervisor_response
        # looks like a leftover — it is overwritten by the real supervisor
        # feedback later in this iteration and never read in between.
        supervisor_response = response.choices[-1].message.content
        output = response.choices[-1].message.content
        print(f"DEBUG: Extracted output, type: {type(output)}, length: {len(output) if output else 0}")
        log_comprehensive(f"Coder response received ({response.usage.prompt_tokens} input tokens, {response.usage.completion_tokens} output tokens)", f"CODER RESPONSE #{count_inner}")
        log_comprehensive("Generated Code:", plain=True)
        log_comprehensive(output, plain=True)
        print("DEBUG: Logged coder response successfully")
    except Exception as e:
        # Same fatal-error policy as the initial supervisor call: never retry.
        log_comprehensive(f"Coder API call failed: {e}", f"β CODER ERROR #{count_inner}", "ERROR")
        print(f"OpenAI API error: {e}")
        if "429" in str(e) or "rate" in str(e).lower() or "quota" in str(e).lower():
            print("Rate limit or quota exceeded. This is a temporary issue.")
            print("Stopping job to avoid wasting resources on temporary failures.")
            log_comprehensive("Terminating due to rate limit/quota issue", "π TERMINATION", "ERROR")
            sys.exit(1)
        else:
            print("Permanent API error (auth, network, etc.). Stopping job.")
            log_comprehensive("Terminating due to permanent API error", "π TERMINATION", "ERROR")
            sys.exit(1)

    # Strip the markdown fence (if any) and save the raw code to disk.
    # The same file is overwritten each iteration.
    code = get_code(output)

    code_filepath = os.path.join(code_dir, f'{name}.py')
    with open(code_filepath, 'w') as f:
        f.write(code)

    # Execute the generated script as a subprocess, forwarding the snakemake
    # file lists so the script can locate its inputs/outputs.
    print("β‘ Executing code from coder")

    cmd = ['python', code_filepath]

    if args.input_files:
        cmd.extend(['--input-files', *args.input_files])

    if args.output_files:
        cmd.extend(['--output-files', *args.output_files])

    log_comprehensive("Executing generated code.", f"CODE EXECUTION #{count_inner}")
    log_comprehensive(f"Command: {' '.join(cmd)}", level="DEBUG")

    # Pass the output directory through the environment as well, for scripts
    # that read OUTPUT_DIR instead of parsing arguments.
    env = os.environ.copy()
    if out_dir:
        env['OUTPUT_DIR'] = out_dir
    # stderr is merged into stdout so the supervisor sees one combined
    # transcript of the run.
    process = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        env=env,
    )

    log_comprehensive(f"Execution completed with exit code {process.returncode}", f"EXECUTION RESULT #{count_inner}")

    if process.returncode != 0:
        log_comprehensive("Execution failed. Output:", level="ERROR")
        log_comprehensive(process.stdout, plain=True)
    else:
        log_comprehensive("Execution successful. Output:", level="INFO")
        log_comprehensive(process.stdout, plain=True)

    # The captured output (plus any validation errors appended below) is what
    # the supervisor sees as "Command line output".
    command_line_output = process.stdout or ""

    # Validate the declared outputs: missing or empty files are appended as
    # ERROR lines so the supervisor can react; a stale mtime is only a WARNING
    # (the file may legitimately predate this iteration).
    if args.output_files:
        for output_file in args.output_files:
            if not os.path.exists(output_file):
                error_message = f"\nERROR: Output file was not created: {output_file}"
                print(error_message)
                command_line_output += error_message
                log_comprehensive(f"Output file not found after execution: {output_file}", f"EXECUTION RESULT #{count_inner}", "ERROR")
            elif os.path.getsize(output_file) == 0:
                error_message = f"\nERROR: Output file exists but is empty: {output_file}"
                print(error_message)
                command_line_output += error_message
                log_comprehensive(f"Output file exists but is empty: {output_file}", f"EXECUTION RESULT #{count_inner}", "ERROR")
            else:
                # Heuristic staleness check: flag files untouched for >60s.
                import time
                mtime = os.path.getmtime(output_file)
                current_time = time.time()
                if current_time - mtime > 60:
                    error_message = f"\nWARNING: Output file exists but was not recently modified: {output_file} (modified {current_time - mtime:.1f} seconds ago)"
                    print(error_message)
                    command_line_output += error_message
                    log_comprehensive(f"Output file exists but not recently modified: {output_file}", f"EXECUTION RESULT #{count_inner}", "WARNING")

    # Re-read the cumulative call record so the supervisor sees the history
    # of all previous calls (including what this loop appended earlier).
    with open(supervisor_record, 'r') as f:
        record = f.read()

    # The follow-up template contains a 'User prompt:' marker; splice the
    # current context in around it.
    before_user, after_user = supervisor_instructions.split('User prompt:')

    file_context = ''
    if args.input_files:
        file_context += '\nInput files:\n' + '\n'.join(args.input_files)
    if args.output_files:
        file_context += '\nOutput files:\n' + '\n'.join(args.output_files)
    new_supervisor_prompt = (
        before_user + file_context + '\nUser prompt:\n' + user_prompt +
        '\nGenerated code:\n' + code +
        '\nCommand line output:\n' + command_line_output +
        '\nRecord:\n' + record
    )

    # Feedback call: the supervisor judges the code + execution output and
    # either approves or produces a revised coder prompt.
    print(f"Calling supervisor {supervisor} (iteration {count_inner})")
    log_comprehensive(f"Calling supervisor {supervisor} for feedback (iteration {count_inner})", f"π€ SUPERVISOR FEEDBACK CALL #{count_inner}")
    supervisor_response_inner = ""
    try:
        response = client.chat.completions.create(
            model = supervisor,
            messages = [
                {
                    'role': 'user',
                    'content': new_supervisor_prompt
                }
            ],
            temperature = temperature
        )
        input_tokens += response.usage.prompt_tokens
        output_tokens += response.usage.completion_tokens
        tokens['Feedback to Supervisor'] += response.usage.prompt_tokens
        supervisor_response_inner = response.choices[-1].message.content
        log_comprehensive(f"Supervisor feedback received ({response.usage.prompt_tokens} input tokens, {response.usage.completion_tokens} output tokens)", f"SUPERVISOR FEEDBACK RESPONSE #{count_inner}")
        log_comprehensive("Supervisor Feedback:", plain=True)
        log_comprehensive(supervisor_response_inner, plain=True)
    except Exception as e:
        log_comprehensive(f"Supervisor feedback API call failed: {e}", f"β SUPERVISOR ERROR #{count_inner}", "ERROR")
        print(f"OpenAI API error: {e}")
        if "429" in str(e) or "rate" in str(e).lower() or "quota" in str(e).lower():
            print("Rate limit or quota exceeded. This is a temporary issue.")
            print("Stopping job to avoid wasting resources on temporary failures.")
            log_comprehensive("Terminating due to rate limit/quota issue", "π TERMINATION", "ERROR")
            sys.exit(1)
        else:
            print("Permanent API error (auth, network, etc.). Stopping job.")
            log_comprehensive("Terminating due to permanent API error", "π TERMINATION", "ERROR")
            sys.exit(1)

    supervisor_response = supervisor_response_inner

    # Split off the supervisor's 'Call record:' section and append it to the
    # on-disk history; everything before it is the next coder prompt.
    if 'Call record:' in supervisor_response:
        try:
            parts = supervisor_response.split('Call record:', 1)
            coder_prompt = parts[0]
            record = parts[-1]
            with open(supervisor_record, 'a') as f:
                f.write(f'\nCall {count_inner} record: ' + record)
        except Exception as e:
            log_comprehensive(f"Error parsing supervisor response: {e}", f"β οΈ PARSING WARNING #{count_inner}", "WARNING")
            print(f"Error parsing supervisor response: {e}")
            coder_prompt = supervisor_response
    else:
        coder_prompt = supervisor_response

    # Exit condition: the supervisor signals approval with this exact
    # sentinel phrase in its reply.
    old_coder_prompt = coder_prompt
    if 'Supervisor is satisfied with current results' in coder_prompt:
        done_inner = True
        print(f"β Supervisor satisfied after {count_inner} iterations!")
        log_comprehensive(f"SUCCESS: Supervisor satisfied after {count_inner} iterations!", "π SUCCESS")
    else:
        log_comprehensive(f"Supervisor not satisfied, continuing to iteration {count_inner + 1}", f"π CONTINUING TO ITERATION #{count_inner + 1}")

    # NOTE(review): redundant — coder_prompt was already set to
    # supervisor_response in the no-record branch above.
    if 'Call record:' not in supervisor_response:
        coder_prompt = supervisor_response
|
|
# One initial supervisor call plus one coder call and one feedback call
# per completed iteration.
total_calls = 1 + 2 * count_inner
print(f"π Total API calls made: {total_calls}")
log_comprehensive(f"Total API calls made: {total_calls}", "π FINAL STATISTICS")
log_comprehensive(f"Final token counts: {input_tokens} input, {output_tokens} output")

# Close out the transcript with the wall-clock duration of the session.
end_time = datetime.now()
duration = end_time - start_time
log_comprehensive(f"Interaction completed in {duration}", "π PROCESS COMPLETED")
log_comprehensive("=== END OF COMPREHENSIVE LOG ===", "π END OF LOG")
|
|
| |
# Append a grep-friendly summary section to the end of the transcript.
# The 'in locals()' guards are defensive: at module level locals() is the
# module namespace, so they only fire if the loop block never executed.
with open(comprehensive_log_path, 'a') as comp_log:
    final_status = 'SUCCESS' if ('done_inner' in locals() and done_inner) else 'INCOMPLETE'
    summary = [
        f"\n{'='*100}\n",
        "π FINAL SUMMARY\n",
        f"{'='*100}\n",
        f"Task: {name}\n",
        f"Supervisor: {supervisor}\n",
        f"Coder: {coder}\n",
        f"Total Iterations: {count_inner if 'count_inner' in locals() else 0}\n",
        f"Start Time: {start_time}\n",
        f"End Time: {end_time}\n",
        f"Duration: {duration}\n",
        f"Total API Calls: {total_calls}\n",
        f"Total Input Tokens: {input_tokens}\n",
        f"Total Output Tokens: {output_tokens}\n",
        f"User Prompt Tokens: {tokens['User Prompt']}\n",
        f"Supervisor to Coder Tokens: {tokens['Supervisor to Coder']}\n",
        f"Coder Output Tokens: {tokens['Coder Outputs']}\n",
        f"Feedback to Supervisor Tokens: {tokens['Feedback to Supervisor']}\n",
        f"Final Status: {final_status}\n",
        f"{'='*100}\n",
    ]
    comp_log.writelines(summary)

# Save the (user prompt, final coder prompt) pair for later prompt analysis.
prompt_pair = f'User Prompt: \n {user_prompt} \n Supervisor Prompt: \n {old_coder_prompt}'
prompt_pair_path = os.path.join(prompt_pair_dir, f'{name}.txt')
with open(prompt_pair_path, 'w') as f:
    f.write(prompt_pair)
|
|
| |
def _append_history(filepath, value):
    """Append *value* to the 1-D run-history array stored at *filepath*
    (.npy), creating the file on the first run."""
    if os.path.exists(filepath):
        history = np.append(np.load(filepath), value)
    else:
        history = np.array([value])
    np.save(filepath, history)


# Accumulate per-run statistics across invocations (one entry appended per
# script run) so downstream analysis can aggregate them. Previously this was
# the same load/append/save logic copy-pasted three times.
_append_history(os.path.join(out_dir, 'logs', 'calls.npy'), total_calls)
_append_history(os.path.join(out_dir, 'logs', 'input_tokens.npy'), input_tokens)
_append_history(os.path.join(out_dir, 'logs', 'output_tokens.npy'), output_tokens)
|
|
| |
# Final gate: fail the snakemake job when any declared output is missing or
# empty, so downstream rules never run on bad artifacts.
if args.output_files:
    def _is_bad(path):
        # A file counts as bad when it was never created or is zero bytes.
        return not os.path.exists(path) or os.path.getsize(path) == 0

    missing = [path for path in args.output_files if _is_bad(path)]
    if missing:
        print(f"β Missing or empty output files: {missing}")
        sys.exit(1)
    else:
        print("β All expected output files created.")

print("π Supervisor-coder interaction completed!")
print(f"π Final stats: {total_calls} total API calls, {input_tokens} input tokens, {output_tokens} output tokens")
print(f"π Comprehensive log saved to: {comprehensive_log_path}")
|
|