Spaces:
Running
Running
| import json | |
| import logging | |
| import re # For better cleanup | |
| from app.core.model_loader import llm_engine | |
# Module-level logger named after this module; the reviewer service below
# reports batch failures and JSON parse problems through it.
logger = logging.getLogger(__name__)
class AIReviewerService:
    """Review source files in batches with an LLM and return per-file findings.

    Files are grouped into batches, minified to save tokens, and sent to the
    engine in a single prompt per batch. The reply is parsed as JSON and
    reconciled against the batch so that exactly one result dict is produced
    for every input file, even when the model omits a file or the request
    fails.
    """

    # Files per LLM request; batching keeps the request count low on the
    # free tier (15 RPM): one request covers BATCH_SIZE files.
    BATCH_SIZE = 5
    # Per-file character cap so a full batch fits safely in the context.
    MAX_CHARS_PER_FILE = 6000
    # Output token budget for one batch response.
    MAX_OUTPUT_TOKENS = 8192

    # Extensions whose comments use the `//` and `/* ... */` syntax.
    _C_STYLE_EXTENSIONS = (
        '.js', '.ts', '.jsx', '.tsx', '.java', '.c', '.cpp', '.cs', '.go',
        '.rs', '.php',
    )
    # Logging/print calls (high token cost, usually low security relevance).
    # `print(` is anchored on a word boundary so identifiers that merely end
    # in "print" (e.g. `Blueprint(`, `fingerprint(`) are not dropped.
    _LOG_CALL_RE = re.compile(
        r"console\.log|\bp?print\(|logger\.|System\.out\.print"
    )
    # Candidate start positions of a JSON array or object.
    _JSON_START_RE = re.compile(r"[\[{]")

    def __init__(self, engine=None):
        """Create the service.

        Args:
            engine: Optional object exposing ``generate(prompt, max_tokens=...)``.
                When omitted, the module-level ``llm_engine`` is used.
        """
        self._engine = engine

    def review_batch_code(self, files: list) -> list:
        """Review ``files`` and return one result dict per file, in order.

        Args:
            files: Objects exposing ``fileName`` and ``content`` attributes.

        Returns:
            A list of dicts with at least the keys ``fileName``,
            ``vulnerabilities`` (list) and ``metrics`` (dict). Files the model
            skipped, and every file of a batch whose request failed, get an
            empty placeholder result.
        """
        results = []
        for start in range(0, len(files), self.BATCH_SIZE):
            batch = files[start:start + self.BATCH_SIZE]
            file_names = [f.fileName for f in batch]
            try:
                prompt = self._build_prompt(batch)
                engine = self._engine if self._engine is not None else llm_engine
                response_text = engine.generate(
                    prompt, max_tokens=self.MAX_OUTPUT_TOKENS
                )
                by_name = self._index_by_file_name(
                    self._parse_json(response_text)
                )
            except Exception as e:
                # Deliberate best-effort boundary: one failing batch must not
                # abort the whole review; its files get placeholder results.
                logger.error("Batch error: %s", e)
                by_name = {}
            for fn in file_names:
                results.append(self._normalize_result(fn, by_name.get(fn)))
        return results

    def _build_prompt(self, batch) -> str:
        """Build the review prompt for one batch of files."""
        sections = []
        for f in batch:
            # Minify first, then cap: the character budget is spent on code
            # rather than on comments that would be discarded anyway.
            minified = self._minify_code(f.content or "", f.fileName or "")
            sections.append(
                f"\n--- FILE: {f.fileName} ---\n"
                f"{minified[:self.MAX_CHARS_PER_FILE]}\n"
            )
        combined_code = "".join(sections)
        return (
            f"\nAnalyze {len(batch)} files:\n"
            f"{combined_code}\n"
            "Task: Detect severe security/logic issues.\n"
            "Output JSON array (1 obj/file):\n"
            '[{"fileName": "path", "vulnerabilities": [{"type": "SQLi", '
            '"line": 10, "description": "text"}], "metrics": '
            '{"complexity": 1-10, "maintainability": 1-10}}]\n'
        )

    @staticmethod
    def _index_by_file_name(items) -> dict:
        """Map ``fileName`` -> result dict, ignoring malformed entries."""
        by_name = {}
        for item in items:
            if not isinstance(item, dict):
                continue
            name = item.get('fileName')
            # Only string names can match an input file; this also avoids a
            # TypeError on unhashable values produced by the model.
            if isinstance(name, str):
                by_name[name] = item
        return by_name

    @staticmethod
    def _normalize_result(file_name, item) -> dict:
        """Return ``item`` with well-typed fields, or a placeholder result."""
        if item is None:
            # Fallback if the model missed this file or the batch failed.
            return {"fileName": file_name, "vulnerabilities": [], "metrics": {}}
        # The model may emit null or a wrong type for these fields.
        if not isinstance(item.get("vulnerabilities"), list):
            item["vulnerabilities"] = []
        if not isinstance(item.get("metrics"), dict):
            item["metrics"] = {}
        return item

    def _minify_code(self, code: str, filename: str) -> str:
        """Aggressively minify ``code`` to reduce token usage.

        Removes empty lines, full-line comments, block comments that start at
        the beginning of a line (C-style files), logging/print statements and
        trailing inline comments. The comment syntax is chosen from the file
        extension (case-insensitive); other file types only lose blank lines
        and logging statements.

        NOTE(review): because lines are removed, any line number the model
        reports refers to the minified text, not to the original file.

        Args:
            code: Source text of the file.
            filename: File name or path used to pick the comment style.

        Returns:
            The minified source, lines joined with ``\\n``.
        """
        name = (filename or "").lower()
        is_python = name.endswith('.py')
        is_c_style = name.endswith(self._C_STYLE_EXTENSIONS)

        kept = []
        in_block_comment = False
        for line in (code or "").split('\n'):
            stripped = line.strip()
            if not stripped:
                continue

            if is_c_style:
                # Track /* ... */ state instead of dropping every line that
                # starts with '*', which would also drop code like `*p = 0;`.
                if in_block_comment:
                    in_block_comment = '*/' not in stripped
                    continue
                if stripped.startswith('/*'):
                    in_block_comment = '*/' not in stripped[2:]
                    continue
                if stripped.startswith('//'):
                    continue
            elif is_python and stripped.startswith('#'):
                continue

            if self._LOG_CALL_RE.search(stripped):
                continue

            # The marker keeps its leading space so URLs (http://) survive.
            if is_python:
                line = self._strip_inline_comment(line, ' #')
            elif is_c_style:
                line = self._strip_inline_comment(line, ' //')

            line = line.rstrip()
            if line:
                kept.append(line)
        return '\n'.join(kept)

    @staticmethod
    def _strip_inline_comment(line: str, marker: str) -> str:
        """Cut ``line`` at the first ``marker`` that is outside a string literal."""
        quote = None
        i = 0
        length = len(line)
        while i < length:
            ch = line[i]
            if quote is not None:
                if ch == '\\':
                    i += 2  # skip the escaped character
                    continue
                if ch == quote:
                    quote = None
            elif ch in '"\'`':
                quote = ch
            elif line.startswith(marker, i):
                return line[:i]
            i += 1
        return line

    def _parse_json(self, text: str) -> list:
        """Extract the JSON payload from an LLM response.

        Markdown code fences are removed, then decoding is attempted from each
        ``[`` or ``{`` in turn, so surrounding prose (even prose containing
        brackets) and trailing junk are tolerated. A single JSON object is
        wrapped in a list; if the array was cut off, the first complete
        object that decodes is returned.

        Args:
            text: Raw model output.

        Returns:
            The decoded list, or ``[]`` when nothing usable is found.
        """
        text = (text or "").strip()
        if not text:
            return []
        text = text.replace("```json", "").replace("```", "").strip()

        decoder = json.JSONDecoder()
        first_error = None
        fallback = None
        for match in self._JSON_START_RE.finditer(text):
            try:
                data, _ = decoder.raw_decode(text, match.start())
            except json.JSONDecodeError as e:
                if first_error is None:
                    first_error = e
                continue
            if isinstance(data, dict):
                return [data]
            # Prefer a list that can hold results; a bare list such as "[1]"
            # from the prose is only used when nothing better exists.
            if not data or any(isinstance(entry, dict) for entry in data):
                return data
            if fallback is None:
                fallback = data
        if fallback is not None:
            return fallback

        logger.warning(
            "JSON error: %s | Raw start: %s...",
            first_error or "no JSON array or object found",
            text[:200],
        )
        return []