| | import os |
| | import re |
| | import subprocess |
| | import sys |
| | from importlib import util |
| | import types |
| | import tempfile |
| |
|
| | from open_webui.apps.webui.models.functions import Functions |
| | from open_webui.apps.webui.models.tools import Tools |
| | from open_webui.config import FUNCTIONS_DIR, TOOLS_DIR |
| |
|
| |
|
def extract_frontmatter(content):
    """
    Parse the leading triple-quoted header of ``content`` into a dictionary.

    The header is recognised only when the very first line, stripped, is
    exactly three double quotes. Every following line of the form
    ``key: value`` (keys made of letters and underscores) is recorded until
    a line containing three double quotes is reached. Lines that do not
    match are skipped.

    Returns the collected ``{key: value}`` mapping, or an empty dict when
    there is no header or when any error occurs while parsing (the error is
    printed, not raised).
    """
    field_re = re.compile(r"^\s*([a-z_]+):\s*(.*)\s*$", re.IGNORECASE)
    fields = {}

    try:
        rows = content.splitlines()

        # No opening delimiter on the first line means there is no header.
        if not rows or rows[0].strip() != '"""':
            return {}

        for row in rows[1:]:
            # Any line carrying the delimiter closes the header.
            if '"""' in row:
                break

            found = field_re.match(row)
            if found is None:
                continue

            name, text = found.groups()
            fields[name.strip()] = text.strip()

    except Exception as e:
        print(f"An error occurred: {e}")
        return {}

    return fields
| |
|
| |
|
# Matches "from <pkg>" only when <pkg> is exactly one of the legacy top-level
# package names. The trailing word boundary keeps modules that merely share a
# prefix (e.g. ``configparser``, ``utils_extra``) from being rewritten.
_LEGACY_IMPORT_RE = re.compile(r"\bfrom (utils|apps|main|config)\b")


def replace_imports(content):
    """
    Rewrite legacy top-level import paths to their ``open_webui`` equivalents.

    ``from utils``, ``from apps``, ``from main`` and ``from config`` become
    ``from open_webui.utils``, ``from open_webui.apps``, ``from open_webui.main``
    and ``from open_webui.config``. Only whole package names are matched, so
    an import such as ``from configparser import ConfigParser`` is left
    untouched. Already-rewritten content is returned unchanged, which makes
    the function safe to apply repeatedly.

    Returns the rewritten content string.
    """
    return _LEGACY_IMPORT_RE.sub(r"from open_webui.\1", content)
| |
|
| |
|
def load_toolkit_module_by_id(toolkit_id, content=None):
    """
    Execute a toolkit's source as a module and instantiate its ``Tools`` class.

    When ``content`` is None the source is fetched with
    ``Tools.get_tool_by_id``, passed through ``replace_imports`` and written
    back with ``Tools.update_tool_by_id``. When ``content`` is supplied it is
    used as given, and the packages named in its frontmatter ``requirements``
    entry are installed before the source is executed.

    The module is registered in ``sys.modules`` as ``tool_<toolkit_id>``.
    Its ``__file__`` points at a temporary file that holds the source while
    it executes; that file is removed before this function returns.

    Returns a ``(tools_instance, frontmatter)`` tuple.

    Raises ``Exception`` when the toolkit id is unknown or the executed
    source defines no ``Tools`` attribute. Any exception raised while
    executing the source is re-raised after the ``sys.modules`` entry has
    been removed.
    """
    if content is None:
        tool = Tools.get_tool_by_id(toolkit_id)
        if not tool:
            raise Exception(f"Toolkit not found: {toolkit_id}")

        content = tool.content

        content = replace_imports(content)
        Tools.update_tool_by_id(toolkit_id, {"content": content})
    else:
        frontmatter = extract_frontmatter(content)
        # Install the packages listed in the frontmatter before executing.
        install_frontmatter_requirements(frontmatter.get("requirements", ""))

    module_name = f"tool_{toolkit_id}"
    module = types.ModuleType(module_name)
    sys.modules[module_name] = module

    # Write the source to a real file so the module has a usable __file__
    # while it executes. delete=False plus the explicit close lets the path
    # be reopened by name below.
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file.close()
    try:
        with open(temp_file.name, "w", encoding="utf-8") as f:
            f.write(content)
        module.__dict__["__file__"] = temp_file.name

        # NOTE(security): exec runs the source in-process with no sandboxing;
        # the content must come from a trusted author.
        exec(content, module.__dict__)
        frontmatter = extract_frontmatter(content)
        print(f"Loaded module: {module.__name__}")

        if hasattr(module, "Tools"):
            return module.Tools(), frontmatter
        else:
            raise Exception("No Tools class found in the module")
    except Exception as e:
        print(f"Error loading module: {toolkit_id}: {e}")
        # pop() rather than del: the executed source may already have removed
        # or replaced this entry, and a KeyError here would hide the real error.
        sys.modules.pop(module_name, None)
        raise
    finally:
        os.unlink(temp_file.name)
| |
|
| |
|
def load_function_module_by_id(function_id, content=None):
    """
    Execute a function's source as a module and instantiate its entry class.

    When ``content`` is None the source is fetched with
    ``Functions.get_function_by_id``, passed through ``replace_imports`` and
    written back with ``Functions.update_function_by_id``. When ``content``
    is supplied it is used as given, and the packages named in its
    frontmatter ``requirements`` entry are installed before execution.

    The module is registered in ``sys.modules`` as ``function_<function_id>``.
    Its ``__file__`` points at a temporary file that holds the source while
    it executes; that file is removed before this function returns.

    Returns ``(instance, kind, frontmatter)`` where ``kind`` is ``"pipe"``,
    ``"filter"`` or ``"action"``. The classes are probed in that order, so a
    module defining several of them is loaded as the first one found.

    Raises ``Exception`` when the function id is unknown or the source
    defines none of ``Pipe``, ``Filter`` or ``Action``. On any failure while
    executing, the ``sys.modules`` entry is removed, the function is marked
    inactive via ``Functions.update_function_by_id`` and the error is
    re-raised.
    """
    if content is None:
        function = Functions.get_function_by_id(function_id)
        if not function:
            raise Exception(f"Function not found: {function_id}")
        content = function.content

        content = replace_imports(content)
        Functions.update_function_by_id(function_id, {"content": content})
    else:
        frontmatter = extract_frontmatter(content)
        # Install the packages listed in the frontmatter before executing.
        install_frontmatter_requirements(frontmatter.get("requirements", ""))

    module_name = f"function_{function_id}"
    module = types.ModuleType(module_name)
    sys.modules[module_name] = module

    # Write the source to a real file so the module has a usable __file__
    # while it executes. delete=False plus the explicit close lets the path
    # be reopened by name below.
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file.close()
    try:
        with open(temp_file.name, "w", encoding="utf-8") as f:
            f.write(content)
        module.__dict__["__file__"] = temp_file.name

        # NOTE(security): exec runs the source in-process with no sandboxing;
        # the content must come from a trusted author.
        exec(content, module.__dict__)
        frontmatter = extract_frontmatter(content)
        print(f"Loaded module: {module.__name__}")

        # Probe order determines the reported kind when several classes exist.
        if hasattr(module, "Pipe"):
            return module.Pipe(), "pipe", frontmatter
        elif hasattr(module, "Filter"):
            return module.Filter(), "filter", frontmatter
        elif hasattr(module, "Action"):
            return module.Action(), "action", frontmatter
        else:
            raise Exception("No Function class found in the module")
    except Exception as e:
        print(f"Error loading module: {function_id}: {e}")
        # pop() rather than del: the executed source may already have removed
        # or replaced this entry, and a KeyError here would hide the real error.
        sys.modules.pop(module_name, None)

        # Deactivate the function so a broken module is not used again.
        Functions.update_function_by_id(function_id, {"is_active": False})
        raise
    finally:
        os.unlink(temp_file.name)
| |
|
| |
|
def install_frontmatter_requirements(requirements):
    """
    Install the packages named in a frontmatter ``requirements`` value.

    ``requirements`` is a comma-separated string of pip requirement
    specifiers (``None`` or an empty string means "nothing to install").
    Blank entries, such as those produced by a trailing or doubled comma,
    are ignored rather than handed to pip as an empty requirement.

    Each requirement is installed with its own
    ``<python> -m pip install <req>`` call using the running interpreter.

    Returns None.

    Raises ``subprocess.CalledProcessError`` if a pip invocation exits with
    a non-zero status; requirements after the failing one are not attempted.
    """
    # Strip each entry and drop the empty ones so that input such as
    # "requests, " or " , " never results in `pip install ""`.
    stripped = (req.strip() for req in (requirements or "").split(","))
    req_list = [req for req in stripped if req]

    if not req_list:
        print("No requirements found in frontmatter.")
        return

    for req in req_list:
        print(f"Installing requirement: {req}")
        # Argument list (no shell) so the specifier is passed to pip verbatim.
        subprocess.check_call([sys.executable, "-m", "pip", "install", req])
| |
|