| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import asyncio |
| import random |
| import re |
| import os |
| import csv |
| import time |
| import itertools |
| from typing import Callable, Dict, List |
| from datetime import datetime |
|
|
| import numpy as np |
| from tqdm.asyncio import tqdm as aio_tqdm |
| import matplotlib.pyplot as plt |
|
|
| from evoagentx.agents import CustomizeAgent |
| from evoagentx.benchmark.bigbenchhard import BIGBenchHard |
| from evoagentx.core.logging import logger |
| from evoagentx.models import OpenAILLMConfig |
| from evoagentx.optimizers.engine.base import BaseOptimizer |
| from evoagentx.optimizers.engine.registry import ParamRegistry |
|
|
|
|
class EvopromptOptimizer(BaseOptimizer):
    """
    Base class for evolutionary prompt optimization algorithms.

    This optimizer uses evolutionary algorithms to improve prompts in multi-agent workflows.
    It supports both node-based and combination-based evolution strategies.
    """
    def __init__(self,
                 registry: ParamRegistry,
                 program: Callable,
                 population_size: int,
                 iterations: int,
                 llm_config: OpenAILLMConfig,
                 concurrency_limit: int = 10,
                 combination_sample_size: int = None,
                 enable_logging: bool = True,
                 log_dir: str = None,
                 enable_early_stopping: bool = True,
                 early_stopping_patience: int = 3):
        """
        Initialize the EvoPrompt optimizer.

        Args:
            registry: Parameter registry for tracking prompt nodes
            program: The program/workflow to optimize
            population_size: Size of the evolution population
            iterations: Number of evolution iterations
            llm_config: Configuration for the LLM used in evolution
            concurrency_limit: Maximum concurrent API calls
            combination_sample_size: Sample size for combination evaluation
                (None lets ``_generate_combinations`` choose a capped default)
            enable_logging: Whether to enable detailed logging
            log_dir: Directory for saving logs (None derives a timestamped name
                once the benchmark task is known)
            enable_early_stopping: Whether to enable early stopping
            early_stopping_patience: Number of generations to wait before stopping
        """
        super().__init__(registry=registry, program=program)

        # Core evolutionary-search settings.
        self.population_size = population_size
        self.iterations = iterations
        self.llm_config = llm_config
        # Bounds the number of concurrent LLM / program evaluations.
        self.semaphore = asyncio.Semaphore(concurrency_limit)
        self.combination_sample_size = combination_sample_size

        # Logging configuration; the concrete log_dir is resolved lazily in
        # _setup_logging_directory (the name may depend on the benchmark task).
        self.enable_logging = enable_logging
        self.log_dir_base = log_dir
        self.log_dir = None

        # Early-stopping bookkeeping, tracked against the best combination score.
        self.enable_early_stopping = enable_early_stopping
        self.early_stopping_patience = early_stopping_patience
        self._best_score_so_far = -float('inf')
        self._generations_without_improvement = 0

        # (combination, example) -> score cache used by _evaluate_combination_on_example.
        self._eval_cache = {}
        # Per-node prompt populations and their matching fitness scores.
        self.node_populations: Dict[str, List[str]] = {}
        self.node_scores: Dict[str, List[float]] = {}
        # Per-generation history consumed by the summary CSVs and plots.
        self.best_scores_per_gen: Dict[str, Dict[str, float]] = {}
        self.avg_scores_per_gen: Dict[str, Dict[str, float]] = {}
        self.best_combo_scores_per_gen: Dict[str, float] = {}
        self.avg_combo_scores_per_gen: Dict[str, float] = {}

        # LLM agent used to paraphrase prompts when expanding initial populations.
        self.paraphrase_agent = CustomizeAgent(
            name="ParaphraseAgent",
            description="An agent that paraphrases a given instruction.",
            prompt="""Task: Generate a semantically equivalent but differently worded version of the user-provided instruction.

Now, please process the following instruction:
Input: {instruction}

Please provide the paraphrased version in the following format:

## paraphrased_instruction
[Your paraphrased version here]""",
            llm_config=self.llm_config,
            inputs=[
                {"name": "instruction", "type": "string", "description": "The instruction to paraphrase."},
            ],
            outputs=[
                {"name": "paraphrased_instruction", "type": "string", "description": "The paraphrased instruction."}
            ],
            parse_mode="title"
        )
|
|
| def _setup_logging_directory(self, benchmark: BIGBenchHard): |
| """ |
| Set up logging directory for evolution tracking. |
| |
| Args: |
| benchmark: The benchmark instance containing task information |
| """ |
| if not self.enable_logging or self.log_dir: |
| return |
|
|
| task_name = benchmark.task if hasattr(benchmark, 'task') else 'unknown_task' |
|
|
| if self.log_dir_base is None: |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| algo_name = self.__class__.__name__.replace("Optimizer", "") |
| self.log_dir = f"node_evolution_logs_{algo_name}_{self.llm_config.model}_{task_name}_{timestamp}" |
| else: |
| self.log_dir = self.log_dir_base |
|
|
| os.makedirs(self.log_dir, exist_ok=True) |
| logger.info(f"Logging enabled. Log files will be saved to: {self.log_dir}") |
|
|
| def _log_generation_summary(self, generation: int, operation: str = "Evolution"): |
| """ |
| Log detailed summary of each generation's population and scores. |
| |
| Args: |
| generation: The current generation number |
| operation: Type of operation (Evolution, Initial, etc.) |
| """ |
| if not self.enable_logging: |
| return |
|
|
| filename = f"generation_{generation:02d}_{operation.lower()}.csv" |
| filepath = os.path.join(self.log_dir, filename) |
|
|
| with open(filepath, 'w', newline='', encoding='utf-8') as f: |
| writer = csv.writer(f) |
| writer.writerow(['Node_Name', 'Individual_ID', 'Prompt_Text', 'Fitness_Score', 'Status', 'Rank_in_Node', 'Generation', 'Timestamp']) |
| timestamp = datetime.now().isoformat() |
|
|
| for node_name in self.node_populations.keys(): |
| node_pop = self.node_populations.get(node_name, []) |
| node_scores = self.node_scores.get(node_name, []) |
|
|
| if not node_pop: |
| continue |
|
|
| sorted_indices = sorted(range(len(node_scores)), key=lambda i: node_scores[i], reverse=True) |
|
|
| for rank, idx in enumerate(sorted_indices, 1): |
| prompt = node_pop[idx] |
| score = node_scores[idx] |
| status = "Best" if rank == 1 else "Survivor" if rank <= self.population_size else "Eliminated" |
|
|
| writer.writerow([ |
| node_name, f"{node_name}_{idx}", prompt[:200] + "..." if len(prompt) > 200 else prompt, |
| f"{score:.6f}", status, rank, generation, timestamp |
| ]) |
|
|
| def _log_detailed_evaluation(self, generation: int, combinations: List[Dict[str, str]], |
| combination_scores: List[float]): |
| if not self.enable_logging: |
| return |
|
|
| filename = f"combo_evaluation_gen_{generation:02d}.csv" |
| filepath = os.path.join(self.log_dir, filename) |
|
|
| with open(filepath, 'w', newline='', encoding='utf-8') as f: |
| writer = csv.writer(f) |
| node_names = list(combinations[0].keys()) if combinations else [] |
| header = ['Combination_ID', 'Average_Score'] |
| for node_name in node_names: |
| header.append(f'{node_name}_Prompt_Preview') |
| header.extend(['Generation', 'Timestamp']) |
| writer.writerow(header) |
|
|
| timestamp = datetime.now().isoformat() |
|
|
| for combo_id, (combination, avg_score) in enumerate(zip(combinations, combination_scores)): |
| try: |
| row = [f"combo_{combo_id}", f"{avg_score:.6f}"] |
| for node_name in node_names: |
| prompt = combination[node_name] |
| row.append(prompt[:50] + "..." if len(prompt) > 50 else prompt) |
| row.extend([generation, timestamp]) |
| writer.writerow(row) |
| except Exception as e: |
| logger.error(f"Error logging evaluation for combination {combo_id}: {e}") |
|
|
| def _create_single_metric_plot(self, metric_name: str, generations: List[int], |
| best_scores: List[float], avg_scores: List[float], |
| algorithm_name: str, plot_dir: str): |
| fig, ax = plt.subplots(figsize=(12, 7)) |
| ax.plot(generations, best_scores, marker='o', linestyle='-', linewidth=2, markersize=8, label='Best Score') |
| ax.plot(generations, avg_scores, marker='x', linestyle='--', linewidth=2, markersize=8, label='Average Score') |
|
|
| title = f"Performance for '{metric_name}' ({algorithm_name})" |
| ax.set_title(title, fontsize=16, weight='bold') |
| ax.set_xlabel('Generation', fontsize=12) |
| ax.set_ylabel('Fitness Score', fontsize=12) |
| ax.set_xticks(generations) |
| ax.set_xticklabels([f"Gen {g}" for g in generations], rotation=45, ha="right") |
| ax.legend(loc='best', fontsize=10) |
| ax.grid(True, which='both', linestyle='--', linewidth=0.5) |
|
|
| plt.tight_layout() |
|
|
| safe_metric_name = re.sub(r'[^a-zA-Z0-9_-]', '_', metric_name) |
| filename = f"performance_plot_{safe_metric_name}.png" |
| filepath = os.path.join(plot_dir, filename) |
|
|
| try: |
| plt.savefig(filepath, dpi=200, bbox_inches='tight') |
| except Exception as e: |
| logger.error(f"Failed to save individual plot for {metric_name}: {e}") |
| finally: |
| plt.close(fig) |
|
|
    def _plot_and_save_performance_graph(self, algorithm_name: str):
        """
        Render the recorded per-generation performance history to PNG files.

        Produces one combined plot (all node metrics plus overall combination
        scores) in ``log_dir`` and one plot per metric under
        ``<log_dir>/individual_plots``.

        Args:
            algorithm_name: Name shown in the plot titles (e.g. "GA").
        """
        # NOTE(review): `plt` is imported unconditionally at module top, so the
        # `plt is None` branch looks unreachable unless the import is made optional.
        if not self.enable_logging or plt is None:
            if plt is None:
                logger.warning("Matplotlib not found, skipping plot generation.")
            return
        if not self.best_scores_per_gen and not self.best_combo_scores_per_gen:
            logger.warning("No performance data to plot.")
            return

        plt.style.use('seaborn-v0_8-whitegrid')
        # Recover integer generation indices from keys of the form "Gen_<n>".
        all_gen_keys = set(self.best_scores_per_gen.keys()) | set(self.best_combo_scores_per_gen.keys())
        generations = sorted([int(re.search(r'\d+', gen).group()) for gen in all_gen_keys if re.search(r'\d+', gen)])

        fig_combined, ax_combined = plt.subplots(figsize=(16, 9))

        # Overall combination-level curves (best and average per generation).
        if self.best_combo_scores_per_gen:
            combo_best = [self.best_combo_scores_per_gen.get(f"Gen_{g}") for g in generations]
            combo_avg = [self.avg_combo_scores_per_gen.get(f"Gen_{g}") for g in generations]
            ax_combined.plot(generations, combo_best, marker='*', linestyle='-', linewidth=2.5, markersize=10, label='Best Combination Score (Overall)')
            ax_combined.plot(generations, combo_avg, marker='D', linestyle='--', linewidth=2.5, markersize=8, label='Average Combination Score (Overall)')

        # Union of node/metric names seen in any generation.
        all_node_metrics = set()
        for gen_data in self.best_scores_per_gen.values():
            all_node_metrics.update(gen_data.keys())

        # One best/avg curve pair per node metric; .get() yields None — and
        # thus a gap in the line — for generations where a metric is missing.
        for metric in sorted(list(all_node_metrics)):
            best_scores = [self.best_scores_per_gen.get(f"Gen_{g}", {}).get(metric) for g in generations]
            avg_scores = [self.avg_scores_per_gen.get(f"Gen_{g}", {}).get(metric) for g in generations]
            ax_combined.plot(generations, best_scores, marker='o', linestyle='-', alpha=0.7, label=f'Best Score ({metric})')
            ax_combined.plot(generations, avg_scores, marker='x', linestyle='--', alpha=0.7, label=f'Average Score ({metric})')

        ax_combined.set_title(f'Overall Performance Evolution ({algorithm_name})', fontsize=18, weight='bold')
        ax_combined.set_xlabel('Generation', fontsize=14)
        ax_combined.set_ylabel('Fitness Score', fontsize=14)
        ax_combined.set_xticks(generations)
        ax_combined.set_xticklabels([f"Gen {g}" for g in generations], rotation=45, ha="right")
        # Reorder the legend so combination-level entries are listed first.
        handles, labels = ax_combined.get_legend_handles_labels()
        combo_indices = [i for i, label in enumerate(labels) if 'Combination' in label]
        node_indices = [i for i, label in enumerate(labels) if 'Combination' not in label]
        ax_combined.legend([handles[i] for i in combo_indices + node_indices],
                           [labels[i] for i in combo_indices + node_indices],
                           loc='best', fontsize=10)
        ax_combined.grid(True, which='both', linestyle='--', linewidth=0.5)
        plt.tight_layout()

        combined_filepath = os.path.join(self.log_dir, "performance_summary_OVERALL.png")
        try:
            plt.savefig(combined_filepath, dpi=300, bbox_inches='tight')
            logger.info(f"Overall performance plot saved to: {combined_filepath}")
        except Exception as e:
            logger.error(f"Failed to save overall performance plot: {e}")
        finally:
            # Always release the figure to avoid leaking matplotlib state.
            plt.close(fig_combined)

        individual_plot_dir = os.path.join(self.log_dir, 'individual_plots')
        os.makedirs(individual_plot_dir, exist_ok=True)

        # One standalone PNG per node metric ...
        for metric in sorted(list(all_node_metrics)):
            best_scores = [self.best_scores_per_gen.get(f"Gen_{g}", {}).get(metric) for g in generations]
            avg_scores = [self.avg_scores_per_gen.get(f"Gen_{g}", {}).get(metric) for g in generations]
            self._create_single_metric_plot(metric, generations, best_scores, avg_scores, algorithm_name, individual_plot_dir)

        # ... plus one for the overall combination scores, when available.
        if self.best_combo_scores_per_gen:
            combo_best = [self.best_combo_scores_per_gen.get(f"Gen_{g}") for g in generations]
            combo_avg = [self.avg_combo_scores_per_gen.get(f"Gen_{g}") for g in generations]
            self._create_single_metric_plot("Combination", generations, combo_best, combo_avg, algorithm_name, individual_plot_dir)

        logger.info(f"Individual performance plots saved to: {individual_plot_dir}")
|
|
| def _log_optimization_summary(self, algorithm_name: str, best_config: Dict[str, str], |
| test_accuracy: float = None): |
| if not self.enable_logging: |
| return |
|
|
| filename = f"optimization_summary_{algorithm_name.lower()}.csv" |
| filepath = os.path.join(self.log_dir, filename) |
|
|
| with open(filepath, 'w', newline='', encoding='utf-8') as f: |
| writer = csv.writer(f) |
| writer.writerow(['Metric', 'Value', 'Timestamp']) |
| timestamp = datetime.now().isoformat() |
|
|
| writer.writerow(['Algorithm', algorithm_name, timestamp]) |
| writer.writerow(['Population_Size', self.population_size, timestamp]) |
| writer.writerow(['Iterations', self.iterations, timestamp]) |
| writer.writerow(['Combination_Sample_Size', self.combination_sample_size, timestamp]) |
| writer.writerow(['Early_Stopping_Enabled', self.enable_early_stopping, timestamp]) |
| if self.enable_early_stopping: |
| writer.writerow(['Early_Stopping_Patience', self.early_stopping_patience, timestamp]) |
|
|
| if test_accuracy is not None: |
| writer.writerow(['Final_Test_Accuracy', f"{test_accuracy:.6f}", timestamp]) |
|
|
| for node_name, prompt in best_config.items(): |
| writer.writerow([f'Best_{node_name}', prompt, timestamp]) |
|
|
| for gen_name in self.best_scores_per_gen.keys(): |
| for metric_name, best_score in self.best_scores_per_gen[gen_name].items(): |
| writer.writerow([f'{gen_name}_{metric_name}_Best', f"{best_score:.6f}", timestamp]) |
|
|
| if gen_name in self.avg_scores_per_gen: |
| for metric_name, avg_score in self.avg_scores_per_gen[gen_name].items(): |
| writer.writerow([f'{gen_name}_{metric_name}_Avg', f"{avg_score:.6f}", timestamp]) |
|
|
| self._plot_and_save_performance_graph(algorithm_name) |
| |
| async def _log_evaluation_details(self, benchmark: BIGBenchHard, dataset: List[Dict], |
| predictions: List[str], scores: List[float], eval_mode: str, |
| accuracy: float, correct_count: int, total_count: int): |
| if not self.enable_logging: |
| return |
| |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| filename = f"evaluation_testset_{eval_mode}_{timestamp}.csv" |
| filepath = os.path.join(self.log_dir, filename) |
| |
| logger.info(f"Logging detailed evaluation results to {filepath}") |
| |
| with open(filepath, 'w', newline='', encoding='utf-8') as f: |
| writer = csv.writer(f) |
| |
| writer.writerow(['Metric', 'Value']) |
| writer.writerow(['Overall_Accuracy', f"{accuracy:.6f}"]) |
| writer.writerow(['Correct_Count', correct_count]) |
| writer.writerow(['Total_Count', total_count]) |
| writer.writerow([]) |
|
|
| |
| writer.writerow(['example_id', 'input_text', 'prediction', 'ground_truth', 'score']) |
| |
| for i, example in enumerate(dataset): |
| example_id = benchmark._get_id(example) |
| input_text = example.get("input", "") |
| label = benchmark.get_label(example) |
| |
| writer.writerow([ |
| example_id, |
| input_text[:200] + "..." if len(input_text) > 200 else input_text, |
| predictions[i], |
| label, |
| scores[i] |
| ]) |
|
|
| def _log_generation(self, generation: int, combos_with_scores: List[tuple]): |
| """ |
| Log generation data for combination-based evolution. |
| """ |
| if not self.enable_logging: |
| return |
| |
| filename = f"combo_generation_{generation:02d}_log.csv" |
| filepath = os.path.join(self.log_dir, filename) |
| |
| with open(filepath, 'w', newline='', encoding='utf-8') as f: |
| writer = csv.writer(f) |
| header = ['Combination_ID', 'Combination_Score', 'Node_Name', 'Prompt_Text', 'Generation', 'Timestamp'] |
| writer.writerow(header) |
| timestamp = datetime.now().isoformat() |
| |
| sorted_combos = sorted(combos_with_scores, key=lambda x: x[1], reverse=True) |
| |
| for combo_rank, (combination, avg_score) in enumerate(sorted_combos): |
| combo_id = f"combo_rank_{combo_rank + 1}" |
| for node_name, prompt_text in combination.items(): |
| writer.writerow([ |
| combo_id, |
| f"{avg_score:.6f}", |
| node_name, |
| prompt_text[:200] + "..." if len(prompt_text) > 200 else prompt_text, |
| generation, |
| timestamp |
| ]) |
|
|
| async def _evaluate_combination_list(self, combinations: List[Dict], benchmark: BIGBenchHard, dev_set: list) -> List[float]: |
| if not combinations: |
| return [] |
| eval_dev_set = dev_set[:50] if len(dev_set) > 50 else dev_set |
| all_scores = [] |
| pbar = aio_tqdm(total=len(combinations), desc="Evaluating batch", leave=False) |
| for combo in combinations: |
| tasks = [self._evaluate_combination_on_example(combo, benchmark, ex) for ex in eval_dev_set] |
| example_scores = await asyncio.gather(*tasks) |
| avg_score = sum(example_scores) / len(example_scores) if example_scores else 0.0 |
| all_scores.append(avg_score) |
| pbar.update(1) |
| pbar.close() |
| return all_scores |
|
|
| def _generate_combinations(self, node_populations: Dict[str, List[str]]) -> List[Dict[str, str]]: |
| node_names = list(node_populations.keys()) |
| node_prompts = [node_populations[node] for node in node_names] |
| total_possible = np.prod([len(p) for p in node_prompts if p]) if all(p for p in node_prompts) else 0 |
|
|
| if total_possible == 0: |
| logger.warning("Cannot generate combinations, one or more node populations are empty.") |
| return [] |
|
|
| if self.combination_sample_size is None: |
| target_size = min(self.population_size, int(total_possible), 200) |
| else: |
| target_size = min(self.combination_sample_size, int(total_possible)) |
|
|
| logger.info(f"Total possible combinations: {total_possible}, sampling: {target_size}") |
|
|
| if target_size >= total_possible: |
| all_combinations = [] |
| for combination in itertools.product(*node_prompts): |
| combo_dict = {node_names[i]: combination[i] for i in range(len(node_names))} |
| all_combinations.append(combo_dict) |
| return all_combinations |
|
|
| sampled_combinations = [] |
| sampled_keys = set() |
| max_attempts = target_size * 5 |
| attempts = 0 |
|
|
| while len(sampled_combinations) < target_size and attempts < max_attempts: |
| combination = {name: random.choice(prompts) for name, prompts in node_populations.items()} |
| combo_key = tuple(sorted(combination.items())) |
| if combo_key not in sampled_keys: |
| sampled_combinations.append(combination) |
| sampled_keys.add(combo_key) |
| attempts += 1 |
|
|
| logger.info(f"Generated {len(sampled_combinations)} unique combinations") |
| return sampled_combinations |
|
|
| async def _evaluate_combination_on_example(self, combination: Dict[str, str], |
| benchmark: BIGBenchHard, example: Dict) -> float: |
| combo_key = tuple(sorted(combination.items())) |
| example_key = str(hash(str(example))) |
| cache_key = hash((combo_key, example_key)) |
|
|
| if not hasattr(self, '_eval_cache'): |
| self._eval_cache = {} |
|
|
| if cache_key in self._eval_cache: |
| return self._eval_cache[cache_key] |
|
|
| async with self.semaphore: |
| try: |
| original_config = self.get_current_cfg() |
| self.apply_cfg(combination) |
| inputs = {k: v for k, v in example.items() if k in benchmark.get_input_keys()} |
| prediction, _ = await asyncio.to_thread(self.program, **inputs) |
| label = benchmark.get_label(example) |
| score_dict = benchmark.evaluate(prediction, label) |
| score = score_dict.get("em", 0.0) |
| self.apply_cfg(original_config) |
| self._eval_cache[cache_key] = score |
| if len(self._eval_cache) > 5000: |
| keys_to_del = list(self._eval_cache.keys())[:1000] |
| for key in keys_to_del: |
| del self._eval_cache[key] |
| return score |
| except Exception as e: |
| logger.error(f"Error evaluating combination: {e}") |
| return 0.0 |
|
|
| async def _evaluate_combinations_and_update_node_scores(self, combinations: List[Dict[str, str]], |
| benchmark: BIGBenchHard, dev_set: list) -> List[float]: |
| eval_dev_set = dev_set[:50] if len(dev_set) > 50 else dev_set |
| combination_scores = [] |
| print(f"Evaluating {len(combinations)} combinations on {len(eval_dev_set)} examples...") |
| combo_pbar = aio_tqdm(total=len(combinations), desc="Evaluating Combinations") |
| for combination in combinations: |
| tasks = [self._evaluate_combination_on_example(combination, benchmark, ex) for ex in eval_dev_set] |
| example_scores = await asyncio.gather(*tasks) |
| avg_score = sum(example_scores) / len(example_scores) if example_scores else 0.0 |
| combination_scores.append(avg_score) |
| combo_pbar.update(1) |
| combo_pbar.close() |
|
|
| for node_name in self.node_populations.keys(): |
| self.node_scores[node_name] = [0.0] * len(self.node_populations[node_name]) |
| for prompt_idx, prompt in enumerate(self.node_populations[node_name]): |
| participating_scores = [ |
| combo_score for combo_idx, combo_score in enumerate(combination_scores) |
| if combinations[combo_idx].get(node_name) == prompt |
| ] |
| if participating_scores: |
| self.node_scores[node_name][prompt_idx] = sum(participating_scores) / len(participating_scores) |
| else: |
| self.node_scores[node_name][prompt_idx] = 0.0 |
| return combination_scores |
|
|
| async def _perform_paraphrase(self, prompt: str) -> str: |
| async with self.semaphore: |
| output = await asyncio.to_thread( |
| self.paraphrase_agent, |
| inputs={"instruction": prompt} |
| ) |
| return output.content.paraphrased_instruction.strip() |
|
|
| async def _perform_evolution(self, agent: Callable, inputs: Dict[str, str]) -> str: |
| async with self.semaphore: |
| output = await asyncio.to_thread(agent, inputs=inputs) |
| if hasattr(output.content, 'evolved_prompt'): |
| return output.content.evolved_prompt.strip() |
| return str(output.content).strip() |
|
|
| async def _initialize_node_populations(self, initial_config: Dict[str, any]): |
| for node_name, initial_value in initial_config.items(): |
| node_population = [] |
| if isinstance(initial_value, list): |
| provided_size = len(initial_value) |
| if self.population_size < provided_size: |
| logger.info(f"Node '{node_name}': Provided population ({provided_size}) is larger than target size ({self.population_size}). Randomly sampling.") |
| node_population = random.sample(initial_value, self.population_size) |
| elif self.population_size == provided_size: |
| logger.info(f"Node '{node_name}': Provided population size ({provided_size}) matches target size. Using directly.") |
| node_population = list(initial_value) |
| else: |
| logger.info(f"Node '{node_name}': Target population size ({self.population_size}) is larger than provided ({provided_size}). Expanding.") |
| node_population = list(initial_value) |
| num_to_generate = self.population_size - provided_size |
| source_prompts_for_generation = random.choices(initial_value, k=num_to_generate) |
| paraphrase_tasks = [self._perform_paraphrase(prompt) for prompt in source_prompts_for_generation] |
| new_prompts = await aio_tqdm.gather( |
| *paraphrase_tasks, desc=f"Expanding population for {node_name}" |
| ) |
| node_population.extend(new_prompts) |
| elif isinstance(initial_value, str): |
| logger.info(f"Node '{node_name}': Generating population from a single initial prompt.") |
| node_population = [initial_value] |
| if self.population_size > 1: |
| num_to_generate = self.population_size - 1 |
| paraphrase_tasks = [self._perform_paraphrase(initial_value) for _ in range(num_to_generate)] |
| new_prompts = await aio_tqdm.gather( |
| *paraphrase_tasks, desc=f"Generating initial population for {node_name}" |
| ) |
| node_population.extend(new_prompts) |
| else: |
| raise TypeError(f"Unsupported type for tracked parameter '{node_name}': {type(initial_value)}. Must be str or list.") |
| self.node_populations[node_name] = node_population |
| self.node_scores[node_name] = [0.0] * self.population_size |
|
|
| async def evaluate(self, benchmark: BIGBenchHard, eval_mode: str = "test") -> Dict[str, float]: |
| """ |
| Evaluates the optimized program on a specified dataset. |
| |
| Args: |
| benchmark (BIGBenchHard): The benchmark instance containing the data. |
| eval_mode (str): The evaluation mode, either "test" or "dev". |
| |
| Returns: |
| Dict[str, float]: A dictionary containing evaluation metrics. |
| """ |
| logger.info(f"--- Evaluating optimized program on '{eval_mode}' set ---") |
| |
| dataset = benchmark.get_test_data() if eval_mode == "test" else benchmark.get_dev_data() |
| if not dataset: |
| logger.warning(f"No data found for '{eval_mode}' set. Returning empty results.") |
| return {} |
|
|
| async def evaluate_example(example: Dict) -> tuple[float, str]: |
| prediction, _ = await asyncio.to_thread(self.program, input=example["input"]) |
| score_dict = benchmark.evaluate(prediction, benchmark.get_label(example)) |
| score = score_dict.get("em", 0.0) |
| return score, prediction |
|
|
| tasks = [evaluate_example(ex) for ex in dataset] |
| results = await aio_tqdm.gather(*tasks, desc=f"Evaluating on {eval_mode.capitalize()} Set") |
| |
| scores, predictions = zip(*results) if results else ([], []) |
| |
| correct_count = sum(scores) |
| total_count = len(dataset) |
| accuracy = correct_count / total_count if total_count > 0 else 0.0 |
| |
| logger.info(f"{eval_mode.capitalize()} Set Accuracy: {accuracy:.4f} ({int(correct_count)}/{total_count})") |
| |
| if self.enable_logging: |
| await self._log_evaluation_details( |
| benchmark, dataset, predictions, scores, eval_mode, |
| accuracy, int(correct_count), total_count |
| ) |
| |
| return {"accuracy": accuracy} |
|
|
| |
|
|
class GAOptimizer(EvopromptOptimizer):
    """
    Genetic Algorithm optimizer for prompt evolution.

    This optimizer uses genetic algorithm operations (crossover, mutation, selection)
    to evolve prompts. It supports both node-based and combination-based evolution.
    """

    def __init__(self, *args, full_evaluation: bool = False, **kwargs):
        """
        Initialize the GA optimizer.

        Args:
            full_evaluation: Whether to use full node-based evaluation or combination-based
            *args: Arguments passed to parent class
            **kwargs: Keyword arguments passed to parent class
        """
        super().__init__(*args, **kwargs)
        self.full_evaluation = full_evaluation

        mode_str = "full_evaluation" if self.full_evaluation else "combination-based"
        logger.info(f"GAOptimizer initialized with '{mode_str}' mode.")

        # LLM agent performing crossover of two parents followed by mutation.
        self.ga_agent = CustomizeAgent(
            name="ga_agent",
            description="An agent that evolves a new prompt from two parent prompts.",
            prompt="""Please follow the instructions step-by-step to generate a better prompt.

1. Crossover the following prompts to generate a new prompt:
Prompt 1: {parent1}
Prompt 2: {parent2}

2. Mutate the prompt generated in Step 1 and generate a final evolved prompt. Strictly preserve the original XML tags structure.

Now process the given prompts and provide your output in the following format:

## evolved_prompt
[Your evolved prompt here]""",
            llm_config=self.llm_config,
            inputs=[
                {"name": "parent1", "type": "string", "description": "The first parent prompt."},
                {"name": "parent2", "type": "string", "description": "The second parent prompt."}
            ],
            outputs=[
                {"name": "evolved_prompt", "type": "string", "description": "The evolved prompt with XML tags preserved."}
            ],
            parse_mode="title"
        )
|
|
| async def _perform_node_evolution(self, node_name: str, node_population: List[str], |
| node_scores: List[float] = None, |
| evolution_agent: Callable = None) -> List[str]: |
| probabilities = None |
| if node_scores: |
| total_fitness = sum(node_scores) |
| if total_fitness > 0: |
| probabilities = [s / total_fitness for s in node_scores] |
|
|
| agent = evolution_agent or self.ga_agent |
| |
| num_children_to_create = len(node_population) |
| evolution_tasks = [] |
| for _ in range(num_children_to_create): |
| parents = random.choices(node_population, weights=probabilities, k=2) if probabilities else random.choices(node_population, k=2) |
| task = self._perform_evolution(agent=agent, inputs={"parent1": parents[0], "parent2": parents[1]}) |
| evolution_tasks.append(task) |
| |
| new_children = await aio_tqdm.gather(*evolution_tasks, desc=f"Evolving {node_name}") |
| return new_children |
|
|
| async def optimize(self, benchmark: BIGBenchHard) -> tuple[Dict[str, str], dict, dict]: |
| self._setup_logging_directory(benchmark) |
| initial_config = self.get_current_cfg() |
| if not initial_config: |
| raise ValueError("Registry is empty.") |
| await self._initialize_node_populations(initial_config) |
| dev_set = benchmark.get_dev_data() |
| if not dev_set: |
| raise ValueError("Benchmark has no development set.") |
| |
| self._best_score_so_far = -float('inf') |
| self._generations_without_improvement = 0 |
|
|
| if self.full_evaluation: |
| logger.info("--- Starting Node-Based Evolution with Makeup Evaluation (full_evaluation=True) ---") |
| |
| print("--- Step 1: Initial evaluation of node combinations... ---") |
| combinations = self._generate_combinations(self.node_populations) |
| combination_scores = await self._evaluate_combinations_and_update_node_scores(combinations, benchmark, dev_set) |
| |
| self._log_generation_summary(0, "Initial") |
| self._log_detailed_evaluation(0, combinations, combination_scores) |
| |
| self.best_scores_per_gen["Gen_0"] = {name: max(scores) if scores else 0 for name, scores in self.node_scores.items()} |
| self.avg_scores_per_gen["Gen_0"] = {name: np.mean(scores) if scores else 0 for name, scores in self.node_scores.items()} |
| |
| if combination_scores: |
| initial_best_combo_score = max(combination_scores) |
| self._best_score_so_far = initial_best_combo_score |
| self.best_combo_scores_per_gen["Gen_0"] = initial_best_combo_score |
| self.avg_combo_scores_per_gen["Gen_0"] = np.mean(combination_scores) |
| logger.info(f"Early stopping baseline set to initial best combination score: {self._best_score_so_far:.4f}") |
|
|
| for t in range(self.iterations): |
| generation_start_time = time.time() |
| print(f"\n--- Generation {t + 1}/{self.iterations} ---") |
|
|
| children_populations = {} |
| for node_name in self.node_populations.keys(): |
| children = await self._perform_node_evolution( |
| node_name, self.node_populations[node_name], self.node_scores[node_name], self.ga_agent |
| ) |
| children_populations[node_name] = children |
| |
| current_populations = { |
| name: self.node_populations[name] + children_populations[name] |
| for name in self.node_populations.keys() |
| } |
| self.node_populations = current_populations |
|
|
| print(f"Performing main evaluation for {len(list(current_populations.values())[0])} individuals in each node...") |
| combinations = self._generate_combinations(self.node_populations) |
| combination_scores = await self._evaluate_combinations_and_update_node_scores(combinations, benchmark, dev_set) |
| |
| prompts_needing_makeup = [] |
| for node_name, scores in self.node_scores.items(): |
| for idx, score in enumerate(scores): |
| if score == 0.0: |
| prompt_to_check = self.node_populations[node_name][idx] |
| is_in_combos = any(c.get(node_name) == prompt_to_check for c in combinations) |
| if not is_in_combos: |
| prompts_needing_makeup.append((node_name, idx, prompt_to_check)) |
| |
| if prompts_needing_makeup: |
| print(f"--- Performing makeup evaluation for {len(prompts_needing_makeup)} unsampled individuals... ---") |
| makeup_combinations = [] |
| for node_name, idx, prompt in prompts_needing_makeup: |
| makeup_combo = {name: random.choice(pop) for name, pop in self.node_populations.items()} |
| makeup_combo[node_name] = prompt |
| makeup_combinations.append(makeup_combo) |
| |
| makeup_scores = await self._evaluate_combination_list(makeup_combinations, benchmark, dev_set) |
|
|
| for i, (node_name, idx, prompt) in enumerate(prompts_needing_makeup): |
| self.node_scores[node_name][idx] = makeup_scores[i] |
| logger.info(f"Updated score for '{prompt[:30]}...' to {makeup_scores[i]:.4f} after makeup eval.") |
|
|
| print("--- Selecting survivors for the next generation... ---") |
| survivor_populations = {} |
| survivor_scores = {} |
| for node_name in self.node_populations.keys(): |
| population = self.node_populations[node_name] |
| scores = self.node_scores[node_name] |
| sorted_pairs = sorted(zip(scores, population), key=lambda x: x[0], reverse=True) |
| selected_pairs = sorted_pairs[:self.population_size] |
| |
| if selected_pairs: |
| selected_scores, selected_population = zip(*selected_pairs) |
| survivor_scores[node_name] = list(selected_scores) |
| survivor_populations[node_name] = list(selected_population) |
| else: |
| survivor_scores[node_name], survivor_populations[node_name] = [], [] |
| print(f"Node {node_name}: Selected top {len(survivor_populations[node_name])} from {len(population)} individuals") |
| |
| self.node_populations = survivor_populations |
| self.node_scores = survivor_scores |
|
|
| generation_time = time.time() - generation_start_time |
| print(f"Generation {t + 1} completed in {generation_time:.2f}s") |
| self._log_generation_summary(t + 1, "Evolution") |
| if combination_scores: |
| self._log_detailed_evaluation(t + 1, combinations, combination_scores) |
| |
| gen_name = f"Gen_{t + 1}" |
| self.best_scores_per_gen[gen_name] = {name: max(scores) if scores else 0 for name, scores in self.node_scores.items()} |
| self.avg_scores_per_gen[gen_name] = {name: np.mean(scores) if scores else 0 for name, scores in self.node_scores.items()} |
| |
| best_combo_score_this_gen = max(combination_scores) if combination_scores else -float('inf') |
| self.best_combo_scores_per_gen[gen_name] = best_combo_score_this_gen |
| self.avg_combo_scores_per_gen[gen_name] = np.mean(combination_scores) if combination_scores else 0.0 |
| |
| if self.enable_early_stopping: |
| if best_combo_score_this_gen > self._best_score_so_far + 1e-6: |
| self._best_score_so_far = best_combo_score_this_gen |
| self._generations_without_improvement = 0 |
| logger.info(f"Early stopping: New best combination score found: {self._best_score_so_far:.4f}.") |
| else: |
| self._generations_without_improvement += 1 |
| logger.info(f"Early stopping: No improvement for {self._generations_without_improvement} generation(s).") |
| |
| if self._generations_without_improvement >= self.early_stopping_patience: |
| logger.warning(f"\n--- EARLY STOPPING TRIGGERED at generation {t + 1} ---") |
| break |
|
|
| else: |
| logger.info("--- Starting Combo-Based Evolution (full_evaluation=False) ---") |
| |
| print("--- Step 1: Creating and evaluating initial combination population... ---") |
| initial_combinations = self._generate_combinations(self.node_populations) |
| initial_scores = await self._evaluate_combination_list(initial_combinations, benchmark, dev_set) |
| |
| combo_population_with_scores = sorted(zip(initial_combinations, initial_scores), key=lambda x: x[1], reverse=True) |
| combo_population_with_scores = combo_population_with_scores[:self.population_size] |
| |
| gen_0_scores = [score for _, score in combo_population_with_scores] |
| if gen_0_scores: |
| best_gen_score = max(gen_0_scores) |
| avg_gen_score = np.mean(gen_0_scores) |
| self.best_combo_scores_per_gen["Gen_0"] = best_gen_score |
| self.avg_combo_scores_per_gen["Gen_0"] = avg_gen_score |
| self._best_score_so_far = best_gen_score |
| print(f"Generation 0 complete. Best score: {best_gen_score:.4f}, Avg score: {avg_gen_score:.4f}") |
| logger.info(f"Early stopping baseline set to: {self._best_score_so_far:.4f}") |
| |
| self._log_generation(0, combo_population_with_scores) |
|
|
| for t in range(self.iterations): |
| print(f"\n--- Generation {t + 1}/{self.iterations} (Combo Evolution) ---") |
| |
| parent_prompts_for_node = {name: [] for name in initial_config.keys()} |
| for combo, _ in combo_population_with_scores: |
| for node_name, prompt in combo.items(): |
| parent_prompts_for_node[node_name].append(prompt) |
|
|
| children_populations = {} |
| for node_name, prompts in parent_prompts_for_node.items(): |
| children_populations[node_name] = await self._perform_node_evolution(node_name, prompts) |
|
|
| print("Evaluating new child combinations...") |
| child_combinations = self._generate_combinations(children_populations) |
| child_scores = await self._evaluate_combination_list(child_combinations, benchmark, dev_set) |
| child_combos_with_scores = list(zip(child_combinations, child_scores)) |
|
|
| print("Selecting best combinations from parents and children...") |
| combined_population = combo_population_with_scores + child_combos_with_scores |
| |
| sorted_combos = sorted(combined_population, key=lambda x: x[1], reverse=True) |
| combo_population_with_scores = sorted_combos[:self.population_size] |
|
|
| self._log_generation(t + 1, combo_population_with_scores) |
| current_scores = [score for _, score in combo_population_with_scores] |
| best_gen_score = max(current_scores) if current_scores else 0 |
| avg_gen_score = np.mean(current_scores) if current_scores else 0 |
| |
| gen_name = f"Gen_{t + 1}" |
| self.best_combo_scores_per_gen[gen_name] = best_gen_score |
| self.avg_combo_scores_per_gen[gen_name] = avg_gen_score |
| print(f"Generation {t + 1} complete. Best score: {best_gen_score:.4f}, Avg score: {avg_gen_score:.4f}") |
|
|
| if self.enable_early_stopping: |
| if best_gen_score > self._best_score_so_far + 1e-6: |
| self._best_score_so_far = best_gen_score |
| self._generations_without_improvement = 0 |
| logger.info(f"Early stopping: New best combination score found: {self._best_score_so_far:.4f}. Patience counter reset.") |
| else: |
| self._generations_without_improvement += 1 |
| logger.info(f"Early stopping: No improvement for {self._generations_without_improvement} generation(s). Patience: {self.early_stopping_patience}.") |
|
|
| if self._generations_without_improvement >= self.early_stopping_patience: |
| logger.warning(f"\n--- EARLY STOPPING TRIGGERED at generation {t + 1} ---") |
| break |
| |
| print("\n--- Evolution complete ---") |
| if self.full_evaluation: |
| best_config = { |
| name: self.node_populations[name][np.argmax(self.node_scores[name])] |
| for name in self.node_populations.keys() if self.node_populations.get(name) and self.node_scores.get(name) |
| } |
| else: |
| best_config, _ = max(combo_population_with_scores, key=lambda x: x[1]) if combo_population_with_scores else ({}, 0) |
|
|
| self._log_optimization_summary("GA", best_config) |
| self.apply_cfg(best_config) |
| logger.info("Optimization finished! The best configuration has been applied to the program.") |
| |
| return best_config, self.best_combo_scores_per_gen, self.avg_scores_per_gen |
|
|
|
|
| class DEOptimizer(EvopromptOptimizer): |
| """ |
| Differential Evolution optimizer for prompt evolution. |
| |
| This optimizer uses differential evolution strategy for prompt optimization, |
| including mutation, crossover, and selection operations based on DE principles. |
| """ |
| |
| def __init__(self, *args, **kwargs): |
| """ |
| Initialize the DE optimizer. |
| |
| Args: |
| *args: Arguments passed to parent class |
| **kwargs: Keyword arguments passed to parent class |
| """ |
| super().__init__(*args, **kwargs) |
| |
| |
| self.de_agent = CustomizeAgent( |
| name="DE_Agent", |
| description="Generates a new trial prompt using the Differential Evolution strategy.", |
| prompt="""Please follow the instructions step-by-step to generate a better prompt using Differential Evolution strategy. |
| |
| 1. Identify the different parts between these two donor prompts: |
| Donor Prompt 1: {donor1} |
| Donor Prompt 2: {donor2} |
| |
| 2. Randomly mutate the different parts identified above. |
| |
| 3. Combine the mutated parts with the best prompt, selectively replacing its content: |
| Best Prompt: {best_prompt} |
| |
| 4. Crossover the result from Step 3 with the current prompt to generate the final evolved prompt. Strictly preserve the original XML tags structure: |
| Current Prompt: {current_prompt} |
| |
| Please provide the final evolved prompt in the following format: |
| |
| ## evolved_prompt |
| [Your evolved prompt here]""", |
| llm_config=self.llm_config, |
| inputs=[ |
| {"name": "current_prompt", "type": "string", "description": "The base prompt to be mutated, p_i."}, |
| {"name": "donor1", "type": "string", "description": "The first donor prompt, p_r1."}, |
| {"name": "donor2", "type": "string", "description": "The second donor prompt, p_r2."}, |
| {"name": "best_prompt", "type": "string", "description": "The best prompt found so far in the population, p_best."}, |
| ], |
| outputs=[ |
| {"name": "evolved_prompt", "type": "string", "description": "The evolved prompt with XML tags preserved."} |
| ], |
| parse_mode="title" |
| ) |
|
|
| async def _evolve_and_select_one( |
| self, |
| target_combo_with_score: tuple, |
| full_pop_with_scores: List[tuple], |
| benchmark: BIGBenchHard, |
| dev_set: list |
| ) -> tuple: |
| """ |
| Evolve a single combination using differential evolution and select the better one. |
| |
| Args: |
| target_combo_with_score: The target combination and its score |
| full_pop_with_scores: The full population with scores |
| benchmark: The benchmark for evaluation |
| dev_set: Development set for evaluation |
| |
| Returns: |
| Tuple of the better combination (target or trial) and its score |
| """ |
| target_combo, target_score = target_combo_with_score |
| best_combo, _ = max(full_pop_with_scores, key=lambda x: x[1]) |
| |
| |
| donor_pool = [c for c in full_pop_with_scores if c[0] != target_combo] |
| if len(donor_pool) < 2: |
| donors = random.choices(full_pop_with_scores, k=2) |
| else: |
| donors = random.sample(donor_pool, 2) |
| donor1_combo, _ = donors[0] |
| donor2_combo, _ = donors[1] |
| |
| |
| evolution_tasks = [] |
| node_names = list(target_combo.keys()) |
| for node_name in node_names: |
| task = self._perform_evolution( |
| agent=self.de_agent, |
| inputs={ |
| "current_prompt": target_combo[node_name], |
| "donor1": donor1_combo[node_name], |
| "donor2": donor2_combo[node_name], |
| "best_prompt": best_combo[node_name] |
| } |
| ) |
| evolution_tasks.append(task) |
| |
| |
| evolved_components = await asyncio.gather(*evolution_tasks) |
| trial_combo = {name: comp for name, comp in zip(node_names, evolved_components)} |
| trial_scores = await self._evaluate_combination_list([trial_combo], benchmark, dev_set) |
| trial_score = trial_scores[0] |
| |
| |
| return (trial_combo, trial_score) if trial_score > target_score else (target_combo, target_score) |
|
|
    async def optimize(self, benchmark: BIGBenchHard) -> tuple[Dict[str, str], dict, dict]:
        """
        Run pipelined, combination-level Differential Evolution over the prompt nodes.

        Each generation treats every combination in the population as a DE target:
        `_evolve_and_select_one` produces and scores a trial concurrently, and the
        better of target/trial survives. Optional early stopping halts evolution
        after `early_stopping_patience` generations without improvement.

        Args:
            benchmark: Benchmark supplying the dev set used to score combinations.

        Returns:
            Tuple of (best configuration mapping node name -> prompt,
            best combination score per generation, average combination score per
            generation).

        Raises:
            ValueError: If the registry has no parameters or the benchmark has
                no development set.
        """
        self._setup_logging_directory(benchmark)
        initial_config = self.get_current_cfg()
        if not initial_config:
            raise ValueError("Registry is empty.")
        logger.info("Optimizing with DEOptimizer (Pipelined Combination Evolution).")
        await self._initialize_node_populations(initial_config)
        dev_set = benchmark.get_dev_data()
        if not dev_set:
            raise ValueError("Benchmark has no development set.")

        # Early-stopping state: best score observed so far and stagnation counter.
        self._best_score_so_far = -float('inf')
        self._generations_without_improvement = 0

        print("--- Step 1: Creating and evaluating initial combination population... ---")
        initial_combinations = self._generate_combinations(self.node_populations)
        initial_scores = await self._evaluate_combination_list(initial_combinations, benchmark, dev_set)
        combo_pop_with_scores = list(zip(initial_combinations, initial_scores))

        # Record generation 0 statistics before any evolution happens.
        self._log_generation(0, combo_pop_with_scores)
        initial_best = max(initial_scores) if initial_scores else 0
        initial_avg = np.mean(initial_scores) if initial_scores else 0
        self.best_combo_scores_per_gen["Gen_0"] = initial_best
        self.avg_combo_scores_per_gen["Gen_0"] = initial_avg
        print(f"Initial population - Best score: {initial_best:.4f}, Avg score: {initial_avg:.4f}")

        # Seed the early-stopping baseline with the best initial score (if any).
        if initial_scores:
            self._best_score_so_far = initial_best

        for t in range(self.iterations):
            print(f"\n--- Generation {t + 1}/{self.iterations} ---")
            # One DE step per population member: each coroutine evolves a trial
            # combination, scores it, and returns the winner of target vs trial.
            evolution_pipeline_tasks = [
                self._evolve_and_select_one(combo_with_score, combo_pop_with_scores, benchmark, dev_set)
                for combo_with_score in combo_pop_with_scores
            ]
            pbar = aio_tqdm(total=len(evolution_pipeline_tasks), desc=f"Pipelined Evolution Gen {t+1}")
            next_gen_pop_with_scores = []
            # NOTE: as_completed yields in completion order, so population order
            # may shuffle between generations; selection below is order-free.
            for future in asyncio.as_completed(evolution_pipeline_tasks):
                result = await future
                next_gen_pop_with_scores.append(result)
                pbar.update(1)
            pbar.close()
            combo_pop_with_scores = next_gen_pop_with_scores

            # Per-generation bookkeeping and score reporting.
            self._log_generation(t + 1, combo_pop_with_scores)
            current_scores = [score for _, score in combo_pop_with_scores]
            best_gen_score = max(current_scores) if current_scores else 0
            avg_gen_score = np.mean(current_scores) if current_scores else 0

            gen_name = f"Gen_{t + 1}"
            self.best_combo_scores_per_gen[gen_name] = best_gen_score
            self.avg_combo_scores_per_gen[gen_name] = avg_gen_score
            print(f"Generation {t + 1} complete. Best score: {best_gen_score:.4f}, Avg score: {avg_gen_score:.4f}")

            # Early stopping: a strict improvement (beyond a 1e-6 float tolerance)
            # resets the patience counter; otherwise stagnation accumulates until
            # the configured patience is exhausted.
            if self.enable_early_stopping:
                if best_gen_score > self._best_score_so_far + 1e-6:
                    self._best_score_so_far = best_gen_score
                    self._generations_without_improvement = 0
                    logger.info(f"Early stopping: New best score found: {self._best_score_so_far:.4f}. Patience counter reset.")
                else:
                    self._generations_without_improvement += 1
                    logger.info(f"Early stopping: No improvement for {self._generations_without_improvement} generation(s). Patience: {self.early_stopping_patience}.")

                if self._generations_without_improvement >= self.early_stopping_patience:
                    logger.warning(f"\n--- EARLY STOPPING TRIGGERED at generation {t + 1} ---")
                    logger.warning(f"No improvement in best score for {self.early_stopping_patience} consecutive generations.")
                    break

        print("\n--- Combination-Level Evolution complete ---")
        # Highest-scoring combination from the final population; empty config
        # fallback guards against an (unexpected) empty population.
        best_combination, best_score = max(combo_pop_with_scores, key=lambda x: x[1]) if combo_pop_with_scores else ({}, 0)
        logger.info(f"Optimization finished! Best combination found with score {best_score:.4f}.")

        self._log_optimization_summary("DE", best_combination)
        self.apply_cfg(best_combination)
        return best_combination, self.best_combo_scores_per_gen, self.avg_combo_scores_per_gen