| """ | |
| Core shared utilities for the Nymbo-Tools MCP server. | |
| Consolidates three key areas: | |
| 1. Sandboxed filesystem operations (path resolution, reading, writing, safe_open) | |
| 2. Sandboxed Python execution (code interpreter, agent terminal) | |
| 3. Hugging Face inference utilities (token, providers, error handling) | |
| """ | |
| from __future__ import annotations | |
| import ast | |
| import json | |
| import os | |
| import re | |
| import stat | |
| import sys | |
| from datetime import datetime | |
| from io import StringIO | |
| from typing import Any, Callable, Optional, TypeVar | |
| import gradio as gr | |

# ===========================================================================
# Part 0: Tree Rendering Utilities
# ===========================================================================

def _fmt_size(num_bytes: int) -> str:
    """Format byte size as human-readable string."""
    units = ["B", "KB", "MB", "GB"]
    size = float(num_bytes)
    for unit in units:
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} TB"

def build_tree(entries: list[tuple[str, dict]]) -> dict:
    """
    Build a nested tree structure from flat path entries.

    Args:
        entries: List of (path, metadata) tuples where path uses forward slashes.
            Paths ending with '/' are treated as directories.

    Returns:
        Nested dict with "__files__" key for files at each level.
    """
    root: dict = {"__files__": []}
    for path, metadata in entries:
        parts = path.rstrip("/").split("/")
        is_dir = path.endswith("/")
        node = root
        for part in parts[:-1]:
            if part not in node:
                node[part] = {"__files__": []}
            node = node[part]
        final = parts[-1]
        if is_dir:
            if final not in node:
                node[final] = {"__files__": []}
            if metadata:
                node[final]["__meta__"] = metadata
        else:
            node["__files__"].append((final, metadata))
    return root

def render_tree(
    node: dict,
    prefix: str = "",
    format_entry: Optional[Callable[[str, dict, bool], str]] = None,
) -> list[str]:
    """
    Render a tree with line connectors.

    Args:
        node: Nested dict from build_tree()
        prefix: Current line prefix for indentation
        format_entry: Optional callback to format each entry.

    Returns:
        List of formatted lines.
    """
    result = []

    def default_format(name: str, meta: dict, is_dir: bool) -> str:
        if is_dir:
            return f"{name}/"
        size = meta.get("size")
        if size is not None:
            return f"{name} ({_fmt_size(size)})"
        return name

    fmt = format_entry or default_format

    entries = []
    subdirs = sorted(k for k in node.keys() if k not in ("__files__", "__meta__"))
    files_here = sorted(node.get("__files__", []), key=lambda x: x[0])
    for dirname in subdirs:
        dir_meta = node[dirname].get("__meta__", {})
        entries.append(("dir", dirname, node[dirname], dir_meta))
    for fname, fmeta in files_here:
        entries.append(("file", fname, None, fmeta))

    for i, entry in enumerate(entries):
        is_last = (i == len(entries) - 1)
        connector = "└── " if is_last else "├── "
        child_prefix = prefix + ("    " if is_last else "│   ")
        etype, name, subtree, meta = entry
        if etype == "dir":
            result.append(f"{prefix}{connector}{fmt(name, meta, True)}")
            result.extend(render_tree(subtree, child_prefix, format_entry))
        else:
            result.append(f"{prefix}{connector}{fmt(name, meta, False)}")
    return result
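
# Illustrative sketch (not executed at import time): building and rendering a
# small tree. The paths and sizes below are made up for demonstration.
#
#     entries = [
#         ("docs/", {"mtime": "2024-01-01 12:00"}),
#         ("docs/readme.md", {"size": 2048}),
#         ("main.py", {"size": 512}),
#     ]
#     tree = build_tree(entries)
#     print("\n".join(render_tree(tree)))
#
# With the default formatter this renders as:
#     ├── docs/
#     │   └── readme.md (2.0 KB)
#     └── main.py (512.0 B)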

def walk_and_build_tree(
    abs_path: str,
    *,
    show_hidden: bool = False,
    recursive: bool = False,
    max_entries: int = 100,
) -> tuple[dict, int, bool]:
    """
    Walk a directory and build a tree structure.

    Returns:
        (tree, total_entries, truncated)
    """
    entries: list[tuple[str, dict]] = []
    total = 0
    truncated = False
    for root, dirs, files in os.walk(abs_path):
        if not show_hidden:
            dirs[:] = [d for d in dirs if not d.startswith('.')]
            files = [f for f in files if not f.startswith('.')]
        dirs.sort()
        files.sort()
        try:
            rel_root = os.path.relpath(root, abs_path)
        except Exception:
            rel_root = ""
        prefix = "" if rel_root == "." else rel_root.replace("\\", "/") + "/"
        for d in dirs:
            p = os.path.join(root, d)
            try:
                mtime = datetime.fromtimestamp(os.path.getmtime(p)).strftime("%Y-%m-%d %H:%M")
            except Exception:
                mtime = "?"
            entries.append((f"{prefix}{d}/", {"mtime": mtime}))
            total += 1
            if total >= max_entries:
                truncated = True
                break
        if truncated:
            break
        for f in files:
            p = os.path.join(root, f)
            try:
                size = os.path.getsize(p)
                mtime = datetime.fromtimestamp(os.path.getmtime(p)).strftime("%Y-%m-%d %H:%M")
            except Exception:
                size, mtime = 0, "?"
            entries.append((f"{prefix}{f}", {"size": size, "mtime": mtime}))
            total += 1
            if total >= max_entries:
                truncated = True
                break
        if truncated:
            break
        if not recursive:
            break
    return build_tree(entries), total, truncated

def format_dir_listing(
    abs_path: str,
    display_path: str,
    *,
    show_hidden: bool = False,
    recursive: bool = False,
    max_entries: int = 100,
    fmt_size_fn: Optional[Callable[[int], str]] = None,
) -> str:
    """Format a directory listing as a visual tree."""
    fmt_size = fmt_size_fn or _fmt_size
    tree, total, truncated = walk_and_build_tree(
        abs_path,
        show_hidden=show_hidden,
        recursive=recursive,
        max_entries=max_entries,
    )

    def format_entry(name: str, meta: dict, is_dir: bool) -> str:
        mtime = meta.get("mtime", "")
        if is_dir:
            return f"{name}/ ({mtime})"
        size = meta.get("size", 0)
        return f"{name} ({fmt_size(size)}, {mtime})"

    tree_lines = render_tree(tree, "    ", format_entry)
    header = f"Listing of {display_path}\nRoot: /\nEntries: {total}"
    if truncated:
        header += f"\n… Truncated at {max_entries} entries."
    lines = [header, "", "└── /"]
    lines.extend(tree_lines)
    return "\n".join(lines).strip()

# ===========================================================================
# Part 1: Sandboxed Filesystem Operations
# ===========================================================================

class SandboxedRoot:
    """
    A configurable sandboxed root directory with path resolution and safety checks.

    Args:
        root_dir: Absolute path to the sandbox root.
        allow_abs: If True, allow absolute paths outside the sandbox.
    """

    def __init__(self, root_dir: str, allow_abs: bool = False):
        self.root_dir = os.path.abspath(root_dir)
        self.allow_abs = allow_abs
        # Ensure root exists
        try:
            os.makedirs(self.root_dir, exist_ok=True)
        except Exception:
            pass

    def safe_err(self, exc: Exception | str) -> str:
        """Return an error string with any absolute root replaced by '/' and slashes normalized."""
        s = str(exc)
        s_norm = s.replace("\\", "/")
        root_fwd = self.root_dir.replace("\\", "/")
        root_variants = {self.root_dir, root_fwd, re.sub(r"/+", "/", root_fwd)}
        for variant in root_variants:
            if variant:
                s_norm = s_norm.replace(variant, "/")
        s_norm = re.sub(r"/+", "/", s_norm)
        return s_norm

    def err(
        self,
        code: str,
        message: str,
        *,
        path: Optional[str] = None,
        hint: Optional[str] = None,
        data: Optional[dict] = None,
    ) -> str:
        """Return a structured error JSON string."""
        payload = {
            "status": "error",
            "code": code,
            "message": message,
            "root": "/",
        }
        if path is not None and path != "":
            payload["path"] = path
        if hint:
            payload["hint"] = hint
        if data:
            payload["data"] = data
        return json.dumps(payload, ensure_ascii=False)

    def display_path(self, abs_path: str) -> str:
        """Return a user-friendly path relative to root using forward slashes."""
        try:
            norm_root = os.path.normpath(self.root_dir)
            norm_abs = os.path.normpath(abs_path)
            common = os.path.commonpath([norm_root, norm_abs])
            if os.path.normcase(common) == os.path.normcase(norm_root):
                rel = os.path.relpath(norm_abs, norm_root)
                if rel == ".":
                    return "/"
                return "/" + rel.replace("\\", "/")
        except Exception:
            pass
        return abs_path.replace("\\", "/")

    def resolve_path(self, path: str) -> tuple[str, str]:
        """
        Resolve a user-provided path to an absolute, normalized path constrained to root.

        Returns (abs_path, error_message). error_message is empty when ok.
        """
        try:
            user_input = (path or "/").strip() or "/"
            if user_input.startswith("/"):
                rel_part = user_input.lstrip("/") or "."
                raw = os.path.expanduser(rel_part)
                treat_as_relative = True
            else:
                raw = os.path.expanduser(user_input)
                treat_as_relative = False
            if not treat_as_relative and os.path.isabs(raw):
                if not self.allow_abs:
                    return "", self.err(
                        "absolute_path_disabled",
                        "Absolute paths are disabled in safe mode.",
                        path=raw.replace("\\", "/"),
                        hint="Use a path relative to / (e.g., /notes/todo.txt).",
                    )
                abs_path = os.path.abspath(raw)
            else:
                abs_path = os.path.abspath(os.path.join(self.root_dir, raw))
            # Constrain to root when not allowing absolute paths
            if not self.allow_abs:
                try:
                    common = os.path.commonpath(
                        [os.path.normpath(self.root_dir), os.path.normpath(abs_path)]
                    )
                    if common != os.path.normpath(self.root_dir):
                        return "", self.err(
                            "path_outside_root",
                            "Path is outside the sandbox root.",
                            path=abs_path,
                        )
                except Exception:
                    return "", self.err(
                        "path_outside_root",
                        "Path is outside the sandbox root.",
                        path=abs_path,
                    )
            return abs_path, ""
        except Exception as exc:
            return "", self.err(
                "resolve_path_failed",
                "Failed to resolve path.",
                path=(path or ""),
                data={"error": self.safe_err(exc)},
            )
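
    # Illustrative behavior sketch for resolve_path() (hypothetical root, not executed):
    # with root_dir="/srv/sandbox" and allow_abs=False,
    #     resolve_path("/notes/todo.txt")  -> ("/srv/sandbox/notes/todo.txt", "")
    #     resolve_path("../etc/passwd")    -> ("", '{"status": "error", "code": "path_outside_root", ...}')
    # A leading "/" is treated as the sandbox root, not the host filesystem root.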

    def safe_open(self, file, *args, **kwargs):
        """A drop-in replacement for open() that enforces sandbox constraints."""
        if isinstance(file, int):
            return open(file, *args, **kwargs)
        path_str = os.fspath(file)
        abs_path, err = self.resolve_path(path_str)
        if err:
            try:
                msg = json.loads(err)["message"]
            except Exception:
                msg = err
            raise PermissionError(f"Sandboxed open() failed: {msg}")
        return open(abs_path, *args, **kwargs)
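
    # Illustrative sketch (hypothetical paths, not executed at import time):
    #
    #     sandbox = SandboxedRoot("/srv/sandbox")
    #     with sandbox.safe_open("/todo.txt", "w", encoding="utf-8") as f:
    #         f.write("hello")               # lands in /srv/sandbox/todo.txt
    #     sandbox.safe_open("../outside.txt")  # raises PermissionError (escapes the root)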

    def list_dir(
        self,
        abs_path: str,
        *,
        show_hidden: bool = False,
        recursive: bool = False,
        max_entries: int = 100,
    ) -> str:
        """List directory contents as a visual tree."""
        return format_dir_listing(
            abs_path,
            self.display_path(abs_path),
            show_hidden=show_hidden,
            recursive=recursive,
            max_entries=max_entries,
            fmt_size_fn=_fmt_size,
        )

    def search_text(
        self,
        abs_path: str,
        query: str,
        *,
        recursive: bool = False,
        show_hidden: bool = False,
        max_results: int = 20,
        case_sensitive: bool = False,
        start_index: int = 0,
    ) -> str:
        """Search for text within files."""
        if not os.path.exists(abs_path):
            return self.err(
                "path_not_found",
                f"Path not found: {self.display_path(abs_path)}",
                path=self.display_path(abs_path),
            )
        query = query or ""
        normalized_query = query if case_sensitive else query.lower()
        if normalized_query == "":
            return self.err(
                "missing_search_query",
                "Search query is required for the search action.",
                hint="Provide text in the Content field to search for.",
            )
        max_results = max(1, int(max_results) if max_results is not None else 20)
        start_index = max(0, int(start_index) if start_index is not None else 0)

        matches: list[tuple[str, int, str]] = []
        errors: list[str] = []
        files_scanned = 0
        truncated = False
        total_matches = 0

        def _should_skip(name: str) -> bool:
            return not show_hidden and name.startswith(".")

        def _handle_match(file_path: str, line_no: int, line_text: str) -> bool:
            nonlocal truncated, total_matches
            total_matches += 1
            if total_matches <= start_index:
                return False
            if len(matches) < max_results:
                snippet = line_text.strip()
                if len(snippet) > 200:
                    snippet = snippet[:197] + "…"
                matches.append((self.display_path(file_path), line_no, snippet))
                return False
            truncated = True
            return True

        def _search_file(file_path: str) -> bool:
            nonlocal files_scanned
            files_scanned += 1
            try:
                with open(file_path, "r", encoding="utf-8", errors="replace") as handle:
                    for line_no, line in enumerate(handle, start=1):
                        haystack = line if case_sensitive else line.lower()
                        if normalized_query in haystack:
                            if _handle_match(file_path, line_no, line):
                                return True
            except Exception as exc:
                errors.append(f"{self.display_path(file_path)} ({self.safe_err(exc)})")
            return truncated

        if os.path.isfile(abs_path):
            _search_file(abs_path)
        else:
            for root, dirs, files in os.walk(abs_path):
                dirs[:] = [d for d in dirs if not _should_skip(d)]
                visible_files = [f for f in files if show_hidden or not f.startswith(".")]
                for name in visible_files:
                    file_path = os.path.join(root, name)
                    if _search_file(file_path):
                        break
                if truncated:
                    break
                if not recursive:
                    break

        header_lines = [
            f"Search results for {query!r}",
            f"Scope: {self.display_path(abs_path)}",
            f"Recursive: {'yes' if recursive else 'no'}, Hidden: {'yes' if show_hidden else 'no'}, Case-sensitive: {'yes' if case_sensitive else 'no'}",
            f"Start offset: {start_index}",
            f"Matches returned: {len(matches)}" + (" (truncated)" if truncated else ""),
            f"Files scanned: {files_scanned}",
        ]
        next_cursor = start_index + len(matches) if truncated else None
        if truncated:
            header_lines.append(f"Matches encountered before truncation: {total_matches}")
            header_lines.append(f"Truncated: yes — re-run with offset={next_cursor} to continue.")
            header_lines.append(f"Next cursor: {next_cursor}")
        else:
            header_lines.append(f"Total matches found: {total_matches}")
            header_lines.append("Truncated: no — end of results.")
            header_lines.append("Next cursor: None")

        if not matches:
            if total_matches > 0 and start_index >= total_matches:
                hint_limit = max(total_matches - 1, 0)
                body_lines = [
                    f"No matches found at or after offset {start_index}. Total matches available: {total_matches}.",
                    (f"Try a smaller offset (≤ {hint_limit})." if hint_limit >= 0 else ""),
                ]
                body_lines = [line for line in body_lines if line]
            else:
                body_lines = [
                    "No matches found.",
                    (f"Total matches encountered: {total_matches}." if total_matches else ""),
                ]
                body_lines = [line for line in body_lines if line]
        else:
            body_lines = [
                f"{idx}. {path}:{line_no}: {text}"
                for idx, (path, line_no, text) in enumerate(matches, start=1)
            ]
        if errors:
            shown = errors[:5]
            body_lines.extend(["", "Warnings:"])
            body_lines.extend(shown)
            if len(errors) > len(shown):
                body_lines.append(f"… {len(errors) - len(shown)} additional files could not be read.")
        return "\n".join(header_lines) + "\n\n" + "\n".join(body_lines)

    def read_file(self, abs_path: str, *, offset: int = 0, max_chars: int = 4000) -> str:
        """Read file contents with optional offset and character limit."""
        if not os.path.exists(abs_path):
            return self.err(
                "file_not_found",
                f"File not found: {self.display_path(abs_path)}",
                path=self.display_path(abs_path),
            )
        if os.path.isdir(abs_path):
            return self.err(
                "is_directory",
                f"Path is a directory, not a file: {self.display_path(abs_path)}",
                path=self.display_path(abs_path),
                hint="Provide a file path.",
            )
        try:
            with open(abs_path, "r", encoding="utf-8", errors="replace") as f:
                data = f.read()
        except Exception as exc:
            return self.err(
                "read_failed",
                "Failed to read file.",
                path=self.display_path(abs_path),
                data={"error": self.safe_err(exc)},
            )
        total = len(data)
        start = max(0, min(offset, total))
        if max_chars > 0:
            end = min(total, start + max_chars)
        else:
            end = total
        chunk = data[start:end]
        next_cursor = end if end < total else None
        header = (
            f"Reading {self.display_path(abs_path)}\n"
            f"Offset {start}, returned {len(chunk)} of {total}."
            + (f"\nNext cursor: {next_cursor}" if next_cursor is not None else "")
        )
        sep = "\n\n---\n\n"
        return header + sep + chunk
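
    # Illustrative paging sketch (hypothetical path, not executed at import time),
    # given a SandboxedRoot instance named `sandbox`:
    #
    #     abs_path, err = sandbox.resolve_path("/notes/long.md")
    #     first = sandbox.read_file(abs_path, offset=0, max_chars=4000)
    #     # If the header reports "Next cursor: 4000", request the next chunk with:
    #     second = sandbox.read_file(abs_path, offset=4000, max_chars=4000)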

    def info(self, abs_path: str) -> str:
        """Get file/directory metadata as JSON."""
        try:
            st = os.stat(abs_path)
        except Exception as exc:
            return self.err(
                "stat_failed",
                "Failed to stat path.",
                path=self.display_path(abs_path),
                data={"error": self.safe_err(exc)},
            )
        info_dict = {
            "path": self.display_path(abs_path),
            "type": "directory" if stat.S_ISDIR(st.st_mode) else "file",
            "size": st.st_size,
            "modified": datetime.fromtimestamp(st.st_mtime).isoformat(sep=" ", timespec="seconds"),
            "created": datetime.fromtimestamp(st.st_ctime).isoformat(sep=" ", timespec="seconds"),
            "mode": oct(st.st_mode),
            "root": "/",
        }
        return json.dumps(info_dict, indent=2)


# ---------------------------------------------------------------------------
# Default roots (can be overridden by environment variables)
# ---------------------------------------------------------------------------

def _get_filesystem_root() -> str:
    """Get the default filesystem root directory."""
    root = os.getenv("NYMBO_TOOLS_ROOT")
    if root and root.strip():
        return os.path.abspath(os.path.expanduser(root.strip()))
    try:
        here = os.path.abspath(__file__)
        tools_dir = os.path.dirname(os.path.dirname(here))
        return os.path.abspath(os.path.join(tools_dir, "Filesystem"))
    except Exception:
        return os.path.abspath(os.getcwd())


def _get_obsidian_root() -> str:
    """Get the default Obsidian vault root directory."""
    env_root = os.getenv("OBSIDIAN_VAULT_ROOT")
    if env_root and env_root.strip():
        return os.path.abspath(os.path.expanduser(env_root.strip()))
    try:
        here = os.path.abspath(__file__)
        tools_dir = os.path.dirname(os.path.dirname(here))
        return os.path.abspath(os.path.join(tools_dir, "Obsidian"))
    except Exception:
        return os.path.abspath(os.getcwd())


# Pre-configured sandbox instances
ALLOW_ABS = bool(int(os.getenv("UNSAFE_ALLOW_ABS_PATHS", "0")))
FILESYSTEM_ROOT = _get_filesystem_root()
OBSIDIAN_ROOT = _get_obsidian_root()

# Default sandbox for /Filesystem (used by most tools)
filesystem_sandbox = SandboxedRoot(FILESYSTEM_ROOT, allow_abs=ALLOW_ABS)

# Sandbox for /Obsidian vault
obsidian_sandbox = SandboxedRoot(OBSIDIAN_ROOT, allow_abs=ALLOW_ABS)

# Convenience exports (for backward compatibility)
ROOT_DIR = FILESYSTEM_ROOT


def _resolve_path(path: str) -> tuple[str, str]:
    """Resolve path using the default filesystem sandbox."""
    return filesystem_sandbox.resolve_path(path)


def _display_path(abs_path: str) -> str:
    """Display path using the default filesystem sandbox."""
    return filesystem_sandbox.display_path(abs_path)


def safe_open(file, *args, **kwargs):
    """Open file using the default filesystem sandbox."""
    return filesystem_sandbox.safe_open(file, *args, **kwargs)
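
# Illustrative sketch of the module-level helpers (hypothetical file, not executed):
#
#     with safe_open("/notes/todo.txt", "r", encoding="utf-8") as f:  # "/" == FILESYSTEM_ROOT
#         print(f.read())
#     abs_path, err = _resolve_path("/notes/todo.txt")
#     if not err:
#         print(_display_path(abs_path))  # -> "/notes/todo.txt"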

# ===========================================================================
# Part 2: Sandboxed Python Execution
# ===========================================================================

def create_safe_builtins() -> dict:
    """Create a builtins dict with sandboxed open()."""
    if isinstance(__builtins__, dict):
        safe_builtins = __builtins__.copy()
    else:
        safe_builtins = vars(__builtins__).copy()
    safe_builtins["open"] = safe_open
    return safe_builtins


def sandboxed_exec(
    code: str,
    *,
    extra_globals: dict[str, Any] | None = None,
    ast_mode: bool = False,
) -> str:
    """
    Execute Python code in a sandboxed environment.

    Args:
        code: Python source code to execute
        extra_globals: Additional globals to inject (e.g., tools)
        ast_mode: If True, parse and print results of all expression statements
            (like Agent_Terminal). If False, simple exec (like Code_Interpreter).

    Returns:
        Captured stdout output, or exception text on error.
    """
    if not code:
        return "No code provided."

    old_stdout = sys.stdout
    old_cwd = os.getcwd()
    redirected_output = sys.stdout = StringIO()

    # Build execution environment
    safe_builtins = create_safe_builtins()
    env: dict[str, Any] = {
        "open": safe_open,
        "__builtins__": safe_builtins,
        "print": print,
    }
    if extra_globals:
        env.update(extra_globals)

    try:
        os.chdir(ROOT_DIR)
        if ast_mode:
            # Parse and evaluate each statement, printing expression results
            tree = ast.parse(code)
            for node in tree.body:
                if isinstance(node, ast.Expr):
                    # Standalone expression - evaluate and print result
                    expr = compile(ast.Expression(node.value), filename="<string>", mode="eval")
                    result_val = eval(expr, env)
                    if result_val is not None:
                        print(result_val)
                else:
                    # Statement - execute it
                    mod = ast.Module(body=[node], type_ignores=[])
                    exec(compile(mod, filename="<string>", mode="exec"), env)
        else:
            # Simple exec mode
            exec(code, env)
        result = redirected_output.getvalue()
    except Exception as exc:
        result = str(exc)
    finally:
        sys.stdout = old_stdout
        try:
            os.chdir(old_cwd)
        except Exception:
            pass
    return result
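
# Illustrative sketch (not executed at import time):
#
#     out = sandboxed_exec("x = 2 + 2\nprint(x)")
#     # out == "4\n" -- the code runs with cwd set to ROOT_DIR and open() sandboxed
#
#     out = sandboxed_exec("1 + 1", ast_mode=True)
#     # ast_mode prints the value of bare expressions, so out == "2\n"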

# ===========================================================================
# Part 3: Hugging Face Inference Utilities
# ===========================================================================

def get_hf_token() -> str | None:
    """Get the HF API token from environment variables.

    Checks HF_READ_TOKEN first, then falls back to HF_TOKEN.
    """
    return os.getenv("HF_READ_TOKEN") or os.getenv("HF_TOKEN")


# Pre-instantiated token for modules that prefer this pattern
HF_TOKEN = get_hf_token()

# Standard provider list for image/video generation
DEFAULT_PROVIDERS = ["auto", "replicate", "fal-ai"]

# Provider list for text generation (Deep Research)
TEXTGEN_PROVIDERS = ["cerebras", "auto"]

T = TypeVar("T")


def handle_hf_error(msg: str, model_id: str, *, context: str = "generation") -> None:
    """
    Raise appropriate gr.Error for common HF API error codes.

    Args:
        msg: Error message string to analyze
        model_id: The model ID being used (for error messages)
        context: Description of operation for error messages

    Raises:
        gr.Error: With user-friendly message based on error type
    """
    lowered = msg.lower()
    if "404" in msg:
        raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and your HF token access.")
    if "503" in msg:
        raise gr.Error("The model is warming up. Please try again shortly.")
    if "401" in msg or "403" in msg:
        raise gr.Error("Please duplicate the space and provide a `HF_READ_TOKEN` to enable Image and Video Generation.")
    if any(pattern in lowered for pattern in ("api_key", "hf auth login", "unauthorized", "forbidden")):
        raise gr.Error("Please duplicate the space and provide a `HF_READ_TOKEN` to enable Image and Video Generation.")
    # If none of the known patterns match, raise generic error
    raise gr.Error(f"{context.capitalize()} failed: {msg}")
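
# Illustrative sketch (hypothetical inference call, not executed at import time):
#
#     try:
#         result = client.text_to_image(prompt, model=model_id)  # hypothetical HF client call
#     except Exception as exc:
#         handle_hf_error(str(exc), model_id, context="image generation")
#
# handle_hf_error() always raises, so the except block never falls through silently.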

def invoke_with_fallback(
    fn: Callable[[str], T],
    providers: list[str] | None = None,
) -> T:
    """
    Try calling fn(provider) for each provider until one succeeds.

    Args:
        fn: Function that takes a provider string and returns a result.
            Should raise an exception on failure.
        providers: List of provider strings to try. Defaults to DEFAULT_PROVIDERS.

    Returns:
        The result from the first successful fn() call.

    Raises:
        The last exception if all providers fail.
    """
    if providers is None:
        providers = DEFAULT_PROVIDERS
    last_error: Exception | None = None
    for provider in providers:
        try:
            return fn(provider)
        except Exception as exc:
            last_error = exc
            continue
    # All providers failed
    if last_error:
        raise last_error
    raise RuntimeError("No providers available")
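
# Illustrative sketch (hypothetical inference call, not executed at import time;
# assumes huggingface_hub's InferenceClient and a model id chosen for demonstration):
#
#     def _run(provider: str):
#         client = InferenceClient(provider=provider, api_key=HF_TOKEN)
#         return client.text_to_image("a watercolor fox", model="black-forest-labs/FLUX.1-dev")
#
#     image = invoke_with_fallback(_run)  # tries DEFAULT_PROVIDERS in order
#     # or: invoke_with_fallback(_run, providers=TEXTGEN_PROVIDERS)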

# ===========================================================================
# Public API
# ===========================================================================

__all__ = [
    # Tree Utils
    "_fmt_size",
    "build_tree",
    "render_tree",
    "walk_and_build_tree",
    "format_dir_listing",
    # Filesystem
    "SandboxedRoot",
    "filesystem_sandbox",
    "obsidian_sandbox",
    "ROOT_DIR",
    "FILESYSTEM_ROOT",
    "OBSIDIAN_ROOT",
    "ALLOW_ABS",
    "_resolve_path",
    "_display_path",
    "safe_open",
    # Execution
    "sandboxed_exec",
    "create_safe_builtins",
    # HF Inference
    "get_hf_token",
    "HF_TOKEN",
    "DEFAULT_PROVIDERS",
    "TEXTGEN_PROVIDERS",
    "handle_hf_error",
    "invoke_with_fallback",
]