| |
| """Codette Tool System — Safe Local Tool Execution |
| |
| Gives Codette the ability to read files, search code, list directories, |
| and run safe Python snippets. Tools are sandboxed and read-only by default. |
| |
| Tool Call Format (in Codette's output): |
| <tool>tool_name(arg1, arg2)</tool> |
| |
| Tool Result (injected back into context): |
| <tool_result>...output...</tool_result> |
| |
| Architecture: |
| 1. Codette generates text that may contain <tool>...</tool> tags |
| 2. Server parses out tool calls |
| 3. Tools execute with safety limits |
| 4. Results are fed back for a second generation pass |
| """ |
|
|
| import os |
| import re |
| import ast |
| import json |
| import subprocess |
| import traceback |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple, Any |
|
|
| |
| |
| |
|
|
| |
# Directories the tools are allowed to touch. _resolve_path() rejects any path
# that does not resolve inside one of these, and joins relative paths onto the
# first entry.
ALLOWED_ROOTS = [
    Path(r"J:\codette-clean"),
    Path(r"C:\Users\Jonathan\Documents"),
]

# File suffixes (compared lower-cased) that the tools will read as text.
READABLE_EXTENSIONS = {
    ".py", ".js", ".ts", ".html", ".css", ".json", ".yaml", ".yml",
    ".md", ".txt", ".csv", ".toml", ".cfg", ".ini", ".sh", ".bat",
    ".bib", ".tex", ".log", ".jsonl",
}

# Largest file size, in bytes (compared against st_size), that
# tool_read_file() and tool_search_code() will open.
MAX_FILE_SIZE = 500_000

# ToolRegistry.execute() truncates tool results longer than this many
# characters.
MAX_OUTPUT_LENGTH = 4000

# Maximum number of lines tool_read_file() returns in a single call.
MAX_LINES = 200

# Timeout, in seconds, for the subprocess started by tool_run_python().
PYTHON_TIMEOUT = 10
|
|
|
|
| |
| |
| |
|
|
class ToolRegistry:
    """Catalogue of callable tools, keyed by tool name.

    Every entry is a spec dict with a ``description`` string, a list of
    ``examples`` and the ``handler`` callable that implements the tool.
    """

    def __init__(self):
        self.tools: Dict[str, dict] = {}
        self._register_defaults()

    def _register_defaults(self):
        """Populate the registry with the built-in tools, in a fixed order."""
        builtin_tools = (
            (
                "read_file",
                "Read a file's contents. Args: path (str), start_line (int, optional), end_line (int, optional)",
                [
                    'read_file("inference/codette_server.py")',
                    'read_file("configs/adapter_registry.yaml", 1, 50)',
                ],
                tool_read_file,
            ),
            (
                "list_files",
                "List files in a directory. Args: path (str), pattern (str, optional)",
                [
                    'list_files("inference/")',
                    'list_files("datasets/", "*.jsonl")',
                ],
                tool_list_files,
            ),
            (
                "search_code",
                "Search for a text pattern across files. Args: pattern (str), path (str, optional), file_ext (str, optional)",
                [
                    'search_code("phase_coherence")',
                    'search_code("def route", "inference/", ".py")',
                ],
                tool_search_code,
            ),
            (
                "file_info",
                "Get file metadata (size, modified time, line count). Args: path (str)",
                [
                    'file_info("paper/codette_paper.pdf")',
                ],
                tool_file_info,
            ),
            (
                "run_python",
                "Execute a short Python snippet and return output. For calculations, data processing, or quick checks. Args: code (str)",
                [
                    'run_python("import math; print(math.pi * 2)")',
                    'run_python("print(sorted([3,1,4,1,5,9]))")',
                ],
                tool_run_python,
            ),
            (
                "project_summary",
                "Get an overview of the Codette project structure. No args.",
                [
                    'project_summary()',
                ],
                tool_project_summary,
            ),
        )
        for tool_name, description, examples, handler in builtin_tools:
            self.register(tool_name, {
                "description": description,
                "examples": examples,
                "handler": handler,
            })

    def register(self, name: str, spec: dict):
        """Add (or replace) the tool stored under ``name``."""
        self.tools[name] = spec

    def get_descriptions(self) -> str:
        """Render every tool's description and examples as prompt text."""
        rendered = ["Available tools (use <tool>name(args)</tool> to call):"]
        for tool_name, spec in self.tools.items():
            rendered.append(f"\n {tool_name}: {spec['description']}")
            rendered.extend(
                f" Example: <tool>{sample}</tool>"
                for sample in spec.get("examples", [])
            )
        return "\n".join(rendered)

    def execute(self, name: str, args: list, kwargs: dict) -> str:
        """Run the named tool and return its (possibly truncated) output.

        Unknown tool names and exceptions raised by the handler are reported
        as an ``Error...`` string rather than raised.
        """
        if name not in self.tools:
            available = ', '.join(self.tools.keys())
            return f"Error: Unknown tool '{name}'. Available: {available}"

        handler = self.tools[name]["handler"]
        try:
            output = handler(*args, **kwargs)
            total_chars = len(output)
            if total_chars > MAX_OUTPUT_LENGTH:
                # Keep the head of the output and say how much was dropped.
                output = output[:MAX_OUTPUT_LENGTH] + f"\n... (truncated, {total_chars} chars total)"
            return output
        except Exception as exc:
            return f"Error executing {name}: {exc}"
|
|
|
|
| |
| |
| |
|
|
def parse_tool_calls(text: str) -> List[Tuple[str, list, dict]]:
    """Extract every ``<tool>name(args)</tool>`` call from generated text.

    Returns one ``(tool_name, positional_args, keyword_args)`` tuple per tag,
    in the order the tags appear. If the argument string cannot be parsed,
    the stripped raw string is passed through as a single positional argument.
    """
    tag_re = re.compile(r'<tool>\s*([\w]+)\s*\((.*?)\)\s*</tool>', re.DOTALL)

    calls = []
    for found in tag_re.finditer(text):
        tool_name = found.group(1)
        raw_args = found.group(2).strip()
        try:
            positional, keyword = _parse_args(raw_args)
        except Exception:
            # Best effort: hand the raw text to the tool unparsed.
            calls.append((tool_name, [raw_args], {}))
        else:
            calls.append((tool_name, positional, keyword))

    return calls
|
|
|
|
| def _parse_args(args_str: str) -> Tuple[list, dict]: |
| """Safely parse function arguments string.""" |
| if not args_str: |
| return [], {} |
|
|
| |
| try: |
| |
| parsed = ast.literal_eval(f"({args_str},)") |
| return list(parsed), {} |
| except (ValueError, SyntaxError): |
| |
| |
| cleaned = args_str.strip().strip('"').strip("'") |
| return [cleaned], {} |
|
|
|
|
def strip_tool_calls(text: str) -> str:
    """Return ``text`` with every ``<tool>...</tool>`` span removed.

    Leading and trailing whitespace of the result is trimmed; text between
    tags is left as-is.
    """
    tag_re = re.compile(r'<tool>.*?</tool>', re.DOTALL)
    without_tags = tag_re.sub('', text)
    return without_tags.strip()
|
|
|
|
def has_tool_calls(text: str) -> bool:
    """Report whether ``text`` contains an opening ``<tool>`` tag.

    Only the opening tag is looked for; a closing tag is not required.
    """
    first_tag = re.search(r'<tool>', text)
    return first_tag is not None
|
|
|
|
| |
| |
| |
|
|
def _resolve_path(path_str: str) -> Optional[Path]:
    """Turn ``path_str`` into a resolved path confined to ALLOWED_ROOTS.

    Relative paths are interpreted relative to the first allowed root.
    Returns the resolved path, or ``None`` when it lies outside every
    allowed root.
    """
    target = Path(path_str)
    if not target.is_absolute():
        target = ALLOWED_ROOTS[0] / target

    # Resolve first so ".." segments cannot step outside a root unnoticed.
    target = target.resolve()

    for base in (root.resolve() for root in ALLOWED_ROOTS):
        try:
            target.relative_to(base)
        except ValueError:
            # Not under this root; try the next one.
            continue
        return target

    return None
|
|
|
|
| |
| |
| |
|
|
def tool_read_file(path: str, start_line: int = 1, end_line: Optional[int] = None) -> str:
    """Read a text file and return its lines, numbered, with a header.

    Args:
        path: File path; relative paths are resolved by ``_resolve_path``.
        start_line: 1-based first line to show. Values below 1 are treated
            as 1.
        end_line: 1-based last line to show (inclusive). ``None``, ``0`` and
            negative values all mean "through the end of the file".

    Returns:
        A header line followed by ``"<lineno> | <text>"`` rows, limited to
        ``MAX_LINES`` rows per call. Every failure (path outside the allowed
        roots, missing file, directory, unsupported extension, oversized
        file, read error, empty range) is reported as a message string
        instead of being raised.
    """
    resolved = _resolve_path(path)
    if resolved is None:
        return f"Error: Path '{path}' is outside allowed directories."

    if not resolved.exists():
        return f"Error: File not found: {path}"

    if not resolved.is_file():
        return f"Error: '{path}' is a directory, not a file. Use list_files() instead."

    if resolved.suffix.lower() not in READABLE_EXTENSIONS:
        return f"Error: Cannot read {resolved.suffix} files. Supported: {', '.join(sorted(READABLE_EXTENSIONS))}"

    size = resolved.stat().st_size
    if size > MAX_FILE_SIZE:
        return f"Error: File too large ({size:,} bytes). Max: {MAX_FILE_SIZE:,} bytes."

    try:
        content = resolved.read_text(encoding='utf-8', errors='replace')
    except Exception as e:
        return f"Error reading file: {e}"

    lines = content.splitlines()
    total = len(lines)

    # Convert the 1-based inclusive range into a 0-based half-open slice.
    start = max(1, start_line) - 1
    # A negative end must never reach the slice below: Python would count it
    # from the end of the list and silently return the wrong lines.
    requested_end = total if end_line is None or end_line <= 0 else end_line
    end = min(requested_end, start + MAX_LINES, total)

    header = f"File: {path} ({total} lines total)"

    if total and end <= start:
        # Start is past EOF or the range is inverted; say so explicitly rather
        # than printing a nonsensical "showing lines 501-200" header.
        return f"{header} [no lines in requested range]"

    selected = lines[start:end]
    numbered = [
        f"{i:4d} | {line}"
        for i, line in enumerate(selected, start=start + 1)
    ]

    if start > 0 or end < total:
        header += f" [showing lines {start+1}-{end}]"

    return header + "\n" + "\n".join(numbered)
|
|
|
|
def tool_list_files(path: str = ".", pattern: str = None) -> str:
    """Produce a listing of a directory, optionally filtered by a glob.

    At most 100 entries (sorted) are shown; directories are tagged ``[DIR]``
    and files ``[FILE]`` with a human-readable size. Problems are reported
    as an ``Error...`` string.
    """
    resolved = _resolve_path(path)
    if resolved is None:
        return f"Error: Path '{path}' is outside allowed directories."
    if not resolved.exists():
        return f"Error: Directory not found: {path}"
    if not resolved.is_dir():
        return f"Error: '{path}' is a file, not a directory. Use read_file() instead."

    def human_size(num_bytes):
        # Pick the largest unit that keeps the number at or above 1.
        if num_bytes >= 1024 * 1024:
            return f"{num_bytes / 1024 / 1024:.1f}MB"
        if num_bytes >= 1024:
            return f"{num_bytes / 1024:.1f}KB"
        return f"{num_bytes}B"

    try:
        found = resolved.glob(pattern) if pattern else resolved.iterdir()
        entries = sorted(found)

        listing = [f"Directory: {path}"]
        for entry in entries[:100]:
            rel = entry.relative_to(resolved)
            if entry.is_dir():
                listing.append(f" [DIR] {rel}/")
                continue
            listing.append(f" [FILE] {rel} ({human_size(entry.stat().st_size)})")

        overflow = len(entries) - 100
        if overflow > 0:
            listing.append(f" ... and {overflow} more")

        return "\n".join(listing)
    except Exception as exc:
        return f"Error listing directory: {exc}"
|
|
|
|
def tool_search_code(pattern: str, path: str = ".", file_ext: str = None) -> str:
    """Case-insensitively search readable text files for a substring.

    Args:
        pattern: Text to look for (plain substring, not a regex). Must be
            non-empty.
        path: Directory to search recursively. When it names a file, that
            file's parent directory is searched.
        file_ext: Optional extension filter, with or without a leading dot.

    Returns:
        A header with match and file counts, followed by up to 50
        ``relative/path:line: text`` rows (text clipped to 120 characters).
        Hidden entries (names starting with ``.``), ``__pycache__`` and
        ``node_modules`` below the search root are skipped, as are files with
        non-readable extensions or larger than ``MAX_FILE_SIZE``. Problems
        are reported as an ``Error...`` string.
    """
    if not pattern:
        # An empty needle is "found" in every line, which is never useful.
        return "Error: Empty search pattern."

    resolved = _resolve_path(path)
    if resolved is None:
        return f"Error: Path '{path}' is outside allowed directories."

    if not resolved.exists():
        return f"Error: Path not found: {path}"

    if file_ext:
        if not file_ext.startswith("."):
            file_ext = "." + file_ext
        glob_pattern = f"**/*{file_ext}"
    else:
        glob_pattern = "**/*"

    needle = pattern.lower()
    skipped_dirs = ("__pycache__", "node_modules")
    max_matches = 50

    results = []
    files_searched = 0
    matches_found = 0

    try:
        search_root = resolved if resolved.is_dir() else resolved.parent

        for filepath in search_root.glob(glob_pattern):
            # Judge only the components *below* the search root: a dot-prefixed
            # directory in the root itself (or one the caller asked for
            # explicitly) must not hide every file.
            rel = filepath.relative_to(search_root)
            if any(part.startswith('.') or part in skipped_dirs
                   for part in rel.parts):
                continue
            if not filepath.is_file():
                continue
            if filepath.suffix.lower() not in READABLE_EXTENSIONS:
                continue
            if filepath.stat().st_size > MAX_FILE_SIZE:
                continue

            files_searched += 1

            try:
                content = filepath.read_text(encoding='utf-8', errors='replace')
            except OSError:
                # Best effort: an unreadable file should not abort the search.
                continue

            for line_num, line in enumerate(content.splitlines(), 1):
                if needle in line.lower():
                    results.append(f" {rel}:{line_num}: {line.strip()[:120]}")
                    matches_found += 1
                    if matches_found >= max_matches:
                        break

            if matches_found >= max_matches:
                break

    except Exception as e:
        return f"Error searching: {e}"

    header = f"Search: '{pattern}' in {path} ({matches_found} matches in {files_searched} files)"
    if not results:
        return header + "\n No matches found."
    return header + "\n" + "\n".join(results)
|
|
|
|
def tool_file_info(path: str) -> str:
    """Describe a file or directory: size, modification time and type.

    For readable text files no larger than ``MAX_FILE_SIZE`` a line count is
    added, computed with ``splitlines()`` so it matches the total reported by
    ``tool_read_file``. Larger files are not read; a note is shown instead.

    Args:
        path: File or directory path; resolved by ``_resolve_path``.

    Returns:
        A multi-line description, or an ``Error...`` string when the path is
        outside the allowed roots or does not exist.
    """
    import time

    resolved = _resolve_path(path)
    if resolved is None:
        return f"Error: Path '{path}' is outside allowed directories."

    if not resolved.exists():
        return f"Error: File not found: {path}"

    stat = resolved.stat()
    mtime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stat.st_mtime))

    info = [
        f"File: {path}",
        f" Size: {stat.st_size:,} bytes ({stat.st_size / 1024:.1f} KB)",
        f" Modified: {mtime}",
        f" Type: {'directory' if resolved.is_dir() else resolved.suffix or 'no extension'}",
    ]

    if resolved.is_file() and resolved.suffix.lower() in READABLE_EXTENSIONS:
        if stat.st_size > MAX_FILE_SIZE:
            # Do not pull an arbitrarily large file into memory just to count.
            info.append(f" Lines: (not counted, file exceeds {MAX_FILE_SIZE:,} bytes)")
        else:
            try:
                text = resolved.read_text(encoding='utf-8', errors='replace')
            except OSError:
                # The line count is optional; report the rest without it.
                pass
            else:
                info.append(f" Lines: {len(text.splitlines()):,}")

    return "\n".join(info)
|
|
|
|
# Top-level modules a snippet may not import, mapped to the label used in the
# error message.
_RUN_PYTHON_BLOCKED_MODULES = {
    "os": "import os",
    "sys": "import sys",
    "subprocess": "subprocess",
    "shutil": "shutil.rmtree",
    "pathlib": "pathlib",
}

# Bare names a snippet may not reference at all (blocks aliasing, e.g. "f = open").
_RUN_PYTHON_BLOCKED_NAMES = {
    "__import__": "__import__",
    "eval": "eval(",
    "exec": "exec(",
    "open": "open(",
}

# Attribute names a snippet may not access (e.g. "handle.write", "io.open").
_RUN_PYTHON_BLOCKED_ATTRS = {
    "open": "open(",
    "write": "write(",
    "rmtree": "shutil.rmtree",
}

# Substring blocklist used only when the snippet cannot be parsed locally.
_RUN_PYTHON_BLOCKED_TOKENS = (
    'import os', 'import sys', 'subprocess', 'shutil.rmtree',
    'os.remove', 'os.unlink', '__import__', 'eval(', 'exec(',
    'open(', 'write(', 'pathlib',
)


def _find_blocked_construct(code: str) -> Optional[str]:
    """Return a label for the first disallowed construct in ``code``, or None.

    The snippet is parsed (never executed) and its syntax tree is inspected,
    so text inside string literals is ignored and ``from os import x`` /
    ``import os.path`` are caught as well as ``import os``. If the snippet
    does not parse in this interpreter, a strict substring scan is used so
    unparsable code cannot slip past the check.
    """
    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError, RecursionError):
        for token in _RUN_PYTHON_BLOCKED_TOKENS:
            if token in code:
                return token
        return None

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                top_level = alias.name.split(".")[0]
                if top_level in _RUN_PYTHON_BLOCKED_MODULES:
                    return _RUN_PYTHON_BLOCKED_MODULES[top_level]
        elif isinstance(node, ast.ImportFrom):
            top_level = (node.module or "").split(".")[0]
            if top_level in _RUN_PYTHON_BLOCKED_MODULES:
                return _RUN_PYTHON_BLOCKED_MODULES[top_level]
        elif isinstance(node, ast.Name):
            if node.id in _RUN_PYTHON_BLOCKED_NAMES:
                return _RUN_PYTHON_BLOCKED_NAMES[node.id]
        elif isinstance(node, ast.Attribute):
            if node.attr in _RUN_PYTHON_BLOCKED_ATTRS:
                return _RUN_PYTHON_BLOCKED_ATTRS[node.attr]
    return None


def tool_run_python(code: str) -> str:
    """Run a short Python snippet in a subprocess and return its output.

    The snippet is first screened for file, process and dynamic-execution
    constructs; a hit returns an error string without running anything.
    Otherwise the code runs with a ``PYTHON_TIMEOUT``-second limit.

    NOTE(review): the screening is a best-effort filter, not a sandbox. A
    blocklist cannot enumerate every way to reach the filesystem (for example
    via ``importlib``), so this must not be treated as a security boundary
    for hostile input.

    Args:
        code: Python source text to execute.

    Returns:
        The snippet's stdout, with stderr appended after a ``STDERR:`` marker
        when present, ``"(no output)"`` if both are empty, or an ``Error...``
        string for a blocked construct, a timeout, or a launch failure.
    """
    blocked = _find_blocked_construct(code)
    if blocked is not None:
        return f"Error: '{blocked}' is not allowed in run_python for safety. Use read_file/search_code for file operations."

    try:
        result = subprocess.run(
            [r"J:\python.exe", "-c", code],
            capture_output=True,
            text=True,
            timeout=PYTHON_TIMEOUT,
            env={**os.environ, "PYTHONPATH": r"J:\Lib\site-packages"},
        )

        output = result.stdout
        if result.stderr:
            output += "\nSTDERR: " + result.stderr

        if not output.strip():
            output = "(no output)"

        return output.strip()

    except subprocess.TimeoutExpired:
        return f"Error: Code execution timed out after {PYTHON_TIMEOUT}s."
    except Exception as e:
        return f"Error running code: {e}"
|
|
|
|
def tool_project_summary() -> str:
    """Build a short text overview of the project under the first allowed root.

    Lists each known directory that exists (with a recursive file count)
    followed by each known key file that exists (with its size in KB).
    """
    root = ALLOWED_ROOTS[0]

    directory_notes = (
        ("configs/", "Configuration files (adapter registry, pipeline config)"),
        ("datasets/", "Training data — perspective-tagged JSONL files"),
        ("dataset_engine/", "Dataset generation pipeline"),
        ("evaluation/", "Evaluation scripts and benchmarks"),
        ("inference/", "Local inference server + web UI"),
        ("paper/", "Academic paper (LaTeX, PDF, BibTeX)"),
        ("reasoning_forge/", "Core RC+xi engine, spiderweb, cocoon sync"),
        ("research/", "Research docs, experiments, DreamReweaver"),
        ("scripts/", "Training and pipeline scripts"),
        ("adapters/", "GGUF LoRA adapter files for llama.cpp"),
    )
    important_files = (
        "HOWTO.md", "configs/adapter_registry.yaml",
        "inference/codette_server.py", "inference/codette_orchestrator.py",
        "reasoning_forge/quantum_spiderweb.py", "reasoning_forge/epistemic_metrics.py",
        "paper/codette_paper.tex",
    )

    report = ["Codette Training Lab — Project Structure\n"]

    for dirname, desc in directory_notes:
        folder = root / dirname
        if not folder.exists():
            continue
        # Count regular files anywhere below this directory.
        count = len([item for item in folder.rglob("*") if item.is_file()])
        report.append(f" [DIR] {dirname:<30s} {desc} ({count} files)")

    report.append("\nKey Files:")
    for rel_name in important_files:
        target = root / rel_name
        if target.exists():
            kilobytes = target.stat().st_size / 1024
            report.append(f" [FILE] {rel_name} ({kilobytes:.1f} KB)")

    return "\n".join(report)
|
|
|
|
| |
| |
| |
|
|
# Template appended to the system prompt by build_tool_system_prompt(). It
# is filled with str.format(), so "{tool_descriptions}" is the only
# placeholder and any other literal braces added here would need doubling.
TOOL_PROMPT_SUFFIX = """

TOOLS: You can read files, search code, and run calculations. When a user asks about code, files, or the project, you MUST use tools to look things up rather than guessing.

Format: <tool>tool_name("arg1", "arg2")</tool>

{tool_descriptions}

RULES:
1. If the user asks about a file, config, or code: ALWAYS call read_file or search_code FIRST
2. If the user asks "show me" or "what is": call the relevant tool FIRST, then explain
3. For general conversation or reasoning: respond normally without tools
4. Start your response with the tool call on the very first line
"""
|
|
|
|
def build_tool_system_prompt(base_prompt: str, registry: ToolRegistry) -> str:
    """Return ``base_prompt`` with the tool-use instructions appended.

    The instructions come from ``TOOL_PROMPT_SUFFIX`` with the registry's
    tool descriptions substituted in.
    """
    descriptions = registry.get_descriptions()
    tool_section = TOOL_PROMPT_SUFFIX.format(tool_descriptions=descriptions)
    return base_prompt + tool_section
|
|
|
|
| |
| |
| |
if __name__ == "__main__":
    # Manual smoke run: exercise each tool once and print what it returns.
    print("Testing Codette Tools...\n")

    registry = ToolRegistry()
    print(registry.get_descriptions())

    # Each check is deferred in a lambda so its banner prints before it runs.
    smoke_checks = [
        ("read_file", lambda: tool_read_file("configs/adapter_registry.yaml", 1, 10)),
        ("list_files", lambda: tool_list_files("inference/")),
        ("search_code", lambda: tool_search_code("phase_coherence", "reasoning_forge/", ".py")),
        ("file_info", lambda: tool_file_info("paper/codette_paper.pdf")),
        ("run_python", lambda: tool_run_python("print(2 ** 10)")),
        ("project_summary", lambda: tool_project_summary()),
    ]
    for label, run_check in smoke_checks:
        print(f"\n--- Test: {label} ---")
        print(run_check())

    print("\n--- Test: parse_tool_calls ---")
    sample = 'Let me check that. <tool>read_file("configs/adapter_registry.yaml", 1, 20)</tool> And also <tool>search_code("AEGIS")</tool>'
    for name, args, kwargs in parse_tool_calls(sample):
        print(f" Call: {name}({args})")

    print("\nDone!")
|
|