| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| import os |
| import shutil |
| from copy import deepcopy |
| from typing import Any, Iterator, List, Literal, Optional, Tuple, Union |
|
|
| import numpy as np |
| from pydantic import Field, PositiveInt |
| from tqdm import tqdm |
|
|
| from ..agents import Agent, CustomizeAgent |
| from ..benchmark.benchmark import Benchmark, CodingBenchmark |
| from ..core.callbacks import suppress_logger_info |
| from ..core.logging import logger |
| from ..core.module import BaseModule |
| from ..evaluators import Evaluator |
| from ..models.base_model import BaseLLM |
| from ..prompts import PromptTemplate |
| from ..workflow.workflow_graph import WorkFlowGraph, WorkFlowNode |
|
|
| |
# Record whether ./logs already exists before importing textgrad below,
# which creates a ./logs folder as an import side effect; checked after the
# import so a textgrad-created folder can be removed again.
log_folder_exists = os.path.exists("./logs")
|
|
| import textgrad as tg |
| from textgrad import Variable, EngineLM |
| from textgrad import logger as tg_logger |
| from textgrad import sh as tg_file_handler |
| from textgrad.autograd import StringBasedFunction |
| from textgrad.loss import MultiFieldEvaluation, TextLoss |
| from textgrad.optimizer import TextualGradientDescent |
|
|
| from ..prompts.optimizers.textgrad_optimizer import ( |
| CODE_LOSS_PROMPT, |
| GENERAL_LOSS_PROMPT, |
| NO_ANSWER_LOSS_PROMPT, |
| OPTIMIZER_CONSTRAINTS, |
| OPTIMIZER_SYSTEM_PROMPT, |
| PERSONAL_FINANCE_ADVISOR_EXAMPLE, |
| FITNESS_COACH_EXAMPLE, |
| CODE_REVIEW_EXAMPLE, |
| ) |
|
|
# Detach textgrad's file handler so its internal logging does not write
# into ./logs alongside this project's logs.
tg_logger.removeHandler(tg_file_handler)

# If ./logs did not exist before the textgrad import but exists now, the
# folder was created purely as an import side effect — remove it to leave
# the working directory untouched.
if not log_folder_exists and os.path.exists("./logs"):
    shutil.rmtree("./logs")
|
|
|
|
class TextGradEngine(EngineLM):
    """Adapts an EvoAgentX ``BaseLLM`` to the ``textgrad.EngineLM`` interface.

    TextGrad routes all of its LLM calls through an "engine" object; this
    wrapper lets any EvoAgentX model serve as that engine.
    """

    def __init__(self, llm: BaseLLM):
        # Underlying EvoAgentX model used for all generations.
        self.llm = llm

    def generate(self, prompt: str, system_prompt: Optional[str] = None, **kwargs) -> str:
        """Generate a completion for ``prompt`` and return its text content.

        Extra keyword arguments are forwarded to ``BaseLLM.generate``.
        """
        # Suppress info-level logging: TextGrad makes many internal LLM
        # calls that would otherwise flood the logs.
        with suppress_logger_info():
            response = self.llm.generate(prompt, system_prompt=system_prompt, **kwargs)
        return response.content

    def __call__(self, prompt: str, **kwargs) -> str:
        # EngineLM instances are also used as plain callables by textgrad.
        return self.generate(prompt, **kwargs)
|
|
|
class CustomAgentCall:
    """A custom agent call with textgrad.Variable inputs and output."""
    def __init__(self, agent: Agent):
        self.agent = agent
        # Parsed outputs of the most recent call, keyed by output field name;
        # attached to the output Variable by TextGradAgent.__call__.
        self.last_outputs: dict[str, str] = dict()

    def __call__(
        self,
        instruction: Variable,
        system_prompt: Variable,
        **inputs: Variable
    ) -> Variable:
        # NOTE(review): `instruction` and `system_prompt` are never read in
        # this body — presumably they are passed so TextGrad records them as
        # predecessors of the output in its computation graph (the wrapped
        # agent already carries its own prompts); confirm against
        # StringBasedFunction's semantics.

        # Only the first action of the agent is executed (the optimizer
        # validates elsewhere that each agent has a single optimizable action).
        action = self.agent.actions[0]
        input_names = action.inputs_format.get_attrs()

        agent_inputs = {}

        # Each keyword is either a direct input (name matches a declared
        # input of the action) or the output of an upstream node, in which
        # case the declared fields are pulled from its parsed_outputs.
        for key, input_variable in inputs.items():
            if key in input_names:
                agent_inputs[key] = input_variable.value
            else:
                parsed_inputs: dict[str, str] = {k:v for k,v in input_variable.parsed_outputs.items() if k in input_names}
                agent_inputs.update(parsed_inputs)

        with suppress_logger_info():
            outputs = self.agent.execute(action_name=action.name, action_input_data=agent_inputs).content

        # Cache the structured outputs for downstream field extraction.
        self.last_outputs = outputs.to_dict()

        # NOTE(review): `outputs` is already the `.content` of the execute
        # result; taking `.content` again looks like it yields the raw text
        # that StringBasedFunction wraps into a Variable — the `-> Variable`
        # annotation may therefore be inaccurate; verify.
        return outputs.content
|
|
|
|
class TextGradAgent:
    """An agent that takes textgrad.Variable inputs and returns a textgrad.Variable response.
    This class is used to replace EvoAgentX Agent in WorkFlowGraph to allow TextGrad optimization.
    """
    def __init__(
        self,
        agent: Agent,
        optimize_mode: Literal["all", "system_prompt", "instruction"] = "all"
    ):
        self.name = agent.name

        # Which prompt components receive gradients (and are therefore
        # updated by the TextGrad optimizer) under each optimize_mode.
        require_grad = {
            "all": {"system_prompt": True, "instruction": True},
            "system_prompt": {"system_prompt": True, "instruction": False},
            "instruction": {"system_prompt": False, "instruction": True}
        }

        system_prompt_require_grad = require_grad[optimize_mode]["system_prompt"]
        instruction_require_grad = require_grad[optimize_mode]["instruction"]

        # Trainable prompt variables seeded from the wrapped agent.
        self.system_prompt = Variable(
            agent.system_prompt,
            requires_grad=system_prompt_require_grad,
            role_description=f"{self.name}'s system prompt"
        )

        self.instruction = Variable(
            agent.actions[0].prompt_template.instruction,
            requires_grad=instruction_require_grad,
            role_description=f"{self.name}'s instruction prompt"
        )

        # Wrapping the agent call in a StringBasedFunction makes agent
        # execution a differentiable node in TextGrad's computation graph.
        self._agent_call = CustomAgentCall(agent)
        self.forward = StringBasedFunction(self._agent_call, agent.description)
        self.output_description = " and ".join(agent.actions[0].outputs_format.get_attr_descriptions().values())
        # Output Variable of the most recent call; reused when a workflow
        # node is revisited during the same forward pass.
        self.last_output: Optional[Variable] = None

    def __call__(self, inputs: dict[str, Variable]) -> Variable:
        """Given textgrad.Variable inputs, generates a textgrad.Variable output."""

        # Pass the trainable prompt variables alongside the data inputs so
        # gradients can flow back to them through the forward function.
        forward_inputs: dict[str, Variable] = {
            "instruction": self.instruction,
            "system_prompt": self.system_prompt,
            **inputs
        }

        output_variable = self.forward(forward_inputs, self.output_description)
        # Attach structured outputs so downstream nodes can extract the
        # specific fields they declare as inputs.
        output_variable.parsed_outputs = self._agent_call.last_outputs
        self.last_output = output_variable
        return output_variable
|
|
|
|
|
|
class TextGradOptimizer(BaseModule):
    """Uses TextGrad to optimize agents' system prompts and instructions in a multi-agent workflow.
    For more information on TextGrad, see https://github.com/zou-group/textgrad.
    """
    graph: WorkFlowGraph = Field(description="The workflow to optimize.")
    optimize_mode: Literal["all", "system_prompt", "instruction"] = Field(default="all", description="The mode to optimize the workflow. 'all' optimizes both system prompts and instructions, 'system_prompt' only optimizes system prompts, and 'instruction' only optimizes instructions.")
    executor_llm: BaseLLM = Field(default=None, description="The LLM to use for execution.")
    optimizer_llm: BaseLLM = Field(default=None, description="The LLM to use for optimization.")
    batch_size: PositiveInt = Field(default=1, description="The batch size for optimization.")
    max_steps: PositiveInt = Field(default=10, description="The maximum number of optimization steps.")
    evaluator: Evaluator = Field(default=None, description="The evaluator to perform evaluation during optimization.")
    eval_every_n_steps: Optional[PositiveInt] = Field(default=None, description="Evaluate the workflow every `eval_every_n_steps` steps.")
    eval_rounds: PositiveInt = Field(default=1, description="The number of times to evaluate the performance.")
    eval_config: dict = Field(default={}, description="The configuration for evaluation. The keys are the arguments of `TextGradOptimizer.evaluate`.")
    save_interval: Optional[PositiveInt] = Field(default=None, description="Save the workflow every `save_interval` steps.")
    save_path: str = Field(default="./", description="The path to save the optimized workflow.")
    rollback: bool = Field(default=True, description="Whether to rollback to the best graph after each evaluation during optimization.")
    # Typo fixed in the description below ("They system prompt" -> "The system prompt").
    constraints: List[str] = Field(default=[], description="The constraints for optimization. e.g. ['The system prompt must not exceed 100 words.']")
|
|
|
    def init_module(self, **kwargs):
        """Validates the workflow and initializes optimizer bookkeeping state."""
        self._validate_graph_compatibility(self.graph)
        # Snapshots of {"index", "graph", "metrics"} recorded at each
        # evaluation point during optimization.
        self._snapshot: List[dict] = []
        # Maps every output name to the node that produces it.
        self.output_lookup = self._create_output_lookup()
| |
|
|
| def _init_textgrad(self, dataset: Benchmark, use_answers: bool = True): |
| |
| def disable_short_variable_value(self, n_words_offset: int = 10): |
| return self.value |
| Variable.get_short_value = disable_short_variable_value |
| |
| |
| self.optimizer_engine = TextGradEngine(self.optimizer_llm) |
|
|
| |
| if use_answers: |
| if isinstance(dataset, CodingBenchmark): |
| loss_prompt = CODE_LOSS_PROMPT |
| role_descriptions = ["code snippet to evaluate", "the task, the test result of the code snippet, and the correct code"] |
| else: |
| loss_prompt = GENERAL_LOSS_PROMPT |
| role_descriptions = ["response to evaluate", "correct answer"] |
|
|
| evaluation_instruction = Variable(loss_prompt, requires_grad=False, role_description="evaluation instruction") |
| self.loss_fn = MultiFieldEvaluation(evaluation_instruction, role_descriptions, self.optimizer_engine) |
| else: |
| loss_prompt = NO_ANSWER_LOSS_PROMPT |
| evaluation_instruction = Variable(loss_prompt, requires_grad=False, role_description="evaluation instruction") |
| self.loss_fn = TextLoss(evaluation_instruction, self.optimizer_engine) |
|
|
| |
| self._create_textgrad_agents() |
| |
| |
| if self.optimize_mode == "all": |
| optimize_variables = self._get_all_system_prompts() + self._get_all_instructions() |
| elif self.optimize_mode == "system_prompt": |
| optimize_variables = self._get_all_system_prompts() |
| elif self.optimize_mode == "instruction": |
| optimize_variables = self._get_all_instructions() |
| else: |
| raise ValueError("Unsupported `optimize_mode`, should be one of 'all', 'system_prompt', 'instruction'.") |
|
|
| OPTIMIZER_CONSTRAINTS.extend(self.constraints) |
| |
| self.textgrad_optimizer = TextualGradientDescent( |
| parameters=optimize_variables, |
| engine=self.optimizer_engine, |
| constraints=OPTIMIZER_CONSTRAINTS, |
| optimizer_system_prompt=OPTIMIZER_SYSTEM_PROMPT, |
| in_context_examples=[PERSONAL_FINANCE_ADVISOR_EXAMPLE, FITNESS_COACH_EXAMPLE, CODE_REVIEW_EXAMPLE] |
| ) |
|
|
|
|
| def optimize(self, dataset: Benchmark, use_answers: bool = True, seed: Optional[int] = None) -> None: |
| """Optimizes self.graph using `dataset`. |
| |
| Args: |
| dataset (Benchmark): The dataset to use for optimization. |
| use_answers (bool): Whether to use the answers (labels) in the training set for optimization. |
| If False, `dataset`'s training set does not need to have answers. |
| If `eval_every_n_steps` is set to None, we can optimize the workflow without any labeled data. |
| seed (Optional[int]): The random seed to use for shuffling the data. |
| """ |
| self._init_textgrad(dataset, use_answers) |
| |
| def iterator() -> Iterator[Tuple[List[dict[str, str]], Optional[List[Union[str, dict[str, str]]]]]]: |
| epoch = 0 |
| while True: |
| |
| effective_seed = seed + epoch if seed is not None else None |
| train_data = dataset.get_train_data(sample_k=len(dataset._train_data), seed=effective_seed) |
| for i in range(0, len(train_data), self.batch_size): |
| batch = train_data[i:i + self.batch_size] |
| inputs = [self.evaluator.collate_func(x) for x in batch] |
| if use_answers: |
| labels = dataset.get_labels(batch) |
| else: |
| labels = None |
| yield inputs, labels |
| epoch += 1 |
|
|
| data_iterator = iterator() |
|
|
| for step in tqdm(range(self.max_steps)): |
| inputs, labels = next(data_iterator) |
| self.step(inputs, labels, dataset, use_answers) |
|
|
| if self.eval_every_n_steps is not None and (step + 1) % self.eval_every_n_steps == 0: |
| logger.info(f"Evaluating the workflow at step {step+1} ...") |
| with suppress_logger_info(): |
| metrics = self.evaluate(dataset, **self.eval_config) |
| self.log_snapshot(self.graph, metrics) |
| logger.info(f"Step {step+1} metrics: {metrics}") |
|
|
| |
| if self.rollback: |
| if len(self._snapshot) == 1: |
| best_snapshot = self._snapshot[-1] |
| best_average_score = np.mean(list(metrics.values())) |
| else: |
| current_average_score = np.mean(list(metrics.values())) |
| |
| if current_average_score >= best_average_score: |
| |
| best_snapshot = self._snapshot[-1] |
| best_average_score = current_average_score |
| else: |
| |
| logger.info(f"Metrics are worse than the best snapshot which has {best_snapshot['metrics']}. Rolling back to the best snapshot.") |
| best_graph = WorkFlowGraph.from_dict(best_snapshot["graph"]) |
| self.graph = best_graph |
| self._create_textgrad_agents() |
|
|
| if self.save_interval is not None and (step + 1) % self.save_interval == 0: |
| logger.info(f"Saving the workflow at step {step+1} ...") |
| self.save(os.path.join(self.save_path, f"{dataset.name}_textgrad_step_{step+1}.json")) |
|
|
| logger.info(f"Reached the maximum number of steps {self.max_steps}. Optimization has finished.") |
| self.save(os.path.join(self.save_path, f"{dataset.name}_textgrad_final.json")) |
|
|
| |
| if len(self._snapshot) > 0: |
| best_graph = self._select_graph_with_highest_score() |
| self.save(os.path.join(self.save_path, f"{dataset.name}_textgrad_best.json"), graph=best_graph) |
|
|
| |
| def step( |
| self, |
| inputs: list[dict[str, str]], |
| labels: Optional[list[Union[str, dict[str, str]]]], |
| dataset: Benchmark, |
| use_answers: bool = True |
| ) -> None: |
| """Performs one optimization step using a batch of data.""" |
|
|
| losses = [] |
| logger.info("Executing workflow...") |
|
|
| if use_answers: |
| if labels is None: |
| raise ValueError("Labels must be provided if `use_answers` is True.") |
|
|
| for input, label in zip(inputs, labels, strict=True): |
| output = self.forward(input) |
| if isinstance(label, str): |
| label = Variable(label, requires_grad=False, role_description="correct answer for the query") |
| elif isinstance(label, dict): |
| if not isinstance(dataset, CodingBenchmark): |
| raise ValueError("Label must be a string for non-coding benchmarks.") |
| end_node_name = self.graph.find_end_nodes()[0] |
| end_node = self.graph.get_node(end_node_name) |
| output_name = end_node.outputs[0].name |
| code = output.parsed_outputs[output_name] |
| label = self._format_code_label(code, label, dataset) |
| label = Variable(label, requires_grad=False, role_description="the task, the test result, and the correct code") |
| loss = self.loss_fn([output, label]) |
| losses.append(loss) |
| else: |
| for input in inputs: |
| output = self.forward(input) |
| loss = self.loss_fn(output) |
| losses.append(loss) |
|
|
| total_loss = tg.sum(losses) |
| logger.info("Computing gradients...") |
| total_loss.backward(self.optimizer_engine) |
| logger.info("Updating agents...") |
| self.textgrad_optimizer.step() |
| self.textgrad_optimizer.zero_grad() |
| self._update_workflow_graph() |
| logger.info("Agents updated") |
|
|
|
|
| def forward(self, inputs: dict[str, str]) -> Variable: |
| """Returns the final output from the workflow.""" |
| self._visited_nodes = set() |
| end_node = self.graph.find_end_nodes()[0] |
| input_variables = self._initial_inputs_to_variables(inputs) |
| output = self._compute_node(end_node, input_variables) |
| return output |
|
|
|
|
| def evaluate( |
| self, |
| dataset: Benchmark, |
| eval_mode: str = "dev", |
| graph: Optional[WorkFlowGraph] = None, |
| indices: Optional[List[int]] = None, |
| sample_k: Optional[int] = None, |
| **kwargs |
| ) -> dict: |
| """Evaluate the workflow. If `graph` is provided, use the provided graph for evaluation. Otherwise, use the graph in the optimizer. |
| |
| Args: |
| dataset (Benchmark): The dataset to evaluate the workflow on. |
| eval_mode (str): The evaluation mode. Choices: ["test", "dev", "train"]. |
| graph (WorkFlowGraph, optional): The graph to evaluate. If not provided, use the graph in the optimizer. |
| indices (List[int], optional): The indices of the data to evaluate the workflow on. |
| sample_k (int, optional): The number of data to evaluate the workflow on. If provided, a random sample of size `sample_k` will be used. |
| |
| Returns: |
| dict: The metrics of the workflow evaluation. |
| """ |
| if graph is None: |
| graph = self.graph |
|
|
| metrics_list = [] |
| for i in range(self.eval_rounds): |
| eval_info = [ |
| f"[{type(graph).__name__}]", |
| f"Evaluation round {i+1}/{self.eval_rounds}", |
| f"Mode: {eval_mode}" |
| ] |
| if indices is not None: |
| eval_info.append(f"Indices: {len(indices)} samples") |
| if sample_k is not None: |
| eval_info.append(f"Sample size: {sample_k}") |
| logger.info(" | ".join(eval_info)) |
| metrics = self.evaluator.evaluate( |
| graph=graph, |
| benchmark=dataset, |
| eval_mode=eval_mode, |
| indices=indices, |
| sample_k=sample_k, |
| update_agents=True, |
| **kwargs |
| ) |
| metrics_list.append(metrics) |
| avg_metrics = self.evaluator._calculate_average_score(metrics_list) |
| |
| return avg_metrics |
|
|
|
|
| def save(self, path: str, graph: Optional[WorkFlowGraph] = None, ignore: List[str] = []) -> None: |
| """Save the workflow graph containing the optimized prompts to a file. |
| |
| Args: |
| path (str): The path to save the workflow graph. |
| graph (WorkFlowGraph, optional): The graph to save. If not provided, use the graph in the optimizer. |
| ignore (List[str]): The keys to ignore when saving the workflow graph. |
| """ |
| if graph is None: |
| graph = self.graph |
| graph.save_module(path, ignore=ignore) |
|
|
|
|
| def log_snapshot(self, graph: WorkFlowGraph, metrics: dict) -> None: |
| """Log the snapshot of the workflow.""" |
| self._snapshot.append( |
| { |
| "index": len(self._snapshot), |
| "graph": deepcopy(graph.get_config()), |
| "metrics": metrics, |
| } |
| ) |
|
|
|
|
| def restore_best_graph(self) -> None: |
| """Restore the best graph from the snapshot and set it to `self.graph`.""" |
| if len(self._snapshot) == 0: |
| logger.info("No snapshot found. No graph to restore.") |
| return |
|
|
| best_graph, best_metrics = self._select_graph_with_highest_score(return_metrics=True) |
| self.graph = best_graph |
| logger.info(f"Restored the best graph from snapshot with metrics {best_metrics}") |
|
|
|
|
| def _format_code_label(self, code: str, label:dict[str, str], dataset: CodingBenchmark) -> str: |
| """Formats the label for coding tasks to include the task, the test result, and the correct code. |
| |
| Args: |
| code: The code to evaluate. |
| label: A dictionary with keys "task_id", "test", "entry_point", and "canonical_solution". |
| dataset: A CodingBenchmark instance with `check_solution` method. |
| |
| Returns: |
| The formatted label which includes the task, the test result, and the correct code. |
| """ |
|
|
| task_id = label["task_id"] |
| prompt = dataset.get_example_by_id(task_id)["prompt"] |
| if 'test' not in label.keys(): |
| label['test'] = label['tests'] |
| test = label["test"] |
| entry_point = label["entry_point"] |
|
|
| state, message = dataset.check_solution( |
| task_id=task_id, |
| solution=prompt + "\n" + code, |
| test=test, |
| entry_point=entry_point |
| ) |
|
|
| if state != dataset.SUCCESS: |
| message = message.replace("Solution", "Failed Code") |
|
|
| formatted_label = f"## Task:\n{prompt}\n\n## Result on test:\n{message}\n\n## Correct Solution:\n{label['canonical_solution']}" |
| return formatted_label |
|
|
|
|
| def _initial_inputs_to_variables(self, initial_inputs: dict[str, str]) -> dict[str, Variable]: |
| """Converts inputs to the initial nodes to textgrad variables.""" |
| variables = {} |
| initial_nodes = self.graph.find_initial_nodes() |
| for initial_node in initial_nodes: |
| for key, value in initial_inputs.items(): |
| for input in self.graph.get_node(initial_node).inputs: |
| if input.name == key: |
| initial_input_variable = Variable( |
| value, |
| requires_grad=False, |
| role_description=input.description, |
| ) |
| variables[key] = initial_input_variable |
| if len(variables) == len(initial_inputs): |
| return variables |
| missing_inputs = set(initial_inputs.keys()) - set(variables.keys()) |
| raise ValueError(f"Initial inputs do not match the inputs of the initial nodes. Missing inputs: {missing_inputs}") |
|
|
|
|
|
|
    def _compute_node(self, node: Union[str, WorkFlowNode], initial_inputs: dict[str, Variable]) -> Variable:
        """Computes the output of a node in the workflow graph by recursively computing the required inputs.

        Args:
            node: The node to compute the output of.
            initial_inputs: The initial inputs to the workflow that are not from any node in the workflow (e.g., user query).

        Returns:
            The output of the node as a textgrad.Variable.
        """
        if isinstance(node, str):
            node = self.graph.get_node(node)

        # Memoization: each node runs at most once per forward pass; revisits
        # reuse the output cached on its textgrad agent.
        if node.name in self._visited_nodes:
            return node.textgrad_agent.last_output

        input_variables: dict[str, Variable] = {}
        input_node_names: set[str] = set()

        # Partition required inputs: directly supplied initial inputs vs.
        # outputs produced by upstream nodes (resolved via output_lookup).
        for input in node.inputs:
            if input.name in initial_inputs:
                input_variables[input.name] = initial_inputs[input.name]
            else:
                input_node_names.add(self.output_lookup[input.name])

        # Recursively compute each upstream node; its output is keyed by the
        # producing node's name (the agent call extracts the fields it needs
        # from the attached parsed_outputs).
        for node_name in input_node_names:
            input_variables[node_name] = self._compute_node(node_name, initial_inputs)

        output_variable = node.textgrad_agent(input_variables)
        self._visited_nodes.add(node.name)
        return output_variable
| |
| |
| def _create_textgrad_agent(self, node: Union[str, WorkFlowNode]) -> TextGradAgent: |
| """Creates a textgrad agent for a given node in a WorkFlowGraph.""" |
| if isinstance(node, str): |
| node = self.graph.get_node(node) |
|
|
| if isinstance(node.agents[0], dict): |
| agent_llm = node.agents[0].get("llm") |
| agent_llm_config = node.agents[0].get("llm_config") |
| if agent_llm is None and agent_llm_config is None: |
| node.agents[0]["llm"] = self.executor_llm |
| |
| |
| agent: Union[CustomizeAgent, Agent] = CustomizeAgent.from_dict(node.agents[0]) |
| else: |
| raise ValueError(f"Unsupported agent type {type(node.agents[0])}. Expected 'dict'.") |
| |
| textgrad_agent = TextGradAgent(agent, self.optimize_mode) |
| return textgrad_agent |
|
|
|
|
| def _create_textgrad_agents(self): |
| """Creates textgrad agents for all nodes in the workflow graph.""" |
| for node in self.graph.nodes: |
| node.textgrad_agent = self._create_textgrad_agent(node) |
|
|
|
|
| def _update_agent_prompts(self, agent_dict: dict[str, Any], system_prompt: str, instruction: str) -> dict[str, Any]: |
| agent_dict["system_prompt"] = system_prompt |
| if "actions" in agent_dict: |
| |
| agent_dict["actions"][0]["prompt_template"] = self._update_agent_instructions( |
| agent_dict["actions"][0]["prompt_template"], |
| instruction |
| ) |
| else: |
| |
| agent_dict["prompt_template"] = self._update_agent_instructions( |
| agent_dict["prompt_template"], |
| instruction |
| ) |
| return agent_dict |
|
|
|
|
| def _update_agent_instructions( |
| self, |
| prompt_template: Union[PromptTemplate, dict[str, str]], |
| instruction: str |
| ) -> Union[PromptTemplate, dict[str, str]]: |
|
|
| if isinstance(prompt_template, PromptTemplate): |
| prompt_template.set_instruction(instruction) |
| elif isinstance(prompt_template, dict): |
| prompt_template["instruction"] = instruction |
| else: |
| raise ValueError(f"Unsupported prompt template type {type(prompt_template)}. Expected 'PromptTemplate' or 'dict'.") |
| return prompt_template |
|
|
|
|
| def _update_workflow_graph(self): |
| """Updates the workflow graph with the latest prompts from the textgrad optimization.""" |
| for node in self.graph.nodes: |
| if isinstance(node.agents[0], dict): |
| node.agents[0] = self._update_agent_prompts( |
| node.agents[0], |
| node.textgrad_agent.system_prompt.value, |
| node.textgrad_agent.instruction.value |
| ) |
| else: |
| raise ValueError(f"Unsupported agent type {type(node.agents[0])}. Expected 'dict'.") |
|
|
|
|
| def _select_graph_with_highest_score(self, return_metrics: bool = False) -> Union[WorkFlowGraph, tuple[WorkFlowGraph, Optional[dict]]]: |
| """Select the graph in `self._snapshot` with the highest score.""" |
| if len(self._snapshot) == 0: |
| if return_metrics: |
| return self.graph, None |
| return self.graph |
| |
| snapshot_scores = [np.mean(list(snapshot["metrics"].values())) for snapshot in self._snapshot] |
| best_index = np.argmax(snapshot_scores) |
|
|
| graph = WorkFlowGraph.from_dict(self._snapshot[best_index]["graph"]) |
|
|
| if return_metrics: |
| return graph, self._snapshot[best_index]["metrics"] |
| return graph |
|
|
|
|
| def _get_all_system_prompts(self) -> List[Variable]: |
| """Gets all system prompts from the textgrad agents.""" |
| system_prompts = [] |
| for node in self.graph.nodes: |
| system_prompts.append(node.textgrad_agent.system_prompt) |
| return system_prompts |
|
|
| |
| def _get_all_instructions(self) -> List[Variable]: |
| """Gets all prompt templates from the textgrad agents.""" |
| instructions = [] |
| for node in self.graph.nodes: |
| instructions.append(node.textgrad_agent.instruction) |
| return instructions |
|
|
|
|
| def _create_output_lookup(self) -> dict[str, str]: |
| """Creates a lookup table for output names to node names.""" |
| output_name_to_node_name = {} |
| for node in self.graph.nodes: |
| for output in node.outputs: |
| output_name_to_node_name[output.name] = node.name |
| return output_name_to_node_name |
|
|
|
|
| def _validate_graph_compatibility(self, graph: WorkFlowGraph) -> None: |
| """Checks if the graph is compatible with the textgrad optimizer.""" |
| for node in graph.nodes: |
| if len(node.agents) > 1: |
| raise ValueError("TextGrad optimizer only supports workflows where every node only has a single agent.") |
| else: |
| agent = node.agents[0] |
| if not isinstance(agent, dict): |
| raise ValueError(f"Unsupported agent type {type(agent)}. Expected 'dict'.") |
| else: |
| if "actions" in agent: |
| |
| |
| non_ContextExtraction_actions = [ |
| action for action in agent["actions"] if action["class_name"] != "ContextExtraction" |
| ] |
| if len(non_ContextExtraction_actions) > 1: |
| raise ValueError(f"TextGrad optimizer only supports workflows where every agent only has a single action. {agent['name']} has {len(non_ContextExtraction_actions)} actions.") |
| if "prompt_template" not in non_ContextExtraction_actions[0]: |
| raise ValueError(f"Please provide a PromptTemplate for {agent['name']}.") |
| else: |
| |
| if "prompt_template" not in agent: |
| raise ValueError(f"Please provide a PromptTemplate for {agent['name']}.") |
| |
|
|
| |