| from copy import deepcopy |
| from typing import Dict, Any |
|
|
| import hydra |
| from flows.prompt_template import JinjaPrompt |
|
|
| from flows.base_flows import AtomicFlow |
| from flows.messages import UpdateMessage_Generic |
|
|
| from flows.utils import logging |
|
|
| |
# Module-level logger from flows.utils.logging, named "flows.<this module's __name__>".
| log = logging.get_logger(f"flows.{__name__}") |
|
|
|
|
class HumanStandardInputFlow(AtomicFlow):
    """Atomic flow that shows a query message and reads the reply from stdin.

    The query message is rendered from ``query_message_prompt_template`` using
    the flow's input data. Depending on the ``request_multi_line_input_flag``
    config entry, the reply is either a single line or a sequence of lines
    terminated by the configured ``end_of_input_string``.
    """

    # Config keys that have no default below and must be supplied by the user.
    REQUIRED_KEYS_CONFIG = ["request_multi_line_input_flag"]

    # Prompt template used to render the query message; set in __init__.
    query_message_prompt_template: JinjaPrompt = None

    # Default configuration values. The double-underscore name is mangled to
    # `_HumanStandardInputFlow__default_flow_config`; it is kept as-is because
    # code outside this class may look it up under that name.
    __default_flow_config = {
        "end_of_input_string": "EOI",
        "input_keys": [],
        "description": "Reads input from the user's standard input.",
        "query_message_prompt_template": {
            "_target_": "flows.prompt_template.JinjaPrompt",
            "template": "",
            "input_variables": [],
            "partial_variables": {},
        },
    }

    def __init__(self, query_message_prompt_template, **kwargs):
        """Store the prompt template and forward all other kwargs to the base class.

        Args:
            query_message_prompt_template: Template object used by ``run`` to
                render the query message.
            **kwargs: Passed unchanged to ``AtomicFlow.__init__``.
        """
        super().__init__(**kwargs)
        self.query_message_prompt_template = query_message_prompt_template

    @classmethod
    def _set_up_prompts(cls, config):
        """Instantiate the prompt template described in ``config``.

        Args:
            config: Mapping with a ``query_message_prompt_template`` entry
                that ``hydra.utils.instantiate`` can build.

        Returns:
            A dict with the single key ``query_message_prompt_template``
            holding the instantiated template.
        """
        template = hydra.utils.instantiate(
            config["query_message_prompt_template"], _convert_="partial"
        )
        return {"query_message_prompt_template": template}

    @classmethod
    def instantiate_from_config(cls, config):
        """Build a flow instance from a configuration mapping.

        The config is deep-copied first so the caller's object is never
        mutated by the new flow.

        Args:
            config: Flow configuration.

        Returns:
            A new instance of ``cls``.
        """
        flow_config = deepcopy(config)

        kwargs = {"flow_config": flow_config}
        kwargs.update(cls._set_up_prompts(flow_config))

        return cls(**kwargs)

    @staticmethod
    def _get_message(prompt_template, input_data: Dict[str, Any]):
        """Render ``prompt_template`` with the values it needs from ``input_data``.

        Only the keys listed in ``prompt_template.input_variables`` are passed
        to the template.

        Args:
            prompt_template: Object exposing ``input_variables`` and ``format``.
            input_data: Values available for rendering.

        Returns:
            Whatever ``prompt_template.format`` returns.

        Raises:
            KeyError: If a required input variable is missing from ``input_data``.
        """
        template_kwargs = {
            variable: input_data[variable]
            for variable in prompt_template.input_variables
        }
        return prompt_template.format(**template_kwargs)

    def _read_input(self):
        """Read the user's response from standard input.

        In single-line mode one line is read and returned. In multi-line mode
        lines are collected until a line equal to ``end_of_input_string`` is
        entered, or until standard input is exhausted, and are returned joined
        by newlines (the terminator line itself is not included).

        Returns:
            The text entered by the user.

        Raises:
            EOFError: In single-line mode, if standard input is already
                exhausted.
        """
        if not self.flow_config["request_multi_line_input_flag"]:
            log.info("Please enter your single-line response and press enter.")
            return input()

        end_of_input_string = self.flow_config["end_of_input_string"]
        log.info("Please enter your multi-line response below. "
                 f"To submit the response, write `{end_of_input_string}` on a new line and press enter.")

        lines = []
        while True:
            try:
                line = input()
            except EOFError:
                # stdin closed without the terminator (e.g. piped input):
                # keep what was entered so far instead of discarding it.
                break
            if line == end_of_input_string:
                break
            lines.append(line)
        return "\n".join(lines)

    def run(self,
            input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Show the query message and return the user's reply.

        Args:
            input_data: Values used to render the query message template.

        Returns:
            A dict with the single key ``human_input`` holding the text read
            from standard input.
        """
        query_message = self._get_message(self.query_message_prompt_template, input_data)

        # Record the rendered query through the flow's message log
        # (`_log_message` is not defined in this class; presumably inherited).
        state_update_message = UpdateMessage_Generic(
            created_by=self.flow_config["name"],
            updated_flow=self.flow_config["name"],
            data={"query_message": query_message},
        )
        self._log_message(state_update_message)

        log.info(query_message)
        human_input = self._read_input()

        return {"human_input": human_input}
|
|