| """ |
| RestrictedPythonTools - Self-Healing Python Execution with Shell Backend |
| |
| This toolkit provides Python code execution with built-in directory constraints, |
| path auto-correction, and self-healing capabilities. Uses RestrictedShellTools |
| as the backend execution engine, mirroring Claude Code's architecture. |
| """ |
|
|
| import os |
| import re |
| import ast |
| import sys |
| import json |
| import time |
| import uuid |
| import tempfile |
| from pathlib import Path |
| from typing import Optional, Dict, Any, List |
| from agno.tools import Toolkit |
| from agno.utils.log import logger |
|
|
| from .shell_toolkit import RestrictedShellTools |
|
|
|
|
| class RestrictedPythonTools(Toolkit): |
| """ |
| Self-healing Python execution toolkit with directory constraints. |
| |
| Uses RestrictedShellTools as backend for secure, constrained Python execution. |
| Includes automatic path correction, package installation, and error recovery. |
| """ |
| |
def __init__(self, base_dir: Optional[Path] = None, **kwargs):
    """
    Set up the toolkit rooted at ``base_dir``.

    Args:
        base_dir: Directory all Python operations are confined to. A falsy
            value selects the current working directory. The directory
            (including missing parents) is created if needed.
        **kwargs: Extra keyword arguments forwarded to the parent Toolkit
    """
    if base_dir:
        self.base_dir = Path(base_dir)
    else:
        self.base_dir = Path.cwd()
    self.base_dir.mkdir(parents=True, exist_ok=True)

    # Shell backend that runs every command; it is given the same root.
    self.shell_tools = RestrictedShellTools(base_dir=self.base_dir)

    # Packages already installed through this instance, so that
    # auto-install can skip them on later runs.
    self.installed_packages = set()

    # Only these methods are registered as tools with the parent Toolkit.
    exposed_tools = [
        self.run_python_code,
        self.install_package,
        self.save_python_file,
        self.list_python_files,
        self.validate_python_syntax,
    ]
    super().__init__(
        name="restricted_python_tools",
        tools=exposed_tools,
        **kwargs
    )

    logger.info(f"RestrictedPythonTools initialized with base_dir: {self.base_dir}")
| |
def run_python_code(self, code: str, timeout: int = 120) -> str:
    """
    Execute Python code with self-healing and directory constraints.

    The code is syntax-checked, healed via ``_heal_python_code``, written to
    a uniquely named ``temp_script_*.py`` file in the base directory and run
    with ``python3`` through the shell backend. If the output contains one of
    the recoverable error markers, a single recovery attempt is made. The
    temporary script is removed afterwards.

    Args:
        code (str): Python code to execute
        timeout (int): Maximum execution time in seconds

    Returns:
        str: Output from code execution or error message
    """
    try:
        # Check the caller's code first so that reported line numbers refer
        # to what was submitted, not to the healed script whose preamble
        # shifts every line.
        syntax_result = self.validate_python_syntax(code)
        if "Error" in syntax_result:
            return f"Syntax Error: {syntax_result}"

        healed_code = self._heal_python_code(code)

        # Healing rewrites text with regexes; confirm it did not break the code.
        syntax_result = self.validate_python_syntax(healed_code)
        if "Error" in syntax_result:
            return f"Syntax Error: {syntax_result}"

        self._auto_install_packages(healed_code)

        temp_filename = f"temp_script_{uuid.uuid4().hex[:8]}.py"
        temp_filepath = self.base_dir / temp_filename

        try:
            with open(temp_filepath, 'w', encoding='utf-8') as f:
                f.write(healed_code)

            logger.info(f"Executing Python code via shell backend: {temp_filename}")

            # The file name is generated above (hex digits only), so it is
            # safe to interpolate into the command line.
            execution_command = f"python3 {temp_filename}"
            result = self.shell_tools.run_shell_command(execution_command, timeout=timeout)

            if self._has_execution_errors(result):
                recovery_result = self._attempt_error_recovery(healed_code, result, temp_filename, timeout)
                # None means "no recovery happened". An empty string is a
                # legitimate result of a silent, successful re-run and must
                # replace the original error output.
                if recovery_result is not None:
                    result = recovery_result

            return result

        finally:
            # Best-effort cleanup: a failure to delete the script must not
            # replace the execution result with an error.
            try:
                temp_filepath.unlink()
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary script {temp_filename}: {cleanup_error}")

    except Exception as e:
        error_msg = f"Error executing Python code: {str(e)}"
        logger.error(error_msg)
        return error_msg
| |
def _heal_python_code(self, code: str) -> str:
    """
    Auto-correct common path and directory issues in Python code.

    Two things happen:

    1. Regex rewrites strip ``../`` segments and reduce directory-qualified
       file names in ``open(...)``, ``pd.read_excel(...)`` etc. to the bare
       file name. String literals containing ``://`` (URLs) are left alone.
    2. A preamble is inserted that changes into the base directory before
       the user's code runs. It is placed after any ``from __future__``
       imports, because those must stay at the top of a module.

    Args:
        code (str): Original Python code

    Returns:
        str: Healed Python code with corrected paths
    """
    def keep_urls(replacement):
        """Return a re.sub callback that applies ``replacement`` except to URLs."""
        def substitute(match):
            if "://" in match.group(0):
                return match.group(0)
            return match.expand(replacement)
        return substitute

    # (pattern, replacement) pairs, applied in order over the whole text.
    path_corrections = [
        # Parent-directory hops, POSIX and Windows style.
        (r'\.\./', ''),
        (r'\.\.\\', ''),

        # Absolute paths to common data files -> bare file name.
        (r'["\']\/[^"\']*\/([^"\'\/]+\.(xlsx?|csv|json|txt|py))["\']', r'"\1"'),

        # pandas I/O calls with a directory-qualified file name.
        (r'pd\.to_excel\(["\'][^"\']*\/([^"\'\/]+\.xlsx?)["\']', r'pd.to_excel("\1"'),
        (r'pd\.read_excel\(["\'][^"\']*\/([^"\'\/]+\.xlsx?)["\']', r'pd.read_excel("\1"'),
        (r'pd\.to_csv\(["\'][^"\']*\/([^"\'\/]+\.csv)["\']', r'pd.to_csv("\1"'),

        # open(...) calls; this also covers "with open(...)".
        (r'open\(["\'][^"\']*\/([^"\'\/]+)["\']', r'open("\1"'),
    ]

    healed_code = code
    for pattern, replacement in path_corrections:
        healed_code = re.sub(pattern, keep_urls(replacement), healed_code)

    # repr() yields a correctly escaped literal even when the path contains
    # quotes or backslashes; resolve() makes it absolute so the comparison
    # with os.getcwd() in the generated code is meaningful.
    base_dir_literal = repr(str(self.base_dir.resolve()))
    directory_insurance = (
        "\n"
        "import os\n"
        "import sys\n"
        "\n"
        "# Ensure we're in the correct working directory\n"
        f"base_dir = {base_dir_literal}\n"
        "if os.getcwd() != base_dir:\n"
        "    os.chdir(base_dir)\n"
        '    print(f"Working directory corrected to: {os.getcwd()}")\n'
        "\n"
    )

    # Find the last line occupied by a `from __future__ import ...` statement
    # so the preamble can go after it. Unparsable code is left as-is here.
    future_end = 0
    try:
        tree = ast.parse(healed_code)
    except (SyntaxError, ValueError):
        tree = None
    if tree is not None:
        future_end = max(
            (
                getattr(node, "end_lineno", None) or node.lineno
                for node in tree.body
                if isinstance(node, ast.ImportFrom) and node.module == "__future__"
            ),
            default=0,
        )

    if future_end:
        lines = healed_code.split("\n")
        header = "\n".join(lines[:future_end]) + "\n"
        remainder = "\n".join(lines[future_end:])
        healed_code = header + directory_insurance + remainder
    else:
        healed_code = directory_insurance + healed_code

    logger.debug(f"Code healing applied - original length: {len(code)}, healed length: {len(healed_code)}")
    return healed_code
| |
| def _extract_required_packages(self, code: str) -> List[str]: |
| """ |
| Extract package names from import statements in Python code. |
| |
| Args: |
| code (str): Python code to analyze |
| |
| Returns: |
| List[str]: List of package names that need to be installed |
| """ |
| packages = set() |
| |
| |
| builtin_modules = { |
| 'os', 'sys', 'json', 'time', 'datetime', 'uuid', 'tempfile', |
| 're', 'ast', 'pathlib', 'math', 'random', 'subprocess', |
| 'collections', 'itertools', 'functools', 'logging', 'io', |
| 'csv', 'xml', 'urllib', 'http', 'email', 'sqlite3' |
| } |
| |
| |
| package_mappings = { |
| 'pandas': 'pandas', |
| 'numpy': 'numpy', |
| 'openpyxl': 'openpyxl', |
| 'xlsxwriter': 'xlsxwriter', |
| 'matplotlib': 'matplotlib', |
| 'seaborn': 'seaborn', |
| 'plotly': 'plotly', |
| 'requests': 'requests', |
| 'beautifulsoup4': 'beautifulsoup4', |
| 'bs4': 'beautifulsoup4', |
| 'sklearn': 'scikit-learn', |
| 'cv2': 'opencv-python', |
| 'PIL': 'Pillow', |
| 'yaml': 'PyYAML', |
| } |
| |
| |
| import_patterns = [ |
| r'^import\s+([a-zA-Z_][a-zA-Z0-9_]*)', |
| r'^from\s+([a-zA-Z_][a-zA-Z0-9_]*)\s+import', |
| ] |
| |
| for line in code.split('\n'): |
| line = line.strip() |
| for pattern in import_patterns: |
| match = re.match(pattern, line) |
| if match: |
| package_name = match.group(1) |
| |
| |
| if package_name in builtin_modules: |
| continue |
| |
| |
| pip_package = package_mappings.get(package_name, package_name) |
| packages.add(pip_package) |
| |
| return list(packages) |
| |
| def _auto_install_packages(self, code: str) -> None: |
| """ |
| Automatically install required packages for the Python code. |
| |
| Args: |
| code (str): Python code to analyze for package requirements |
| """ |
| required_packages = self._extract_required_packages(code) |
| |
| for package in required_packages: |
| if package not in self.installed_packages: |
| logger.info(f"Auto-installing package: {package}") |
| install_result = self.install_package(package) |
| if "successfully" in install_result.lower(): |
| self.installed_packages.add(package) |
| else: |
| logger.warning(f"Failed to install package {package}: {install_result}") |
| |
| def _has_execution_errors(self, result: str) -> bool: |
| """ |
| Check if execution result contains errors that might be recoverable. |
| |
| Args: |
| result (str): Execution result to check |
| |
| Returns: |
| bool: True if recoverable errors are detected |
| """ |
| error_indicators = [ |
| "ModuleNotFoundError", |
| "ImportError", |
| "FileNotFoundError", |
| "PermissionError", |
| "No such file or directory", |
| ] |
| |
| return any(error in result for error in error_indicators) |
| |
| def _attempt_error_recovery(self, code: str, error_result: str, temp_filename: str, timeout: int) -> Optional[str]: |
| """ |
| Attempt to recover from execution errors. |
| |
| Args: |
| code (str): Original code that failed |
| error_result (str): Error message from failed execution |
| temp_filename (str): Temporary file name used |
| timeout (int): Execution timeout |
| |
| Returns: |
| Optional[str]: Recovery result if successful, None if recovery failed |
| """ |
| try: |
| |
| if "ModuleNotFoundError" in error_result or "ImportError" in error_result: |
| logger.info("Attempting recovery: Installing missing packages") |
| |
| |
| missing_package_match = re.search(r"No module named '([^']+)'", error_result) |
| if missing_package_match: |
| missing_package = missing_package_match.group(1) |
| install_result = self.install_package(missing_package) |
| |
| if "successfully" in install_result.lower(): |
| logger.info(f"Recovery successful: Installed {missing_package}") |
| |
| retry_result = self.shell_tools.run_shell_command(f"python3 {temp_filename}", timeout=timeout) |
| return retry_result |
| |
| |
| if "FileNotFoundError" in error_result or "No such file or directory" in error_result: |
| logger.info("Attempting recovery: Fixing file path issues") |
| |
| |
| self.shell_tools.run_shell_command("mkdir -p data reports output") |
| |
| |
| retry_result = self.shell_tools.run_shell_command(f"python3 {temp_filename}", timeout=timeout) |
| return retry_result |
| |
| except Exception as e: |
| logger.error(f"Error recovery failed: {str(e)}") |
| |
| return None |
| |
def install_package(self, package_name: str) -> str:
    """
    Install a Python package using pip via shell backend.

    The requirement string is split into shell words and each word is quoted
    before it is placed in the command line, so version specifiers such as
    ``pandas>=1.0`` are not interpreted as shell redirections and shell
    metacharacters cannot start another command. Three launchers are tried
    in order (``pip3``, ``python3 -m pip``, ``pip``) until one reports success.

    Args:
        package_name (str): Name of the package to install

    Returns:
        str: Installation result message; contains "installed successfully"
        on success and starts with "Package installation failed" otherwise
    """
    import shlex  # local import: shlex is not among the module-level imports

    # Reject unusable input before touching the shell.
    if not isinstance(package_name, str) or not package_name.strip():
        return f"Package installation failed: invalid package name {package_name!r}"
    try:
        words = shlex.split(package_name)
    except ValueError as parse_error:
        return f"Package installation failed: invalid package name {package_name!r} ({parse_error})"
    if not words:
        return f"Package installation failed: invalid package name {package_name!r}"

    try:
        logger.info(f"Installing Python package: {package_name}")

        # Plain names pass through shlex.quote unchanged.
        requirement = " ".join(shlex.quote(word) for word in words)

        install_commands = [
            f"pip3 install {requirement}",
            f"python3 -m pip install {requirement}",
            f"pip install {requirement}",
        ]

        result = ""
        for command in install_commands:
            result = self.shell_tools.run_shell_command(command, timeout=120)

            if "Successfully installed" in result or "already satisfied" in result:
                self.installed_packages.add(package_name)
                return f"Package '{package_name}' installed successfully"

            # Not successful: try the next launcher. This also covers a
            # missing launcher, whose output need not contain "error".

        return f"Package installation failed: {result}"

    except Exception as e:
        error_msg = f"Error installing package '{package_name}': {str(e)}"
        logger.error(error_msg)
        return error_msg
| |
def save_python_file(self, filename: str, code: str) -> str:
    """
    Save Python code to a file in the base directory.

    ``.py`` is appended when missing. The target path is resolved and must
    lie inside the base directory; names that escape it (``../x`` or an
    absolute path elsewhere) are refused. The code is passed through
    ``_heal_python_code`` before it is written.

    Args:
        filename (str): Name of the Python file
        code (str): Python code content

    Returns:
        str: Success/failure message
    """
    try:
        if not filename.endswith('.py'):
            filename += '.py'

        # Resolve both sides so ".." segments and symlinks cannot be used to
        # step outside the base directory.
        base = self.base_dir.resolve()
        filepath = (base / filename).resolve()
        try:
            filepath.relative_to(base)
        except ValueError:
            return f"Error saving Python file '{filename}': path escapes base directory {self.base_dir}"

        healed_code = self._heal_python_code(code)

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(healed_code)

        logger.info(f"Python file saved: {filename}")
        return f"Python file '{filename}' saved successfully to {self.base_dir}"

    except Exception as e:
        error_msg = f"Error saving Python file '{filename}': {str(e)}"
        logger.error(error_msg)
        return error_msg
| |
def list_python_files(self) -> str:
    """
    List all Python files in the base directory.

    Only regular files directly inside the base directory that match
    ``*.py`` are reported, sorted by name so the output is stable.

    Returns:
        str: One line per file with its size and modification time, or a
        message saying that no Python files were found
    """
    try:
        # glob() yields entries in arbitrary order and also matches
        # directories named "*.py"; filter and sort for a stable listing.
        python_files = sorted(
            (entry for entry in self.base_dir.glob("*.py") if entry.is_file()),
            key=lambda entry: entry.name,
        )

        if not python_files:
            return "No Python files found in the base directory"

        file_list = []
        for file_path in python_files:
            file_stat = file_path.stat()
            file_info = f"{file_path.name} ({file_stat.st_size} bytes, modified: {time.ctime(file_stat.st_mtime)})"
            file_list.append(file_info)

        return "Python files in base directory:\n" + "\n".join(file_list)

    except Exception as e:
        error_msg = f"Error listing Python files: {str(e)}"
        logger.error(error_msg)
        return error_msg
| |
def validate_python_syntax(self, code: str) -> str:
    """
    Check that the code parses as Python, without running it.

    Args:
        code (str): Python code to validate

    Returns:
        str: "Python syntax is valid" on success, otherwise a message that
        contains the word "Error" (with the line number for syntax errors)
    """
    try:
        # Parsing alone is enough; nothing is executed.
        ast.parse(code)
    except SyntaxError as syntax_err:
        error_msg = f"Syntax Error at line {syntax_err.lineno}: {syntax_err.msg}"
        logger.warning(f"Python syntax validation failed: {error_msg}")
        return error_msg
    except Exception as exc:
        error_msg = f"Error validating Python syntax: {str(exc)}"
        logger.error(error_msg)
        return error_msg
    else:
        return "Python syntax is valid"
| |
def get_base_directory(self) -> str:
    """
    Report the base directory as an absolute path.

    Returns:
        str: Absolute path of the base directory
    """
    absolute_root = self.base_dir.absolute()
    return str(absolute_root)
| |
def clear_temp_files(self) -> str:
    """
    Delete leftover ``temp_script_*.py`` files from the base directory.

    Returns:
        str: How many files were removed, a note that there was nothing to
        remove, or an error message
    """
    try:
        leftovers = [script for script in self.base_dir.glob("temp_script_*.py")]

        if len(leftovers) == 0:
            return "No temporary files to clean up"

        removed = 0
        for script in leftovers:
            script.unlink()
            removed += 1

        return f"Cleaned up {removed} temporary Python files"

    except Exception as e:
        error_msg = f"Error cleaning up temporary files: {str(e)}"
        logger.error(error_msg)
        return error_msg