| from typing import Dict, Any |
| from aiflows.utils import logging |
| log = logging.get_logger(__name__) |
|
|
| from flow_modules.aiflows.AbstractBossFlowModule import AbstractBossFlow |
|
|
class CoderFlow(AbstractBossFlow):
    """Executor branch of the Jarvis flow that writes and runs code for a given goal.

    Within the Jarvis flow, the controller invokes the Coder flow and hands it a goal.
    The Coder flow then writes and runs code interactively in order to fulfil that goal.

    Structurally, the Coder flow mirrors the Jarvis flow: both inherit from AbstractBossFlow.


    *Input Interface (expected input)*
    - `goal` (str): The goal supplied by the caller (the source flow, i.e. JarvisFlow).


    *Output Interface (expected output)*
    - `result` (str): The result of the flow; it is returned to the caller (i.e. JarvisFlow).
    - `summary` (str): The summary of the flow; it is written to the logs of the caller flow (i.e. JarvisFlow).

    Typical workflow of Coder:
    0. JarvisFlow calls Coder with a goal.
    1. MemoryReading reads plans, logs and the code library.
    2. Planner makes a plan based on the goal.
    3. The library is extended according to the goal given by the controller.
    4. Code supplied by the controller is run (possibly calling the newly written function).
    5. The flow finishes and gives an answer.
    """

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the Coder flow once and return its output.

        :param input_data: The input data of the flow.
        :type input_data: Dict[str, Any]
        :return: The output data of the flow, as gathered from the flow state.
        :rtype: Dict[str, Any]
        """
        # Push the caller's input into the flow state first ...
        self._state_update_dict(update_data=input_data)

        # ... and then record the memory files in the state as well.
        memory_entry = {"memory_files": self.memory_files}
        self._state_update_dict(update_data=memory_entry)

        # `max_rounds` falls back to 1 when the key is missing from the config;
        # an explicit None is only reported here and passed through unchanged.
        round_limit = self.flow_config.get("max_rounds", 1)
        if round_limit is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=round_limit)

        # Gather the output before the full, recursive reset is issued.
        flow_output = self._get_output_from_state()
        self.reset(full_reset=True, recursive=True, src_flow=self)
        return flow_output