| | import io |
| | import shlex |
| | import tarfile |
| | import uuid |
| | import docker |
| | from pathlib import Path |
| | from typing import ClassVar, Dict, List, Optional |
| | from .interpreter_base import BaseInterpreter |
| | from .tool import Tool,Toolkit |
| | from .storage_handler import FileStorageHandler |
| | from pydantic import Field |
| |
|
| | class DockerInterpreter(BaseInterpreter): |
| | """ |
| | A Docker-based interpreter for executing Python, Bash, and R scripts in an isolated environment. |
| | """ |
| | |
| | CODE_EXECUTE_CMD_MAPPING: ClassVar[Dict[str, str]] = { |
| | "python": "python {file_name}", |
| | } |
| |
|
| | CODE_TYPE_MAPPING: ClassVar[Dict[str, str]] = { |
| | "python": "python", |
| | "py3": "python", |
| | "python3": "python", |
| | "py": "python", |
| | } |
| |
|
| | require_confirm:bool = Field(default=False, description="Whether to require confirmation before executing code") |
| | print_stdout:bool = Field(default=True, description="Whether to print stdout") |
| | print_stderr:bool = Field(default=True, description="Whether to print stderr") |
| | host_directory:str = Field(default="", description="The path to the host directory to use for the container") |
| | container_directory:str = Field(default="/home/app/", description="The directory to use for the container") |
| | container_command:str = Field(default="tail -f /dev/null", description="The command to use for the container") |
| | tmp_directory:str = Field(default="/tmp", description="The directory to use for the container") |
| | image_tag:Optional[str] = Field(default=None, description="The Docker image tag to use") |
| | dockerfile_path:Optional[str] = Field(default=None, description="Path to the Dockerfile to build") |
| | auto_cleanup:bool = Field(default=True, description="Whether to automatically cleanup container on cleanup() call") |
| | auto_destroy:bool = Field(default=True, description="Whether to automatically cleanup container on object destruction") |
| | |
| | class Config: |
| | arbitrary_types_allowed = True |
| |
|
| | def __init__( |
| | self, |
| | name:str = "DockerInterpreter", |
| | image_tag:Optional[str] = None, |
| | dockerfile_path:Optional[str] = None, |
| | require_confirm:bool = False, |
| | print_stdout:bool = True, |
| | print_stderr:bool = True, |
| | host_directory:str = "", |
| | container_directory:str = "/home/app/", |
| | container_command:str = "tail -f /dev/null", |
| | tmp_directory:str = "/tmp", |
| | storage_handler: FileStorageHandler = None, |
| | auto_cleanup:bool = True, |
| | auto_destroy:bool = True, |
| | **data |
| | ): |
| | """ |
| | Initialize a Docker-based interpreter for executing code in an isolated environment. |
| | |
| | Args: |
| | name (str): The name of the interpreter |
| | image_tag (str, optional): The Docker image tag to use. Must be provided if dockerfile_path is not. |
| | dockerfile_path (str, optional): Path to the Dockerfile to build. Must be provided if image_tag is not. |
| | require_confirm (bool): Whether to require confirmation before executing code |
| | print_stdout (bool): Whether to print stdout from code execution |
| | print_stderr (bool): Whether to print stderr from code execution |
| | host_directory (str): The path to the host directory to mount in the container |
| | container_directory (str): The target directory inside the container |
| | container_command (str): The command to run in the container |
| | tmp_directory (str): The temporary directory to use for file creation in the container |
| | **data: Additional data to pass to the parent class |
| | """ |
| | |
| | super().__init__(name=name, **data) |
| | |
| | self.require_confirm = require_confirm |
| | self.print_stdout = print_stdout |
| | self.print_stderr = print_stderr |
| | self.host_directory = host_directory |
| | self.container_directory = container_directory |
| | self.container_command = container_command |
| | self.tmp_directory = tmp_directory |
| | |
| | |
| | self.client = docker.from_env() |
| | self.container = None |
| | self.image_tag = image_tag |
| | self.dockerfile_path = dockerfile_path |
| | self.storage_handler = storage_handler |
| | self.auto_cleanup = auto_cleanup |
| | self.auto_destroy = auto_destroy |
| | self._initialize_if_needed() |
| | |
| | |
| | if self.host_directory: |
| | self._upload_directory_to_container(self.host_directory) |
| |
|
| | def __del__(self): |
| | try: |
| | if hasattr(self, 'auto_destroy') and self.auto_destroy and hasattr(self, 'container') and self.container is not None: |
| | self.container.remove(force=True) |
| | except Exception: |
| | pass |
| |
|
| | def __enter__(self): |
| | return self |
| |
|
| | def __exit__(self, exc_type, exc_val, exc_tb): |
| | self.cleanup() |
| |
|
| | def cleanup(self): |
| | """Explicitly clean up the container and Docker client.""" |
| | if self.auto_cleanup: |
| | try: |
| | if hasattr(self, 'container') and self.container is not None: |
| | self.container.remove(force=True) |
| | self.container = None |
| | except Exception: |
| | pass |
| | try: |
| | if hasattr(self, 'client') and self.client is not None: |
| | self.client.close() |
| | self.client = None |
| | except Exception: |
| | pass |
| |
|
| | def _initialize_if_needed(self): |
| | image_tag = self.image_tag |
| | dockerfile_path = self.dockerfile_path |
| | if image_tag: |
| | try: |
| | |
| | self.client.images.get(image_tag) |
| | except Exception as e: |
| | raise ValueError(f"Image provided in image_tag but not found: {e}") |
| | else: |
| | |
| | if not dockerfile_path: |
| | raise ValueError("dockerfile_path or image_tag must be provided to build the image") |
| | |
| | dockerfile_path = Path(dockerfile_path) |
| | if not dockerfile_path.exists(): |
| | raise FileNotFoundError(f"Dockerfile not found at provided path: {dockerfile_path}") |
| | |
| | dockerfile_dir = dockerfile_path.parent |
| | self.client.images.build(path=str(dockerfile_dir), tag=image_tag, rm=True, buildargs={}) |
| |
|
| | |
| | try: |
| | self.client.ping() |
| | except Exception as e: |
| | raise RuntimeError(f"Docker daemon is not running: {e}") |
| |
|
| | |
| | self.container = self.client.containers.run( |
| | image_tag, |
| | detach=True, |
| | command=self.container_command, |
| | working_dir=self.container_directory |
| | ) |
| |
|
| | def _upload_directory_to_container(self, host_directory: str): |
| | """ |
| | Uploads all files and directories from the given host directory to the container directory. |
| | |
| | :param host_directory: Path to the local directory containing files to upload. |
| | :param container_directory: Target directory inside the container (defaults to self.container_directory). |
| | """ |
| | host_directory = Path(host_directory).resolve() |
| | if not host_directory.exists() or not host_directory.is_dir(): |
| | raise FileNotFoundError(f"Directory not found: {host_directory}") |
| |
|
| | tar_stream = io.BytesIO() |
| | |
| | with tarfile.open(fileobj=tar_stream, mode="w") as tar: |
| | for file_path in host_directory.rglob("*"): |
| | if file_path.is_file(): |
| | |
| | relative_path = file_path.relative_to(host_directory) |
| | target_path = Path(self.container_directory) / relative_path |
| | |
| | tarinfo = tarfile.TarInfo(name=str(target_path.relative_to(self.container_directory))) |
| | tarinfo.size = file_path.stat().st_size |
| | with open(file_path, "rb") as f: |
| | tar.addfile(tarinfo, f) |
| |
|
| | tar_stream.seek(0) |
| |
|
| | if self.container is None: |
| | raise RuntimeError("Container is not initialized.") |
| |
|
| | self.container.put_archive(self.container_directory, tar_stream) |
| |
|
| | |
| | |
| |
|
| | def _create_file_in_container(self, content: str) -> Path: |
| | filename = str(uuid.uuid4()) |
| | tar_stream = io.BytesIO() |
| | with tarfile.open(fileobj=tar_stream, mode='w') as tar: |
| | tarinfo = tarfile.TarInfo(name=filename) |
| | tarinfo.size = len(content.encode('utf-8')) |
| | tar.addfile(tarinfo, io.BytesIO(content.encode('utf-8'))) |
| | tar_stream.seek(0) |
| |
|
| | if self.container is None: |
| | raise RuntimeError("Container is not initialized.") |
| | |
| | try: |
| | self.container.put_archive(self.tmp_directory, tar_stream) |
| | except Exception as e: |
| | raise RuntimeError(f"Failed to create file in container: {e}") |
| | |
| | return Path(f"{self.tmp_directory}/{filename}") |
| |
|
| | def _run_file_in_container(self, file: Path, language: str) -> str: |
| | """Execute a file in the container with timeout and security checks.""" |
| | if not self.container: |
| | raise RuntimeError("Container is not initialized") |
| | |
| | |
| | container_info = self.client.api.inspect_container(self.container.id) |
| | if not container_info['State']['Running']: |
| | raise RuntimeError("Container is not running") |
| | |
| | language = self._check_language(language) |
| | command = shlex.split(self.CODE_EXECUTE_CMD_MAPPING[language].format(file_name=file.as_posix())) |
| | if self.container is None: |
| | raise RuntimeError("Container is not initialized.") |
| | result = self.container.exec_run(command, demux=True) |
| |
|
| | stdout, stderr = result.output |
| | if self.print_stdout and stdout: |
| | print(stdout.decode()) |
| | if self.print_stderr and stderr: |
| | print(stderr.decode()) |
| |
|
| | stdout_str = stdout.decode() if stdout else "" |
| | stderr_str = stderr.decode() if stderr else "" |
| | return stdout_str + stderr_str |
| |
|
| | def execute(self, code: str, language: str) -> str: |
| | """ |
| | Executes code in a Docker container. |
| | |
| | Args: |
| | code (str): The code to execute |
| | language (str): The programming language to use |
| | |
| | Returns: |
| | str: The execution output |
| | |
| | Raises: |
| | RuntimeError: If container is not properly initialized or execution fails |
| | ValueError: If code content is invalid or exceeds limits |
| | """ |
| | if not code or not code.strip(): |
| | raise ValueError("Code content cannot be empty") |
| | |
| | if not self.container: |
| | raise RuntimeError("Container is not initialized") |
| | |
| | |
| | try: |
| | container_info = self.client.api.inspect_container(self.container.id) |
| | if not container_info['State']['Running']: |
| | raise RuntimeError("Container is not running") |
| | except Exception as e: |
| | raise RuntimeError(f"Failed to check container status: {e}") |
| |
|
| | if self.host_directory: |
| | code = f"import sys; sys.path.insert(0, '{self.container_directory}');" + code |
| | |
| | language = self._check_language(language) |
| | |
| | if self.require_confirm: |
| | confirmation = input(f"Confirm execution of {language} code? [Y/n]: ") |
| | if confirmation.lower() not in ["y", "yes", ""]: |
| | raise RuntimeError("Execution aborted by user.") |
| | |
| | try: |
| | file_path = self._create_file_in_container(code) |
| | return self._run_file_in_container(file_path, language) |
| | except Exception as e: |
| | raise RuntimeError(f"Code execution failed: {e}") |
| | finally: |
| | |
| | try: |
| | if hasattr(self, 'container') and self.container: |
| | self.container.exec_run(f"rm -f {file_path}") |
| | except Exception: |
| | pass |
| |
|
| | def execute_script(self, file_path: str, language: str = None) -> str: |
| | """ |
| | Reads code from a file and executes it in a Docker container. |
| | |
| | Args: |
| | file_path (str): The path to the script file to execute |
| | language (str, optional): The programming language of the code. If None, will be determined from the file extension. |
| | |
| | Returns: |
| | str: The execution output |
| | |
| | Raises: |
| | FileNotFoundError: If the script file does not exist |
| | RuntimeError: If container is not properly initialized or execution fails |
| | ValueError: If file content is invalid or exceeds limits |
| | """ |
| | |
| | result = self.storage_handler.read(file_path) |
| | if result["success"]: |
| | code = result["content"] |
| | else: |
| | raise RuntimeError(f"Could not read file '{file_path}': {result.get('error', 'Unknown error')}") |
| | |
| | |
| | return self.execute(code, language) |
| |
|
| | def _check_language(self, language: str) -> str: |
| | if language not in self.CODE_TYPE_MAPPING: |
| | raise ValueError(f"Unsupported language: {language}") |
| | return self.CODE_TYPE_MAPPING[language] |
| |
|
| |
|
| | class DockerExecuteTool(Tool): |
| | name: str = "docker_execute" |
| | description: str = "Execute code in a secure Docker container environment" |
| | inputs: Dict[str, Dict[str, str]] = { |
| | "code": { |
| | "type": "string", |
| | "description": "The code to execute" |
| | }, |
| | "language": { |
| | "type": "string", |
| | "description": "The programming language of the code (e.g., python, py, python3)" |
| | } |
| | } |
| | required: Optional[List[str]] = ["code", "language"] |
| | |
| | def __init__(self, docker_interpreter: DockerInterpreter = None): |
| | super().__init__() |
| | self.docker_interpreter = docker_interpreter |
| | |
| | def __call__(self, code: str, language: str) -> str: |
| | """Execute code using the Docker interpreter.""" |
| | if not self.docker_interpreter: |
| | raise RuntimeError("Docker interpreter not initialized") |
| | |
| | try: |
| | return self.docker_interpreter.execute(code, language) |
| | except Exception as e: |
| | return f"Error executing code: {str(e)}" |
| |
|
| |
|
| | class DockerExecuteScriptTool(Tool): |
| | name: str = "docker_execute_script" |
| | description: str = "Execute code from a script file in a secure Docker container environment" |
| | inputs: Dict[str, Dict[str, str]] = { |
| | "file_path": { |
| | "type": "string", |
| | "description": "The path to the script file to execute" |
| | }, |
| | "language": { |
| | "type": "string", |
| | "description": "The programming language of the code. If not provided, will be determined from file extension" |
| | } |
| | } |
| | required: Optional[List[str]] = ["file_path", "language"] |
| | |
| | def __init__(self, docker_interpreter: DockerInterpreter = None): |
| | super().__init__() |
| | self.docker_interpreter = docker_interpreter |
| | |
| | def __call__(self, file_path: str, language: str) -> str: |
| | """Execute script file using the Docker interpreter.""" |
| | if not self.docker_interpreter: |
| | raise RuntimeError("Docker interpreter not initialized") |
| | |
| | try: |
| | return self.docker_interpreter.execute_script(file_path, language) |
| | except Exception as e: |
| | return f"Error executing script: {str(e)}" |
| |
|
| |
|
| | class DockerInterpreterToolkit(Toolkit): |
| | def __init__( |
| | self, |
| | name: str = "DockerInterpreterToolkit", |
| | image_tag: Optional[str] = None, |
| | dockerfile_path: Optional[str] = None, |
| | require_confirm: bool = False, |
| | print_stdout: bool = True, |
| | print_stderr: bool = True, |
| | host_directory: str = "", |
| | container_directory: str = "/home/app/", |
| | container_command: str = "tail -f /dev/null", |
| | tmp_directory: str = "/tmp", |
| | storage_handler: FileStorageHandler = None, |
| | auto_cleanup: bool = True, |
| | auto_destroy: bool = True, |
| | **kwargs |
| | ): |
| | |
| | if storage_handler is None: |
| | from .storage_handler import LocalStorageHandler |
| | storage_handler = LocalStorageHandler(base_path="./workplace/docker") |
| | |
| | |
| | docker_interpreter = DockerInterpreter( |
| | name="DockerInterpreter", |
| | image_tag=image_tag, |
| | dockerfile_path=dockerfile_path, |
| | require_confirm=require_confirm, |
| | print_stdout=print_stdout, |
| | print_stderr=print_stderr, |
| | host_directory=host_directory, |
| | container_directory=container_directory, |
| | container_command=container_command, |
| | tmp_directory=tmp_directory, |
| | storage_handler=storage_handler, |
| | auto_cleanup=auto_cleanup, |
| | auto_destroy=auto_destroy, |
| | **kwargs |
| | ) |
| | |
| | |
| | tools = [ |
| | DockerExecuteTool(docker_interpreter=docker_interpreter), |
| | DockerExecuteScriptTool(docker_interpreter=docker_interpreter) |
| | ] |
| | |
| | |
| | super().__init__(name=name, tools=tools) |
| | |
| | |
| | self.docker_interpreter = docker_interpreter |
| | self.storage_handler = storage_handler |
| | self.auto_cleanup = auto_cleanup |
| | self.auto_destroy = auto_destroy |
| | |
| | def cleanup(self): |
| | """Clean up the Docker interpreter and storage handler.""" |
| | try: |
| | if hasattr(self, 'auto_cleanup') and self.auto_cleanup: |
| | if hasattr(self, 'docker_interpreter') and self.docker_interpreter: |
| | self.docker_interpreter.cleanup() |
| | if hasattr(self, 'storage_handler') and self.storage_handler: |
| | try: |
| | self.storage_handler.cleanup() |
| | except Exception: |
| | pass |
| | except Exception: |
| | pass |
| |
|
| | def __del__(self): |
| | """Cleanup when toolkit is destroyed.""" |
| | try: |
| | if hasattr(self, 'auto_destroy') and self.auto_destroy: |
| | self.cleanup() |
| | except Exception: |
| | pass |
| | |
| |
|