from pydantic import Field
from itertools import chain
from collections import defaultdict
from typing import Union, Optional, Tuple, Dict, List

from ..core.module import BaseModule
from ..core.message import Message, MessageType
from ..models.base_model import BaseLLM, LLMOutputParser
from ..actions.action import Action
from ..agents.agent_manager import AgentManager
from .action_graph import ActionGraph
from .environment import Environment, TrajectoryState
from .workflow_graph import WorkFlowNode, WorkFlowGraph
from ..prompts.workflow.workflow_manager import (
    DEFAULT_TASK_SCHEDULER,
    DEFAULT_ACTION_SCHEDULER,
    OUTPUT_EXTRACTION_PROMPT
)


class Scheduler(Action):
    """
    Base interface for workflow schedulers.

    Provides a common interface for all scheduler types within the workflow
    system. Schedulers are responsible for making decisions about what to
    execute next in a workflow, whether at the task or the action level.

    Inherits from Action to leverage the common action interface and functionality.
    """
    pass


class TaskSchedulerOutput(LLMOutputParser):

    decision: str = Field(description="The decision made by the scheduler: whether to re-execute, iterate on, or forward a certain task.")
    task_name: str = Field(description="The name of the scheduled task.")
    reason: str = Field(description="The rationale behind the scheduling decision, explaining why the task was scheduled.")

    def to_str(self, **kwargs) -> str:
        return f"Based on the workflow execution results, the next subtask to be executed is '{self.task_name}' because {self.reason}"
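

# A minimal usage sketch of TaskSchedulerOutput. The field values below are
# illustrative only (in practice they are parsed from an LLM response):
#
#   output = TaskSchedulerOutput(
#       decision="forward",
#       task_name="summarize_results",
#       reason="all of its predecessor tasks have completed",
#   )
#   print(output.to_str())
#   # Based on the workflow execution results, the next subtask to be executed
#   # is 'summarize_results' because all of its predecessor tasks have completed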


class TaskScheduler(Action):
    """
    Determines the next task to execute in a workflow.
    """
    def __init__(self, **kwargs):
        name = kwargs.pop("name", DEFAULT_TASK_SCHEDULER["name"])
        description = kwargs.pop("description", DEFAULT_TASK_SCHEDULER["description"])
        prompt = kwargs.pop("prompt", DEFAULT_TASK_SCHEDULER["prompt"])
        super().__init__(name=name, description=description, prompt=prompt, outputs_format=TaskSchedulerOutput, **kwargs)
        self.max_num_turns = kwargs.get("max_num_turns", DEFAULT_TASK_SCHEDULER["max_num_turns"])

    def get_predecessor_tasks(self, graph: WorkFlowGraph, tasks: List[WorkFlowNode]) -> List[str]:
        """Collect the deduplicated predecessor task names of ``tasks`` in ``graph``."""
        predecessors = []
        for task in tasks:
            candidates = graph.get_node_predecessors(node=task)
            for candidate in candidates:
                if candidate not in predecessors:
                    predecessors.append(candidate)
        return predecessors

    def _handle_edge_cases(self, candidate_tasks: List[WorkFlowNode]) -> Union[TaskSchedulerOutput, None]:
        """
        Handle edge cases for task scheduling: only one candidate task.

        Args:
            candidate_tasks (List[WorkFlowNode]): List of candidate tasks to schedule.

        Returns:
            Either a TaskSchedulerOutput if a direct return is possible, or None if normal processing should continue.
        """
        if len(candidate_tasks) == 1:
            task_name = candidate_tasks[0].name
            scheduled_task = TaskSchedulerOutput(
                decision="forward",
                task_name=task_name,
                reason=f"Only one candidate task '{task_name}' is available."
            )
            return scheduled_task
        return None

    def _prepare_execution(self, graph: WorkFlowGraph, env: Environment, candidate_tasks: List[WorkFlowNode]) -> Tuple[dict, str]:
        """
        Prepares common execution logic for both the sync and async execute methods.
        This is only called once edge cases have been handled and a prompt must be generated.

        Args:
            graph (WorkFlowGraph): The workflow graph.
            env (Environment): The execution environment.
            candidate_tasks (List[WorkFlowNode]): List of candidate tasks to schedule.

        Returns:
            A tuple with prompt_inputs and prompt for LLM processing.
        """
        workflow_graph_representation = graph.get_workflow_description()
        execution_history = " -> ".join(env.task_execution_history)

        predecessor_tasks = self.get_predecessor_tasks(graph=graph, tasks=candidate_tasks)
        execution_outputs = "\n\n".join([str(msg) for msg in env.get_task_messages(tasks=predecessor_tasks)])
        candidate_tasks_info = "\n\n".join([task.get_task_info() for task in candidate_tasks])
        prompt_inputs = {
            "workflow_graph_representation": workflow_graph_representation,
            "execution_history": execution_history,
            "execution_outputs": execution_outputs,
            "candidate_tasks": candidate_tasks_info,
            "max_num_turns": self.max_num_turns
        }
        prompt = self.prompt.format(**prompt_inputs)
        return prompt_inputs, prompt

    def execute(self, llm: Optional[BaseLLM] = None, graph: WorkFlowGraph = None, env: Environment = None, sys_msg: Optional[str] = None, return_prompt: bool = False, **kwargs) -> Optional[Union[TaskSchedulerOutput, Tuple[TaskSchedulerOutput, str]]]:
        """
        Determine the next executable task.

        Args:
            llm (Optional[BaseLLM]): Language model to use for generation.
            graph (WorkFlowGraph): The workflow graph.
            env (Environment): The execution environment.
            sys_msg (Optional[str]): Optional system message for the LLM.
            return_prompt (bool): Whether to return the prompt along with the output.

        Returns:
            The scheduled task (optionally paired with the prompt), or None if there are no candidate tasks.
        """
        assert graph is not None and env is not None, "must provide 'graph' and 'env' when executing TaskScheduler"

        candidate_tasks: List[WorkFlowNode] = graph.next()
        if not candidate_tasks:
            return None

        edge_case_result = self._handle_edge_cases(candidate_tasks)
        if edge_case_result is not None:
            return (edge_case_result, None) if return_prompt else edge_case_result

        _, prompt = self._prepare_execution(graph, env, candidate_tasks)
        scheduled_task = llm.generate(prompt=prompt, system_message=sys_msg, parser=self.outputs_format)

        if return_prompt:
            return scheduled_task, prompt
        return scheduled_task

    async def async_execute(self, llm: Optional[BaseLLM] = None, graph: WorkFlowGraph = None, env: Environment = None, sys_msg: Optional[str] = None, return_prompt: bool = False, **kwargs) -> Optional[Union[TaskSchedulerOutput, Tuple[TaskSchedulerOutput, str]]]:
        """
        Asynchronously determine the next executable task.

        Args:
            llm (Optional[BaseLLM]): Language model to use for generation.
            graph (WorkFlowGraph): The workflow graph.
            env (Environment): The execution environment.
            sys_msg (Optional[str]): Optional system message for the LLM.
            return_prompt (bool): Whether to return the prompt along with the output.

        Returns:
            The scheduled task (optionally paired with the prompt), or None if there are no candidate tasks.
        """
        assert graph is not None and env is not None, "must provide 'graph' and 'env' when executing TaskScheduler"

        candidate_tasks: List[WorkFlowNode] = graph.next()
        if not candidate_tasks:
            return None

        edge_case_result = self._handle_edge_cases(candidate_tasks)
        if edge_case_result is not None:
            return (edge_case_result, None) if return_prompt else edge_case_result

        _, prompt = self._prepare_execution(graph, env, candidate_tasks)
        scheduled_task = await llm.async_generate(prompt=prompt, system_message=sys_msg, parser=self.outputs_format)

        if return_prompt:
            return scheduled_task, prompt
        return scheduled_task
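

# A minimal usage sketch for TaskScheduler, assuming a configured `llm`
# (BaseLLM), `graph` (WorkFlowGraph), and `env` (Environment) constructed
# elsewhere — the names are illustrative:
#
#   scheduler = TaskScheduler()
#   result = scheduler.execute(llm=llm, graph=graph, env=env, return_prompt=True)
#   if result is not None:
#       scheduled_task, prompt = result
#       print(scheduled_task.task_name, "->", scheduled_task.decision)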


class NextAction(LLMOutputParser):

    agent: Optional[str] = Field(default=None, description="The name of the selected agent responsible for executing the next action in the workflow.")
    action: Optional[str] = Field(default=None, description="The name of the action that the selected agent will execute to continue progressing the subtask.")
    reason: Optional[str] = Field(default=None, description="The justification for selecting this agent and action, explaining how it contributes to subtask execution based on workflow requirements and execution history.")
    action_graph: Optional[ActionGraph] = Field(default=None, description="The predefined action graph to be executed.")

    def to_str(self, **kwargs) -> str:
        if self.agent is not None and self.action is not None:
            return f"Based on the tasks' execution results, the next action to be executed is the '{self.action}' action of the '{self.agent}' agent."
        elif self.action_graph is not None:
            return f"The predefined action graph '{type(self.action_graph).__name__}' will be executed."
        else:
            raise ValueError("must provide either both agent (str) and action (str), or action_graph (ActionGraph).")


class ActionScheduler(Action):
    """
    Determines the next action(s) to execute for a given task using an LLM.
    """
    def __init__(self, **kwargs):
        name = kwargs.pop("name", DEFAULT_ACTION_SCHEDULER["name"])
        description = kwargs.pop("description", DEFAULT_ACTION_SCHEDULER["description"])
        prompt = kwargs.pop("prompt", DEFAULT_ACTION_SCHEDULER["prompt"])
        super().__init__(name=name, description=description, prompt=prompt, outputs_format=NextAction, **kwargs)

    def format_task_input_data(self, data: dict) -> str:
        """Render a task's input data as markdown-style '## key' sections."""
        info_list = []
        for key, value in data.items():
            info_list.append("## {}\n{}".format(key, value))
        return "\n\n".join(info_list)

    def check_candidate_action(self, task_name: str, actions: List[str], agent_actions_map: Dict[str, List[str]]):
        """Raise a ValueError if any action in ``actions`` is not provided by some agent of the task."""
        unknown_actions = []
        merged_actions = set(chain.from_iterable(agent_actions_map.values()))
        for action in actions:
            if action not in merged_actions:
                unknown_actions.append(action)
        if unknown_actions:
            raise ValueError(f"Unknown actions: {unknown_actions} specified in the `next_actions`. All available actions defined for the task ({task_name}) are {merged_actions}.")

    def get_agent_action_pairs(self, action: str, agent_actions_map: Dict[str, List[str]]) -> List[Tuple[str, str]]:
        """Return all (agent, action) pairs whose agent can execute ``action``."""
        pairs = []
        for agent, actions in agent_actions_map.items():
            if action in actions:
                pairs.append((agent, action))
        return pairs
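
    # A toy illustration of the two helpers above (agent and action names are
    # made up for the example):
    #
    #   agent_actions_map = {"coder": ["write_code", "review"], "tester": ["review"]}
    #   self.check_candidate_action("t1", ["review"], agent_actions_map)   # passes
    #   self.get_agent_action_pairs("review", agent_actions_map)
    #   # -> [("coder", "review"), ("tester", "review")]
    #   self.check_candidate_action("t1", ["deploy"], agent_actions_map)
    #   # raises ValueError: 'deploy' is not provided by any agent of the task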

    def _prepare_action_execution(
        self,
        task: WorkFlowNode,
        agent_manager: AgentManager,
        env: Environment
    ) -> Union[Tuple[NextAction, None], Tuple[None, dict, str]]:
        """
        Prepares common execution logic for both the sync and async execute methods.

        Args:
            task (WorkFlowNode): The task for which to schedule an action.
            agent_manager (AgentManager): The agent manager providing the agents.
            env (Environment): The execution environment.

        Returns:
            Either a tuple of (scheduled action, None) if a direct return is possible,
            or a tuple of (None, prompt_inputs, prompt) if LLM processing is needed.
        """
        # If the task defines a predefined action graph, execute it directly.
        if task.action_graph is not None:
            next_action = NextAction(action_graph=task.action_graph)
            return next_action, None

        task_agent_names = task.get_agents()
        if not task_agent_names:
            raise ValueError(f"The task '{task.name}' does not provide any agents for execution!")

        task_agents = [agent_manager.get_agent(name) for name in task_agent_names]
        task_agent_actions_map = {agent.name: [action.name for action in agent.get_all_actions()] for agent in task_agents}

        next_action = None
        candidate_agent_actions = defaultdict(set)

        # If the last message for this task specifies `next_actions`, narrow the
        # candidates to those actions and short-circuit when the choice is unique.
        task_execution_messages = env.get_task_messages(task.name)
        if task_execution_messages and task_execution_messages[-1].next_actions:
            predefined_next_actions = task_execution_messages[-1].next_actions
            self.check_candidate_action(task.name, predefined_next_actions, task_agent_actions_map)
            if len(predefined_next_actions) == 1:
                predefined_next_action = predefined_next_actions[0]
                agent_action_pairs = self.get_agent_action_pairs(predefined_next_action, task_agent_actions_map)
                if len(agent_action_pairs) == 1:
                    next_action = NextAction(
                        agent=agent_action_pairs[0][0],
                        action=agent_action_pairs[0][1],
                        reason=f"Selected because task history indicates a single predefined next action: {predefined_next_action}"
                    )
                else:
                    for agent, action in agent_action_pairs:
                        candidate_agent_actions[agent].add(action)
            else:
                for predefined_next_action in predefined_next_actions:
                    agent_action_pairs = self.get_agent_action_pairs(predefined_next_action, task_agent_actions_map)
                    for agent, action in agent_action_pairs:
                        candidate_agent_actions[agent].add(action)

        # If there is a single agent with a single action, it is the obvious choice.
        if not next_action and len(task_agent_names) == 1 and len(task_agent_actions_map[task_agent_names[0]]) == 1:
            task_agent_name = task_agent_names[0]
            task_action_name = task_agent_actions_map[task_agent_name][0]
            next_action = NextAction(
                agent=task_agent_name,
                action=task_action_name,
                reason=f"Only one agent ('{task_agent_name}') is available, and it has only one action ('{task_action_name}'), making it the obvious choice."
            )

        if next_action is not None:
            return next_action, None

        # Otherwise, build a prompt so the LLM can choose among the candidates.
        candidate_agent_actions = candidate_agent_actions or task_agent_actions_map
        agent_actions_info = "\n\n".join(
            [
                agent.get_agent_profile(action_names=candidate_agent_actions[agent.name])
                for agent in task_agents if agent.name in candidate_agent_actions
            ]
        )

        task_info = task.get_task_info()
        task_input_names = [param.name for param in task.inputs]
        task_input_data: dict = env.get_execution_data(task_input_names)
        task_input_data_info = self.format_task_input_data(data=task_input_data)
        task_execution_history = "\n\n".join([str(msg) for msg in task_execution_messages])

        prompt_inputs = {
            "task_info": task_info,
            "task_inputs": task_input_data_info,
            "task_execution_history": task_execution_history,
            "agent_action_list": agent_actions_info,
        }
        prompt = self.prompt.format(**prompt_inputs)
        return None, prompt_inputs, prompt

    def execute(
        self,
        llm: Optional[BaseLLM] = None,
        task: WorkFlowNode = None,
        agent_manager: AgentManager = None,
        env: Environment = None,
        sys_msg: Optional[str] = None,
        return_prompt: bool = True,
        **kwargs
    ) -> Union[NextAction, Tuple[NextAction, str]]:
        """
        Determine the next action to take for the given task.
        If the last message for the task specifies ``next_actions``, choose an action from those actions to execute.

        Args:
            llm (Optional[BaseLLM]): Language model to use for generation.
            task (WorkFlowNode): The task for which to schedule an action.
            agent_manager (AgentManager): The agent manager providing the agents.
            env (Environment): The execution environment.
            sys_msg (Optional[str]): Optional system message for the LLM.
            return_prompt (bool): Whether to return the prompt along with the output.

        Returns:
            Union[NextAction, Tuple[NextAction, str]]: The scheduled action and optionally the prompt.
        """
        result = self._prepare_action_execution(task=task, agent_manager=agent_manager, env=env)
        if result[0] is not None:
            # An action could be scheduled without consulting the LLM.
            next_action, _ = result
            return (next_action, None) if return_prompt else next_action

        _, _, prompt = result
        next_action = llm.generate(prompt=prompt, system_message=sys_msg, parser=self.outputs_format)

        if return_prompt:
            return next_action, prompt
        return next_action

    async def async_execute(
        self,
        llm: Optional[BaseLLM] = None,
        task: WorkFlowNode = None,
        agent_manager: AgentManager = None,
        env: Environment = None,
        sys_msg: Optional[str] = None,
        return_prompt: bool = True,
        **kwargs
    ) -> Union[NextAction, Tuple[NextAction, str]]:
        """
        Asynchronously determine the next action to take for the given task.
        If the last message for the task specifies ``next_actions``, choose an action from those actions to execute.

        Args:
            llm (Optional[BaseLLM]): Language model to use for generation.
            task (WorkFlowNode): The task for which to schedule an action.
            agent_manager (AgentManager): The agent manager providing the agents.
            env (Environment): The execution environment.
            sys_msg (Optional[str]): Optional system message for the LLM.
            return_prompt (bool): Whether to return the prompt along with the output.

        Returns:
            Union[NextAction, Tuple[NextAction, str]]: The scheduled action and optionally the prompt.
        """
        result = self._prepare_action_execution(task=task, agent_manager=agent_manager, env=env)
        if result[0] is not None:
            # An action could be scheduled without consulting the LLM.
            next_action, _ = result
            return (next_action, None) if return_prompt else next_action

        _, _, prompt = result
        next_action = await llm.async_generate(prompt=prompt, system_message=sys_msg, parser=self.outputs_format)

        if return_prompt:
            return next_action, prompt
        return next_action
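

# A minimal usage sketch for ActionScheduler, assuming a configured `llm`,
# a `task` (WorkFlowNode), an `agent_manager` (AgentManager), and an `env`
# (Environment) constructed elsewhere — the names are illustrative:
#
#   scheduler = ActionScheduler()
#   next_action, prompt = scheduler.execute(
#       llm=llm, task=task, agent_manager=agent_manager, env=env, return_prompt=True
#   )
#   print(next_action.to_str())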


class WorkFlowManager(BaseModule):
    """
    Responsible for the scheduling and decision-making when executing a workflow.

    Attributes:
        llm (BaseLLM): The language model used by both schedulers.
        task_scheduler (TaskScheduler): Determines the next task(s) to execute based on the workflow graph and node states.
        action_scheduler (ActionScheduler): Determines the next action(s) to take for the selected task using an LLM.
    """
    llm: BaseLLM
    action_scheduler: ActionScheduler = Field(default_factory=ActionScheduler)
    task_scheduler: TaskScheduler = Field(default_factory=TaskScheduler)

    def init_module(self):
        self._save_ignore_fields = ["llm"]

    async def schedule_next_task(self, graph: WorkFlowGraph, env: Environment = None, **kwargs) -> WorkFlowNode:
        """
        Asynchronously return the next task to execute, or None if no task is schedulable.
        """
        execution_results = await self.task_scheduler.async_execute(llm=self.llm, graph=graph, env=env, return_prompt=True, **kwargs)
        if execution_results is None:
            return None
        scheduled_task, prompt, *_ = execution_results
        message = Message(
            content=scheduled_task, agent=type(self).__name__, action=self.task_scheduler.name,
            prompt=prompt, msg_type=MessageType.COMMAND, wf_goal=graph.goal
        )
        env.update(message=message, state=TrajectoryState.COMPLETED)
        task: WorkFlowNode = graph.get_node(scheduled_task.task_name)
        return task

    async def schedule_next_action(self, goal: str, task: WorkFlowNode, agent_manager: AgentManager, env: Environment = None, **kwargs) -> NextAction:
        """
        Asynchronously return the next action to execute. If the task is completed, return None.
        """
        execution_results = await self.action_scheduler.async_execute(llm=self.llm, task=task, agent_manager=agent_manager, env=env, return_prompt=True, **kwargs)
        if execution_results is None:
            return None
        next_action, prompt, *_ = execution_results
        message = Message(
            content=next_action, agent=type(self).__name__, action=self.action_scheduler.name,
            prompt=prompt, msg_type=MessageType.COMMAND, wf_goal=goal, wf_task=task.name, wf_task_desc=task.description
        )
        env.update(message=message, state=TrajectoryState.COMPLETED)
        return next_action

    async def extract_output(self, graph: WorkFlowGraph, env: Environment, **kwargs) -> str:
        """
        Asynchronously extract the final output from the workflow execution.

        Args:
            graph (WorkFlowGraph): The workflow graph.
            env (Environment): The execution environment.

        Returns:
            str: The extracted output.
        """
        # Collect the latest message from each end task and its predecessors.
        end_tasks = graph.find_end_nodes()
        end_task_predecessors = sum([graph.get_node_predecessors(node=end_task) for end_task in end_tasks], [])
        candidate_tasks_with_output = list(set(end_tasks) | set(end_task_predecessors))
        candidate_msgs_with_output = []
        for task in candidate_tasks_with_output:
            candidate_msgs_with_output.extend(env.get_task_messages(tasks=task, n=1))
        candidate_msgs_with_output = Message.sort_by_timestamp(messages=candidate_msgs_with_output)

        prompt = OUTPUT_EXTRACTION_PROMPT.format(
            goal=graph.goal,
            workflow_graph_representation=graph.get_workflow_description(),
            workflow_execution_results="\n\n".join([str(msg) for msg in candidate_msgs_with_output]),
        )
        llm_output: LLMOutputParser = await self.llm.async_generate(prompt=prompt)
        return llm_output.content

    def save_module(self, path: str, ignore: List[str] = [], **kwargs) -> str:
        ignore_fields = self._save_ignore_fields + ignore
        return super().save_module(path=path, ignore=ignore_fields, **kwargs)
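

# A minimal end-to-end sketch of driving a workflow with WorkFlowManager,
# assuming `llm`, `graph`, `agent_manager`, and `env` are configured elsewhere
# (the names and the surrounding loop are illustrative, not part of this module):
#
#   import asyncio
#
#   async def run():
#       manager = WorkFlowManager(llm=llm)
#       while True:
#           task = await manager.schedule_next_task(graph=graph, env=env)
#           if task is None:
#               break
#           next_action = await manager.schedule_next_action(
#               goal=graph.goal, task=task, agent_manager=agent_manager, env=env
#           )
#           # ... execute `next_action` and record its result in `env` ...
#       return await manager.extract_output(graph=graph, env=env)
#
#   asyncio.run(run())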