| from typing import Dict, Any |
| from aiflows.base_flows.atomic import AtomicFlow |
| class MemoryWritingAtomicFlow(AtomicFlow): |
| """This class is used to write memory to memory files. |
| |
| *Input Interface*: |
| - `summary` (str): summary to write to memory file (logs) |
| - `memory_files` (dict): dictionary of memory files to write to. |
| |
| *Output Interface*: |
| - `MemWrite_output` (str): message that whether memory write was successful |
| |
| *Configuration Parameters*: |
| - `input_interface`: the input interface of the atomic flow |
| - `output_interface`: the output interface of the atomic flow |
| """ |
| def __init__(self, **kwargs): |
| """ |
| Initialize the atomic flow. |
| :param kwargs: additional key-value arguments to pass to the atomic flow |
| :type kwargs: Dict[str, Any] |
| """ |
| super().__init__(**kwargs) |
| self.supported_memories = ["summary"] |
| def _check_input(self, input_data: Dict[str, Any]): |
| """ |
| Check whether the input data is valid. |
| :param input_data: the input data to check |
| :type input_data: Dict[str, Any] |
| :raises AssertionError: if memory_files is not passed to MemoryWritingAtomicFlow |
| :raises AssertionError: if input data is not supported by MemoryWritingAtomicFlow |
| """ |
| assert "memory_files" in input_data, "memory_files not passed to MemoryWritingAtomicFlow" |
| assert any(item in input_data for item in self.supported_memories), "no memories to write" |
|
|
| def _call(self, input_data: Dict[str, Any]): |
| """ |
| Write memory to memory files. |
| :param input_data: the input data to write |
| :type input_data: Dict[str, Any] |
| :return: the output data |
| :rtype: Dict[str, Any] |
| :raises AssertionError: if logs is not in memory_files |
| """ |
| try: |
| if "summary" in input_data: |
| assert "logs" in input_data["memory_files"], "there is summary to write, but no logs file in memory_files" |
| logs_file_location = input_data["memory_files"]["logs"] |
| summary_to_write = input_data["summary"] |
| with open(logs_file_location, 'a') as file: |
| file.write(summary_to_write + "\n") |
| return {"MemWrite_output": "Memory Write was successful"} |
| except Exception as e: |
| return {"MemWrite_output": str(e)} |
| |
|
|
| def run( |
| self, |
| input_data: Dict[str, Any] |
| ): |
| """ |
| Run the atomic flow. |
| :param input_data: the input data to run |
| :type input_data: Dict[str, Any] |
| :return: the output data |
| :rtype: Dict[str, Any] |
| """ |
| self._check_input(input_data) |
| return self._call(input_data) |
|
|