| | import platform |
| | import subprocess |
| | from typing import Dict, Any, List, Optional |
| |
|
| | from .tool import Tool, Toolkit |
| | from .storage_handler import FileStorageHandler |
| | from ..core.logging import logger |
| |
|
| |
|
| | class CMDBase: |
| | """ |
| | Base class for command execution with permission checking and cross-platform support. |
| | """ |
| | |
| | def __init__(self, default_shell: str = None, storage_handler: FileStorageHandler = None): |
| | """ |
| | Initialize CMDBase with system detection and shell configuration. |
| | |
| | Args: |
| | default_shell: Override default shell detection |
| | storage_handler: Storage handler for file operations |
| | """ |
| | self.system = platform.system().lower() |
| | self.default_shell = default_shell or self._detect_default_shell() |
| | self.permission_cache = {} |
| | self.storage_handler = storage_handler |
| | |
| | def _detect_default_shell(self) -> str: |
| | """Detect the default shell for the current system.""" |
| | if self.system == "windows": |
| | return "cmd" |
| | elif self.system == "darwin": |
| | return "bash" |
| | else: |
| | return "bash" |
| | |
| | def _is_dangerous_command(self, command: str) -> Dict[str, Any]: |
| | """ |
| | Check if a command is potentially dangerous. |
| | |
| | Args: |
| | command: The command to check |
| | |
| | Returns: |
| | Dictionary with danger assessment |
| | """ |
| | dangerous_patterns = [ |
| | |
| | r"\brm\s+-rf\b", |
| | r"\bdel\s+/[sq]\b", |
| | r"\bformat\b", |
| | r"\bdd\b", |
| | r"\bshutdown\b", |
| | r"\breboot\b", |
| | r"\binit\s+[06]\b", |
| | |
| | |
| | r"\bnetcat\b", |
| | r"\bnc\b", |
| | r"\bssh\b", |
| | r"\bscp\b", |
| | |
| | |
| | r"\bkill\s+-9\b", |
| | r"\btaskkill\s+/f\b", |
| | |
| | |
| | r"\bapt\s+install\b", |
| | r"\byum\s+install\b", |
| | r"\bbrew\s+install\b", |
| | r"\bchoco\s+install\b", |
| | |
| | |
| | r"\buseradd\b", |
| | r"\buserdel\b", |
| | r"\bpasswd\b", |
| | |
| | |
| | r"\bmount\b", |
| | r"\bumount\b", |
| | r"\bchmod\s+777\b", |
| | r"\bchown\s+root\b", |
| | ] |
| | |
| | import re |
| | command_lower = command.lower() |
| | |
| | for pattern in dangerous_patterns: |
| | if re.search(pattern, command_lower): |
| | return { |
| | "is_dangerous": True, |
| | "reason": f"Command matches dangerous pattern: {pattern}", |
| | "risk_level": "high" |
| | } |
| | |
| | |
| | if command_lower.startswith(("sudo ", "runas ")): |
| | return { |
| | "is_dangerous": True, |
| | "reason": "Command requires elevated privileges", |
| | "risk_level": "high" |
| | } |
| | |
| | |
| | system_dirs = ["/etc/", "/usr/", "/var/", "/bin/", "/sbin/", "C:\\Windows\\", "C:\\Program Files\\"] |
| | for sys_dir in system_dirs: |
| | if sys_dir in command: |
| | return { |
| | "is_dangerous": True, |
| | "reason": f"Command operates on system directory: {sys_dir}", |
| | "risk_level": "medium" |
| | } |
| | |
| | return {"is_dangerous": False, "risk_level": "low"} |
| | |
| | def _request_permission(self, command: str, danger_assessment: Dict[str, Any]) -> bool: |
| | """ |
| | Request permission from user to execute command. |
| | |
| | Args: |
| | command: The command to execute |
| | danger_assessment: Assessment of command danger |
| | |
| | Returns: |
| | True if permission granted, False otherwise |
| | """ |
| | print(f"\n{'='*60}") |
| | print("🔒 PERMISSION REQUEST") |
| | print(f"{'='*60}") |
| | print(f"Command: {command}") |
| | print(f"System: {self.system}") |
| | print(f"Shell: {self.default_shell}") |
| | |
| | if danger_assessment["is_dangerous"]: |
| | print(f"⚠️ WARNING: {danger_assessment['reason']}") |
| | print(f"Risk Level: {danger_assessment['risk_level'].upper()}") |
| | else: |
| | print("✅ Command appears safe") |
| | |
| | print("\nDo you want to execute this command?") |
| | print("Options:") |
| | print(" y/Y - Yes, execute the command") |
| | print(" n/N - No, do not execute") |
| | print(" [reason] - No, with explanation") |
| | print(" [empty] - No, without explanation") |
| | |
| | try: |
| | response = input("\nYour response: ").strip().lower() |
| | |
| | if response in ['y', 'yes']: |
| | print("✅ Permission granted. Executing command...") |
| | return True |
| | elif response in ['n', 'no', '']: |
| | print("❌ Permission denied.") |
| | return False |
| | else: |
| | print(f"❌ Permission denied. Reason: {response}") |
| | return False |
| | |
| | except KeyboardInterrupt: |
| | print("\n❌ Permission request cancelled by user.") |
| | return False |
| | |
| | def execute_command(self, command: str, timeout: int = 30, cwd: str = None) -> Dict[str, Any]: |
| | """ |
| | Execute a command with permission checking. |
| | |
| | Args: |
| | command: The command to execute |
| | timeout: Command timeout in seconds |
| | cwd: Working directory for command execution |
| | |
| | Returns: |
| | Dictionary with execution results |
| | """ |
| | try: |
| | |
| | danger_assessment = self._is_dangerous_command(command) |
| | |
| | |
| | if not self._request_permission(command, danger_assessment): |
| | return { |
| | "success": False, |
| | "error": "Permission denied by user", |
| | "command": command, |
| | "stdout": "", |
| | "stderr": "", |
| | "return_code": None |
| | } |
| | |
| | |
| | if self.system == "windows": |
| | |
| | if self.default_shell == "cmd": |
| | cmd_args = ["cmd", "/c", command] |
| | else: |
| | cmd_args = ["powershell", "-Command", command] |
| | else: |
| | |
| | cmd_args = [self.default_shell, "-c", command] |
| | |
| | |
| | logger.info(f"Executing command: {command}") |
| | |
| | result = subprocess.run( |
| | cmd_args, |
| | capture_output=True, |
| | text=True, |
| | timeout=timeout, |
| | cwd=cwd, |
| | shell=False |
| | ) |
| | |
| | result_dict = { |
| | "success": result.returncode == 0, |
| | "command": command, |
| | "stdout": result.stdout, |
| | "stderr": result.stderr, |
| | "return_code": result.returncode, |
| | "system": self.system, |
| | "shell": self.default_shell |
| | } |
| | |
| | |
| | if self.storage_handler: |
| | result_dict["storage_handler"] = type(self.storage_handler).__name__ |
| | result_dict["storage_base_path"] = str(self.storage_handler.base_path) |
| | |
| | return result_dict |
| | |
| | except subprocess.TimeoutExpired: |
| | return { |
| | "success": False, |
| | "error": f"Command timed out after {timeout} seconds", |
| | "command": command, |
| | "stdout": "", |
| | "stderr": "", |
| | "return_code": None |
| | } |
| | except Exception as e: |
| | return { |
| | "success": False, |
| | "error": str(e), |
| | "command": command, |
| | "stdout": "", |
| | "stderr": "", |
| | "return_code": None |
| | } |
| |
|
| |
|
| | class ExecuteCommandTool(Tool): |
| | name: str = "execute_command" |
| | description: str = "Execute a command line operation with permission checking and cross-platform support. Can handle all command line operations including directory creation, file listing, system info, and more." |
| | inputs: Dict[str, Dict[str, str]] = { |
| | "command": { |
| | "type": "string", |
| | "description": "The command to execute (e.g., 'ls -la', 'dir', 'mkdir test', 'pwd', 'whoami', 'date', etc.)" |
| | }, |
| | "timeout": { |
| | "type": "integer", |
| | "description": "Command timeout in seconds (default: 30)" |
| | }, |
| | "working_directory": { |
| | "type": "string", |
| | "description": "Working directory for command execution (optional)" |
| | } |
| | } |
| | required: Optional[List[str]] = ["command"] |
| |
|
| | def __init__(self, cmd_base: CMDBase = None): |
| | super().__init__() |
| | self.cmd_base = cmd_base or CMDBase() |
| |
|
| | def __call__(self, command: str, timeout: int = 30, working_directory: str = None) -> Dict[str, Any]: |
| | """ |
| | Execute a command with permission checking. |
| | |
| | Args: |
| | command: The command to execute |
| | timeout: Command timeout in seconds |
| | working_directory: Working directory for command execution |
| | |
| | Returns: |
| | Dictionary containing the command execution result |
| | """ |
| | try: |
| | result = self.cmd_base.execute_command( |
| | command=command, |
| | timeout=timeout, |
| | cwd=working_directory |
| | ) |
| | |
| | if result["success"]: |
| | logger.info(f"Successfully executed command: {command}") |
| | else: |
| | logger.error(f"Failed to execute command {command}: {result.get('error', 'Unknown error')}") |
| | |
| | return result |
| | |
| | except Exception as e: |
| | logger.error(f"Error in execute_command tool: {str(e)}") |
| | return { |
| | "success": False, |
| | "error": str(e), |
| | "command": command |
| | } |
| |
|
| |
|
| | class CMDToolkit(Toolkit): |
| | """ |
| | Command line toolkit that provides safe command execution with permission checking |
| | and cross-platform support. Supports Linux, macOS, and Windows. |
| | """ |
| | |
| | def __init__(self, name: str = "CMDToolkit", default_shell: str = None, storage_handler: FileStorageHandler = None): |
| | """ |
| | Initialize the CMDToolkit with a shared command base instance. |
| | |
| | Args: |
| | name: Name of the toolkit |
| | default_shell: Override default shell detection |
| | storage_handler: Storage handler for file operations |
| | """ |
| | |
| | if storage_handler is None: |
| | from .storage_handler import LocalStorageHandler |
| | storage_handler = LocalStorageHandler(base_path="./workplace/cmd") |
| | |
| | |
| | cmd_base = CMDBase(default_shell=default_shell, storage_handler=storage_handler) |
| | |
| | |
| | tools = [ |
| | ExecuteCommandTool(cmd_base=cmd_base) |
| | ] |
| | |
| | |
| | super().__init__(name=name, tools=tools) |
| | self.cmd_base = cmd_base |
| | self.storage_handler = storage_handler |