| | from typing import Dict, Any |
| | import os |
| |
|
| | from flow_modules.aiflows.ContentWriterFlowModule import ContentWriterFlow |
| | from aiflows.base_flows import CircularFlow |
| |
|
| | from aiflows.utils import logging |
| | log = logging.get_logger(__name__) |
| |
|
| |
|
| | class CodeWriterFlow(ContentWriterFlow): |
| | """This flow inherits from ContentWriterFlow, it is used to write code in an interactive way. |
| | In the subflow of the executor, we specify an InteractiveCodeGenFlow (https://huggingface.co/aiflows/InteractiveCodeGenFlowModule) |
| | |
| | *Input Interface*: |
| | - `goal` |
| | |
| | *Output Interface*: |
| | - `code` |
| | - `result` |
| | - `summary` |
| | - `status` |
| | |
| | *Configuration Parameters*: |
| | - `name`: Name of the flow |
| | - `description`: Description of the flow |
| | - `_target_`: The instantiation target of the flow |
| | - `input_interface`: The input to the flow. Inherited from ContentWriterFlow, in this case, it is `goal`. |
| | - `output_interface`: The output of the flow. |
| | - `subflows_config`: Configurations of subflows |
| | - `early_exit_keys`: The keys that will trigger an early exit of the flow |
| | - `topology`: Configures the topology of the subflows, please have a special look at the I/O interfaces of the subflows. |
| | |
| | """ |
| |
|
| | def _on_reach_max_round(self): |
| | """This function is called when the maximum amount of rounds was reached before the model generated the code. |
| | """ |
| | self._state_update_dict({ |
| | "code": "The maximum amount of rounds was reached before the model generated the code.", |
| | "status": "unfinished" |
| | }) |
| |
|
| | @CircularFlow.output_msg_payload_processor |
| | def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]: |
| | """This function is used to detect whether the code generation process is finished or not. |
| | It is configured in the topology of the subflows, see CodeWriterFlow.yaml for more details. |
| | :param output_payload: The output payload of the subflow |
| | :param src_flow: The subflow that generated the output payload |
| | :return: The output payload of the subflow |
| | """ |
| | command = output_payload["command"] |
| | if command == "finish": |
| | |
| | keys_to_fetch_from_state = ["temp_code_file_location", "code"] |
| | fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state) |
| | temp_code_file_location = fetched_state["temp_code_file_location"] |
| | code_content = fetched_state["code"] |
| | if os.path.exists(temp_code_file_location): |
| | os.remove(temp_code_file_location) |
| | |
| | return { |
| | "EARLY_EXIT": True, |
| | "code": code_content, |
| | "result": output_payload["command_args"]["summary"], |
| | "summary": "ExtendLibrary/CodeWriter: " + output_payload["command_args"]["summary"], |
| | "status": "finished" |
| | } |
| | elif command == "manual_finish": |
| | |
| | keys_to_fetch_from_state = ["temp_code_file_location"] |
| | fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state) |
| | temp_code_file_location = fetched_state["temp_code_file_location"] |
| | if os.path.exists(temp_code_file_location): |
| | os.remove(temp_code_file_location) |
| | |
| | return { |
| | "EARLY_EXIT": True, |
| | "code": "no code was generated", |
| | "result": "CodeWriter was terminated explicitly by the user, process is unfinished", |
| | "summary": "ExtendLibrary/CodeWriter: CodeWriter was terminated explicitly by the user, process is unfinished", |
| | "status": "unfinished" |
| | } |
| | elif command == "test": |
| | |
| | keys_to_fetch_from_state = ["code"] |
| | fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state) |
| |
|
| | |
| | code_content = fetched_state["code"] |
| | output_payload["command_args"]["code"] = code_content |
| |
|
| | return output_payload |
| | else: |
| | return output_payload |
| |
|
| | def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: |
| | """The run function of the flow. |
| | :param input_data: The input data of the flow |
| | :return: The output data of the flow |
| | """ |
| | |
| | self._state_update_dict(update_data=input_data) |
| |
|
| | max_rounds = self.flow_config.get("max_rounds", 1) |
| | if max_rounds is None: |
| | log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.") |
| |
|
| | self._sequential_run(max_rounds=max_rounds) |
| |
|
| | output = self._get_output_from_state() |
| |
|
| | self.reset(full_reset=True, recursive=True, src_flow=self) |
| |
|
| | return output |