Coverage for app \ sandbox \ executor.py: 0%

80 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-30 09:36 +0100

1""" 

2Secure code executor using Docker containers 

3 

4Provides isolated execution environment for untrusted code with: 

5- Resource limits (CPU, memory, time) 

6- Network isolation 

7- Read-only filesystem 

8- No privilege escalation 

9""" 

10import logging 

11import tempfile 

12import subprocess 

13import json 

14from pathlib import Path 

15from typing import Dict, Any, Optional 

16from datetime import datetime 

17 

18from app.config import settings 

19from app.models.schemas import Language 

20 

21logger = logging.getLogger(__name__) 

22 

23 

class CodeExecutor:
    """Execute untrusted code inside throwaway, locked-down Docker containers.

    For every run the source is written to a temporary host file, mounted
    read-only into a fresh container and started with the interpreter that
    matches the requested language.  The container is launched with the
    restrictions assembled in ``_build_docker_command``: no network, memory /
    CPU / PID limits, a read-only root filesystem, all capabilities dropped
    and privilege escalation disabled.
    """

    # Docker images for different languages
    IMAGES = {
        Language.PYTHON: "python:3.11-slim",
        Language.JAVASCRIPT: "node:18-alpine",
        # NOTE(review): TypeScript sources are handed to plain `node`; code
        # using TypeScript-only syntax presumably fails without a compile
        # step - confirm and switch image/command if needed.
        Language.TYPESCRIPT: "node:18-alpine",
        Language.BASH: "bash:5.2-alpine",
    }

    # Interpreter binary invoked inside the container for each language.
    _INTERPRETERS = {
        Language.PYTHON: "python",
        Language.JAVASCRIPT: "node",
        Language.TYPESCRIPT: "node",
        Language.BASH: "bash",
    }

    # Suffix given to the temporary source file for each language.
    _EXTENSIONS = {
        Language.PYTHON: ".py",
        Language.JAVASCRIPT: ".js",
        Language.TYPESCRIPT: ".ts",
        Language.BASH: ".sh",
    }

    # Exit status GNU coreutils `timeout` reports when the limit is hit.
    # NOTE(review): other `timeout` implementations (e.g. in the Alpine based
    # images) may use a different status - confirm.
    _TIMEOUT_EXIT_CODE = 124

    # Per-test-case time limit (seconds) used by test_code().
    _TEST_TIMEOUT_SECONDS = 5

    def __init__(self):
        """Probe for Docker once and remember the outcome."""
        self.docker_available = self._check_docker()

    @staticmethod
    def _failure(error: str, duration_ms: float = 0) -> Dict[str, Any]:
        """Build the result dict returned when nothing could be executed."""
        return {
            "success": False,
            "error": error,
            "stdout": "",
            "stderr": "",
            "exit_code": -1,
            "duration_ms": duration_ms,
        }

    def _check_docker(self) -> bool:
        """Check if Docker is available.

        Runs ``docker --version`` with a 5 second limit.

        Returns:
            True when the command exits with status 0, False when it cannot
            be started, times out or exits with a non-zero status.
        """
        # NOTE(review): this presumably only proves the CLI binary exists, not
        # that the Docker daemon is reachable - confirm.
        try:
            result = subprocess.run(
                ["docker", "--version"],
                capture_output=True,
                text=True,
                timeout=5,
            )
        except (OSError, subprocess.TimeoutExpired):
            # OSError covers a missing binary as well as a non-executable one.
            logger.warning("Docker not available - sandbox execution disabled")
            return False

        if result.returncode != 0:
            logger.warning("Docker not available - sandbox execution disabled")
            return False

        logger.info("Docker available: %s", result.stdout.strip())
        return True

    async def execute(
        self,
        code: str,
        language: Language,
        test_input: Optional[str] = None,
        timeout: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Execute code in sandbox

        Args:
            code: Source code to execute
            language: Programming language
            test_input: Optional stdin input
            timeout: Execution timeout in seconds (default from settings)

        Returns:
            Dict with execution results:
            - success: bool
            - stdout: str
            - stderr: str
            - exit_code: int (-1 when the container was never run to
              completion)
            - duration_ms: float
            - error: Optional[str]

        This method does not raise for execution problems; they are reported
        through ``success`` / ``error`` in the returned dict.
        """
        # Function-scope import so the fix does not depend on the module's
        # import block being changed as well.
        import asyncio

        if not self.docker_available:
            return self._failure("Docker not available")

        if not settings.sandbox_enabled:
            return self._failure("Sandbox execution disabled in settings")

        timeout = timeout or settings.sandbox_timeout
        start_time = datetime.now()

        def elapsed_ms() -> float:
            return (datetime.now() - start_time).total_seconds() * 1000

        # Set as soon as the temp file exists so the `finally` below can
        # remove it even when writing the code fails.
        code_path: Optional[Path] = None

        try:
            # Get Docker image for language
            image = self.IMAGES.get(language)
            if not image:
                return self._failure(
                    f"No Docker image configured for {language}"
                )

            # Create temporary file for code (kept after close so Docker can
            # mount it; removed explicitly in `finally`).
            with tempfile.NamedTemporaryFile(
                mode='w',
                suffix=self._get_extension(language),
                delete=False,
                encoding='utf-8'
            ) as tmp_file:
                code_path = Path(tmp_file.name)
                tmp_file.write(code)

            docker_cmd = self._build_docker_command(
                image=image,
                code_path=code_path,
                language=language,
                timeout=timeout,
                test_input=test_input
            )

            logger.info(
                "Executing code in Docker: %s...", ' '.join(docker_cmd[:5])
            )

            # subprocess.run blocks until the container exits; run it in a
            # worker thread so the event loop stays responsive meanwhile.
            result = await asyncio.to_thread(
                subprocess.run,
                docker_cmd,
                capture_output=True,
                text=True,
                # Untrusted programs may print bytes that are not valid text;
                # replace them instead of failing the whole run.
                encoding="utf-8",
                errors="replace",
                timeout=timeout + 5,  # Add buffer to Docker timeout
                input=test_input
            )

            if result.returncode == 0:
                error = None
            elif result.returncode == self._TIMEOUT_EXIT_CODE:
                # The in-container `timeout` fires before the host-side one,
                # so this is the usual way a time limit shows up.
                error = f"Execution timed out after {timeout}s"
            else:
                error = "Execution failed"

            return {
                "success": result.returncode == 0,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "exit_code": result.returncode,
                "duration_ms": elapsed_ms(),
                "error": error
            }

        except subprocess.TimeoutExpired:
            logger.warning("Code execution timed out after %ss", timeout)
            return self._failure(
                f"Execution timed out after {timeout}s", elapsed_ms()
            )

        except Exception as e:
            # Boundary of the sandbox API: report instead of propagating.
            logger.exception("Sandbox execution failed: %s", e)
            return self._failure(str(e), elapsed_ms())

        finally:
            # Cleanup temporary file
            if code_path is not None:
                code_path.unlink(missing_ok=True)

    def _build_docker_command(
        self,
        image: str,
        code_path: Path,
        language: Language,
        timeout: int,
        test_input: Optional[str]
    ) -> list[str]:
        """Build Docker run command with security restrictions.

        Args:
            image: Docker image to start.
            code_path: Host path of the source file; mounted read-only at
                ``/code/<file name>`` inside the container.
            language: Selects the interpreter used to run the file.
            timeout: Seconds passed to the in-container ``timeout`` command.
            test_input: When not None, stdin is kept open so the caller can
                feed this text to the program.

        Returns:
            The argument list for ``subprocess.run``.

        Raises:
            ValueError: If no interpreter is configured for ``language``.
        """
        interpreter = self._INTERPRETERS.get(language)
        if interpreter is None:
            raise ValueError(f"No interpreter configured for {language}")

        container_path = f"/code/{code_path.name}"

        # Base command with security options
        cmd = [
            "docker", "run",
            "--rm",  # Remove container after execution
            "--network", "none",  # No network access
            "--memory", settings.sandbox_memory_limit,  # Memory limit
            "--cpus", "1.0",  # CPU limit
            "--pids-limit", "50",  # Process limit
            "--read-only",  # Read-only filesystem
            "--tmpfs", "/tmp:rw,noexec,nosuid,size=100m",  # Temp directory
            "--security-opt", "no-new-privileges",  # No privilege escalation
            "--cap-drop", "ALL",  # Drop all capabilities
            "-v", f"{code_path.absolute()}:{container_path}:ro",  # Mount code read-only
        ]

        if test_input is not None:
            # Without this flag `docker run` does not pass stdin through to
            # the container, so the supplied input would never arrive.
            cmd.append("--interactive")

        # Image followed by the command executed inside the container.
        cmd.extend([image, "timeout", str(timeout), interpreter, container_path])
        return cmd

    def _get_extension(self, language: Language) -> str:
        """Get file extension for language (``.txt`` when unknown)."""
        return self._EXTENSIONS.get(language, ".txt")

    async def test_code(
        self,
        code: str,
        language: Language,
        test_cases: list[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Run code against multiple test cases

        Each case is executed separately via ``execute`` with a fixed
        per-case time limit.  A case passes when the run succeeds and its
        stdout equals the expected output after stripping surrounding
        whitespace on both sides.

        Args:
            code: Source code
            language: Programming language
            test_cases: List of test cases with 'input' and 'expected_output'

        Returns:
            Dict with test results:
            - success: bool, True when no case failed
            - passed / failed / total: int counters
            - results: per-case dicts (test_case, input, expected, actual,
              passed, error)
            - error: only present when Docker is not available
        """
        if not self.docker_available:
            return {
                "success": False,
                "error": "Docker not available",
                "passed": 0,
                "failed": 0,
                "total": len(test_cases),
                "results": []
            }

        results = []
        passed = 0

        for case_number, test_case in enumerate(test_cases, start=1):
            test_input = test_case.get("input", "")
            expected_output = test_case.get("expected_output", "")
            # Tolerate None / non-string expectations instead of crashing on
            # `.strip()`.
            expected_text = (
                "" if expected_output is None else str(expected_output)
            )

            result = await self.execute(
                code=code,
                language=language,
                test_input=test_input,
                timeout=self._TEST_TIMEOUT_SECONDS  # Shorter timeout for tests
            )

            test_passed = (
                result["success"] and
                result["stdout"].strip() == expected_text.strip()
            )
            if test_passed:
                passed += 1

            results.append({
                "test_case": case_number,
                "input": test_input,
                "expected": expected_output,
                "actual": result["stdout"],
                "passed": test_passed,
                # Fall back to the executor-level message (timeout, Docker
                # failure, ...) when the program wrote nothing to stderr.
                "error": result.get("stderr") or result.get("error") or ""
            })

        failed = len(results) - passed

        return {
            "success": failed == 0,
            "passed": passed,
            "failed": failed,
            "total": len(test_cases),
            "results": results
        }