| | import json |
| | from typing import Optional, List |
| | from pydantic import Field, PositiveInt |
| |
|
| | import time |
| | from ..core.logging import logger |
| | from ..core.module import BaseModule |
| | |
| | from ..core.message import Message, MessageType |
| | from ..models.base_model import BaseLLM |
| | from ..agents.agent import Agent |
| | from ..agents.task_planner import TaskPlanner |
| | from ..agents.agent_generator import AgentGenerator |
| | from ..agents.workflow_reviewer import WorkFlowReviewer |
| | from ..actions.task_planning import TaskPlanningOutput |
| | from ..actions.agent_generation import AgentGenerationOutput |
| | from ..workflow.workflow_graph import WorkFlowGraph, WorkFlowNode, WorkFlowEdge |
| | from ..tools.tool import Toolkit |
| |
|
class WorkFlowGenerator(BaseModule):
    """
    Automated workflow generation system based on high-level goals.

    The WorkFlowGenerator is responsible for creating complete workflow graphs
    from high-level goals or task descriptions. It breaks down the goal into
    subtasks, creates the necessary dependency connections between tasks,
    and assigns or generates appropriate agents for each task.

    Attributes:
        llm: Language model used for generation and planning. Required only
            when ``task_planner`` or ``agent_generator`` is not supplied.
        task_planner: Component responsible for breaking down goals into subtasks.
        agent_generator: Component responsible for agent assignment or creation.
        workflow_reviewer: Component for reviewing and improving workflows.
            It is neither auto-created nor used by the methods of this class.
        num_turns: Number of refinement iterations for the workflow
            (non-negative; ``0`` is the default). Not consumed by the methods
            of this class.
        tools: Toolkits forwarded to the auto-created ``AgentGenerator`` and
            summarised by ``get_tool_info``.
    """

    llm: Optional[BaseLLM] = None
    task_planner: Optional[TaskPlanner] = Field(
        default=None,
        description="Responsible for breaking down the high-level task into manageable sub-tasks.",
    )
    agent_generator: Optional[AgentGenerator] = Field(
        default=None,
        description="Assigns or generates the appropriate agent(s) to handle each sub-task.",
    )
    workflow_reviewer: Optional[WorkFlowReviewer] = Field(
        default=None,
        description="Provides feedback and reflections to improve the generated workflow.",
    )
    # The default is 0, so the constraint must admit 0: a strictly positive
    # type would reject the default whenever it is passed explicitly.
    num_turns: Optional[int] = Field(
        default=0,
        ge=0,
        description="Specifies the number of refinement iterations for the generated workflow.",
    )
    tools: Optional[List[Toolkit]] = Field(
        default=None,
        description="A list of tools that can be used in the workflow.",
    )

    def init_module(self):
        """Create the default planner and agent generator when not supplied.

        NOTE(review): presumably invoked by ``BaseModule`` during
        initialization -- confirm against the base class.

        Raises:
            ValueError: If ``task_planner`` or ``agent_generator`` is missing
                and no ``llm`` is available to build a default one.
        """
        if self.task_planner is None:
            if self.llm is None:
                raise ValueError("Must provide `llm` when `task_planner` is None")
            self.task_planner = TaskPlanner(llm=self.llm)

        if self.agent_generator is None:
            if self.llm is None:
                raise ValueError("Must provide `llm` when `agent_generator` is None")
            self.agent_generator = AgentGenerator(llm=self.llm, tools=self.tools)

    def get_tool_info(self):
        """Store a summary of the configured toolkits in ``self.tool_info``.

        The summary is a list with one single-key dict per toolkit, mapping
        the toolkit name to the list of its function descriptions (taken from
        ``schema["function"]["description"]`` of each tool schema). When no
        tools are configured the summary is an empty list.

        NOTE(review): ``tool_info`` is not among the fields declared on this
        class; whether the assignment is permitted depends on the model
        configuration of ``BaseModule`` -- confirm.
        """
        # `tools` defaults to None, so treat "not configured" as "no tools".
        self.tool_info = [
            {
                tool.name: [
                    schema["function"]["description"] for schema in tool.get_tool_schemas()
                ],
            }
            for tool in (self.tools or [])
        ]

    def _execute_with_retry(self, operation_name: str, operation, retries_left: int = 1, **kwargs):
        """Helper method to execute operations with retry logic.

        Failed attempts are followed by an exponential back-off sleep of
        ``2 ** attempt_index`` seconds (1, 2, 4, ...).

        Args:
            operation_name: Name of the operation for logging
            operation: Callable that performs the operation
            retries_left: Number of retry attempts remaining; negative values
                are treated as 0 (a single attempt, no retries)
            **kwargs: Additional arguments to pass to the operation

        Returns:
            Tuple of (operation_result, number_of_retries_used)

        Raises:
            ValueError: If operation fails after all retries are exhausted.
                The underlying exception is attached as ``__cause__``.
        """
        # A negative budget would otherwise skip every attempt and fall
        # through returning None, which callers cannot unpack.
        retries_left = max(retries_left, 0)
        cur_retries = 0

        while True:
            logger.info(f"{operation_name} (attempt {cur_retries + 1}/{retries_left + 1}) ...")
            try:
                result = operation(**kwargs)
            except Exception as e:
                # Deliberately broad: this is the retry boundary, and any
                # failure of the operation counts as a failed attempt.
                if cur_retries >= retries_left:
                    raise ValueError(
                        f"Failed to {operation_name} after {cur_retries + 1} attempts.\nError: {e}"
                    ) from e
                sleep_time = 2 ** cur_retries
                logger.error(f"Failed to {operation_name} in {cur_retries + 1} attempts. Retry after {sleep_time} seconds.\nError: {e}")
                time.sleep(sleep_time)
                cur_retries += 1
            else:
                return result, cur_retries

    def generate_workflow(self, goal: str, existing_agents: Optional[List[Agent]] = None, retry: int = 1, **kwargs) -> WorkFlowGraph:
        """Generate a complete workflow graph, including agents, for a goal.

        The pipeline is: plan the sub-tasks, build the graph from the plan,
        validate it, generate agents for every node, validate again, and
        finally check that every node has at least one agent.

        Args:
            goal: High-level goal; must contain at least 10 non-whitespace-
                trimmed characters.
            existing_agents: Forwarded to ``generate_agents``.
            retry: Retry budget shared by the planning, graph-building and
                agent-generation stages; retries used by an earlier stage
                reduce what is left for later ones.
            **kwargs: Accepted for call compatibility; not used here.

        Returns:
            The validated workflow graph with agents assigned to every node.

        Raises:
            ValueError: If ``goal`` is too short, a stage fails after its
                retries are exhausted, or a node ends up without agents.
        """
        if not goal or len(goal.strip()) < 10:
            raise ValueError("Goal must be at least 10 characters and descriptive")

        plan_history, plan_suggestion = "", ""

        # Stage 1: break the goal down into a plan of sub-tasks.
        cur_retries = 0
        plan, added_retries = self._execute_with_retry(
            operation_name="Generating a workflow plan",
            operation=self.generate_plan,
            retries_left=retry,
            goal=goal,
            history=plan_history,
            suggestion=plan_suggestion,
        )
        cur_retries += added_retries

        # Stage 2: turn the plan into a graph, using what is left of the budget.
        workflow, added_retries = self._execute_with_retry(
            operation_name="Building workflow from plan",
            operation=self.build_workflow_from_plan,
            retries_left=retry - cur_retries,
            goal=goal,
            plan=plan,
        )
        cur_retries += added_retries

        # Validation runs outside the retry helper, so a structural error
        # propagates immediately.
        logger.info("Validating initial workflow structure...")
        workflow._validate_workflow_structure()
        logger.info(f"Successfully generate the following workflow:\n{workflow.get_workflow_description()}")

        # Stage 3: attach agents to every node.
        logger.info("Generating agents for the workflow ...")
        workflow, added_retries = self._execute_with_retry(
            operation_name="Generating agents for the workflow",
            operation=self.generate_agents,
            retries_left=retry - cur_retries,
            goal=goal,
            workflow=workflow,
            existing_agents=existing_agents,
        )

        logger.info("Validating workflow after agent generation...")
        workflow._validate_workflow_structure()

        for node in workflow.nodes:
            if not node.agents:
                raise ValueError(f"Node {node.name} has no agents assigned after agent generation")

        return workflow

    def generate_plan(self, goal: str, history: Optional[str] = None, suggestion: Optional[str] = None) -> TaskPlanningOutput:
        """Ask the task planner for a plan that achieves ``goal``.

        Args:
            goal: High-level goal to plan for.
            history: Earlier planning context; ``None`` is sent as ``""``.
            suggestion: Improvement hints; ``None`` is sent as ``""``.

        Returns:
            The ``content`` of the message returned by the task planner.
        """
        history = "" if history is None else history
        suggestion = "" if suggestion is None else suggestion
        task_planner: TaskPlanner = self.task_planner
        task_planning_action_data = {"goal": goal, "history": history, "suggestion": suggestion}
        task_planning_action_name = task_planner.task_planning_action_name
        message: Message = task_planner.execute(
            action_name=task_planning_action_name,
            action_input_data=task_planning_action_data,
            return_msg_type=MessageType.REQUEST,
        )
        return message.content

    def generate_agents(
        self,
        goal: str,
        workflow: WorkFlowGraph,
        existing_agents: Optional[List[Agent]] = None,
    ) -> WorkFlowGraph:
        """Generate agents for every node of ``workflow`` and attach them.

        For each node, a JSON description of its name, description, reason,
        inputs and outputs is sent to the agent generator together with the
        goal and the overall workflow description. The generated agents are
        converted to dicts and set on the node.

        Args:
            goal: High-level goal the workflow serves.
            workflow: Graph whose nodes receive agents; mutated in place.
            existing_agents: Accepted for interface compatibility; not used
                by this method.

        Returns:
            The same ``workflow`` object, with agents set on each node.
        """
        agent_generator: AgentGenerator = self.agent_generator
        workflow_desc = workflow.get_workflow_description()
        agent_generation_action_name = agent_generator.agent_generation_action_name
        # Only these node fields are shown to the agent generator.
        subtask_fields = ["name", "description", "reason", "inputs", "outputs"]
        for subtask in workflow.nodes:
            subtask_data = {key: value for key, value in subtask.to_dict(ignore=["class_name"]).items() if key in subtask_fields}
            subtask_desc = json.dumps(subtask_data, indent=4)
            agent_generation_action_data = {"goal": goal, "workflow": workflow_desc, "task": subtask_desc}
            logger.info(f"Generating agents for subtask: {subtask_data['name']}")
            agents: AgentGenerationOutput = agent_generator.execute(
                action_name=agent_generation_action_name,
                action_input_data=agent_generation_action_data,
                return_msg_type=MessageType.RESPONSE,
            ).content

            generated_agents = [
                agent.to_dict(ignore=["class_name"]) for agent in agents.generated_agents
            ]
            subtask.set_agents(agents=generated_agents)
        return workflow

    def build_workflow_from_plan(self, goal: str, plan: TaskPlanningOutput) -> WorkFlowGraph:
        """Build a workflow graph from a plan by inferring data dependencies.

        An edge ``(a, b)`` is created whenever at least one output parameter
        name of node ``a`` equals an input parameter name of a differently
        named node ``b``.

        Args:
            goal: High-level goal stored on the graph.
            plan: Planning output whose ``sub_tasks`` become the graph nodes.

        Returns:
            A ``WorkFlowGraph`` with the plan's nodes and the inferred edges.
        """
        nodes: List[WorkFlowNode] = plan.sub_tasks

        # Collect each node's parameter names once instead of rebuilding the
        # lists for every pair of nodes.
        input_names = [{param.name for param in node.inputs} for node in nodes]
        output_names = [{param.name for param in node.outputs} for node in nodes]

        edges: List[WorkFlowEdge] = []
        for node, node_outputs in zip(nodes, output_names):
            for another_node, another_inputs in zip(nodes, input_names):
                if node.name == another_node.name:
                    continue
                # Any shared name means `another_node` consumes something
                # that `node` produces.
                if not node_outputs.isdisjoint(another_inputs):
                    edges.append(WorkFlowEdge(edge_tuple=(node.name, another_node.name)))
        workflow = WorkFlowGraph(goal=goal, nodes=nodes, edges=edges)
        return workflow
| | |
| |
|