| import os |
| import time |
| from typing import Dict, Any |
| import subprocess |
| from flow_modules.aiflows.InterpreterFlowModule import InterpreterAtomicFlow |
|
|
|
|
class ExecuteCodeAtomicFlow(InterpreterAtomicFlow):
    """Execute code that the user has reviewed in an editor.

    This class inherits from InterpreterAtomicFlow. It opens the given file in
    VSCode and waits until the editor process exits (i.e. the user has saved
    and closed the file). It then reads the code section from the file, hands
    it to the inherited interpreter machinery, and returns the result.

    *Input Interface*:
    - temp_code_file_location: The location of the file containing the code to be executed.
    - language: The language of the code to be executed.

    *Output Interface*:
    - interpreter_output: The output of the execution of the code.
    - code_ran: The code that was executed.

    *Configuration Parameters*:
    - `input_interface`: The input interface of the atomic flow.
    - `output_interface`: The output interface of the atomic flow.

    """

    def _prepare_code(self, input_data: Dict[str, Any]) -> None:
        """
        Read the code section from the file and store it under ``input_data["code"]``.

        Only the lines between a ``# Code:`` line and a ``############`` line
        are kept; marker lines are compared after stripping surrounding
        whitespace and are not part of the result. If no start marker is
        found, the stored code is the empty string.

        :param input_data: The input_data dictionary; must contain
            ``temp_code_file_location``. It is mutated in place.
        :type input_data: Dict[str, Any]
        :return: None
        """
        file_location = input_data["temp_code_file_location"]
        start_marker = "# Code:"
        end_marker = "############"
        code_started = False
        code_lines = []
        with open(file_location, 'r') as file:
            for line in file:
                stripped = line.strip()
                if stripped == start_marker:
                    code_started = True
                    continue
                # The end marker stops reading even if it appears before the
                # start marker.
                if stripped == end_marker:
                    break
                if code_started:
                    code_lines.append(line)
        # Join once instead of growing a string line by line.
        input_data["code"] = "".join(code_lines)

    def _check_input(self, input_data: Dict[str, Any]) -> None:
        """
        Check that the input_data dictionary contains the required keys.

        :param input_data: The input_data dictionary.
        :type input_data: Dict[str, Any]
        :raises AssertionError: If ``temp_code_file_location`` or ``language``
            is missing from ``input_data``.
        :return: None
        """
        # Raise explicitly rather than using `assert`, so the validation is
        # not stripped when Python runs with -O. The exception type and
        # messages are unchanged for callers.
        if "temp_code_file_location" not in input_data:
            raise AssertionError("temp_code_file_location not passed to ExecuteCodeAtomicFlow")
        if "language" not in input_data:
            raise AssertionError("language not passed to ExecuteCodeAtomicFlow")

    def _delete_file(self, file_location: str) -> None:
        """
        Delete the file at the given location; a missing file is not an error.

        :param file_location: The location of the file to be deleted.
        :type file_location: str
        :return: None
        """
        # EAFP: avoids the race between checking for existence and removing.
        try:
            os.remove(file_location)
        except FileNotFoundError:
            pass

    def _open_file_and_wait_for_upd(self, file_location: str) -> None:
        """
        Open the file in VSCode and block until the editor process exits.

        The editor is launched as ``code --wait <file_location>``.

        :param file_location: The location of the file to be opened.
        :type file_location: str
        :return: None
        """
        process = subprocess.Popen(["code", "--wait", file_location])
        # Block on the child directly instead of polling once per second.
        process.wait()

    def run(
            self,
            input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Let the user edit the file, then execute the code it contains.

        The temporary file is deleted once the editor has closed, whether or
        not reading or executing the code succeeds.

        :param input_data: The input_data dictionary.
        :type input_data: Dict[str, Any]
        :raises AssertionError: If a required key is missing from ``input_data``.
        :return: A dictionary with ``interpreter_output`` (the value returned
            by the inherited ``_call``) and ``code_ran`` (the code read from
            the file).
        :rtype: Dict[str, Any]
        """
        self._check_input(input_data)
        file_loc = input_data["temp_code_file_location"]
        # Kept outside the try block: if the editor cannot be launched, the
        # file is left untouched.
        self._open_file_and_wait_for_upd(file_loc)
        try:
            self._prepare_code(input_data)
            self._process_input_data(input_data)
            execution_output = self._call()
        finally:
            # Always clean up the temp file, even when execution fails.
            self._delete_file(file_loc)
        return {"interpreter_output": execution_output, "code_ran": input_data['code']}
|
|