| from __future__ import annotations |
|
|
| |
|
|
| import json |
| import os |
| import sys |
| import traceback |
| from pathlib import Path |
|
|
| from starlette.middleware import Middleware |
| from starlette.middleware.cors import CORSMiddleware |
| from starlette.responses import PlainTextResponse |
|
|
|
|
def _discover_workspace_root() -> Path:
    """Locate the workspace root directory.

    Resolution order:
      1. ``CODE_TOOLS_ROOT`` from the environment, when set and non-empty
         (``~`` is expanded and the path is resolved).
      2. The directory one level above this script's directory, when it
         contains a ``.prefab`` entry.
      3. ``~/source/code_tools`` (resolved) as the last resort.

    Returns:
        An absolute ``Path`` for the workspace root. The path is not checked
        for existence in cases 1 and 3.
    """
    if override := os.environ.get("CODE_TOOLS_ROOT"):
        return Path(override).expanduser().resolve()

    # parents[1] is the parent of the directory holding this file.
    candidate = Path(__file__).resolve().parents[1]
    prefab_marker = candidate / ".prefab"
    if prefab_marker.exists():
        return candidate

    home_default = Path.home() / "source/code_tools"
    return home_default.resolve()
|
|
|
|
# Filesystem layout. Everything below hangs off the discovered workspace root,
# except PREFAB_SRC (overridable via the PREFAB_SRC environment variable) and
# SCRIPTS_DIR (the directory containing this file).
WORKSPACE_ROOT = _discover_workspace_root()
PREFAB_ROOT = WORKSPACE_ROOT / ".prefab"
# expanduser() so that PREFAB_SRC="~/..." (e.g. from a .env file, where the
# shell does not expand "~") works the same way CODE_TOOLS_ROOT does. The path
# is deliberately NOT resolved here: a relative value keeps being interpreted
# against the working directory in effect when it is used.
PREFAB_SRC = Path(
    os.getenv("PREFAB_SRC", str(Path.home() / "source/prefab/src"))
).expanduser()
SCRIPTS_DIR = Path(__file__).resolve().parent
CARDS_DIR = PREFAB_ROOT / "agent-cards"
CONFIG_PATH = PREFAB_ROOT / "fastagent.config.yaml"
RAW_CARD_FILE = CARDS_DIR / "hub_search_raw.md"
# Expanded cards are written next to the source cards.
EXPANDED_CARDS_DIR = CARDS_DIR
# Name of the agent addressed by _run_raw().
RAW_AGENT = "hub_search_raw"
|
|
# HTTP serving parameters, each overridable from the environment.
HOST = os.environ.get("HOST", "0.0.0.0")  # default: listen on all interfaces
PORT = int(os.environ.get("PORT", "9999"))
PATH = os.environ.get("MCP_PATH", "/mcp")  # URL path passed to mcp.run()

# CORS policy applied to the HTTP app: any origin, method and header is
# accepted, and the "mcp-session-id" response header is exposed to browsers.
CORS_MIDDLEWARE = [
    Middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_methods=["*"],
        allow_headers=["*"],
        expose_headers=["mcp-session-id"],
    ),
]
|
|
# Run from the workspace root; this must happen before the project imports
# that follow, and before the PREFAB_SRC existence check just below.
os.chdir(WORKSPACE_ROOT)

# Make the prefab sources importable, but only when the directory exists.
if PREFAB_SRC.exists() and str(PREFAB_SRC) not in sys.path:
    sys.path.insert(0, str(PREFAB_SRC))

# Make sibling modules in this script's directory importable. Inserted last,
# so it ends up ahead of PREFAB_SRC on sys.path.
if str(SCRIPTS_DIR) not in sys.path:
    sys.path.insert(0, str(SCRIPTS_DIR))
|
|
| from fast_agent import FastAgent |
| from fast_agent.mcp.auth.context import request_bearer_token |
| from fast_agent.mcp.auth.middleware import HFAuthHeaderMiddleware |
| from fast_agent.mcp.auth.presence import PresenceTokenVerifier |
| from fastmcp import FastMCP |
| from fastmcp.server.auth.auth import RemoteAuthProvider |
| from fastmcp.server.dependencies import get_access_token |
| from fastmcp.tools import ToolResult |
| from mcp.types import TextContent |
| from pydantic import AnyHttpUrl |
| from card_includes import materialize_expanded_card |
| from prefab_hub_ui import build_runtime_wire, error_wire, parse_runtime_payload |
|
|
|
|
class _RootResourceRemoteAuthProvider(RemoteAuthProvider):
    """Advertise the Space root as the protected resource.

    Overrides ``_get_resource_url`` so that the resource URL is always the
    provider's ``base_url``, whatever path is passed in.
    """

    def _get_resource_url(self, path: str | None = None) -> AnyHttpUrl | None:
        """Return ``self.base_url``; ``path`` is accepted but ignored."""
        _ = path  # kept for signature compatibility with the base class
        return self.base_url
|
|
|
|
|
|
| def _get_oauth_config() -> tuple[str | None, list[str], str]: |
| oauth_provider = os.environ.get("FAST_AGENT_SERVE_OAUTH", "").lower() |
| if oauth_provider in ("hf", "huggingface"): |
| oauth_provider = "huggingface" |
| elif not oauth_provider: |
| oauth_provider = None |
|
|
| oauth_scopes_str = os.environ.get("FAST_AGENT_OAUTH_SCOPES", "") |
| oauth_scopes = [s.strip() for s in oauth_scopes_str.split(",") if s.strip()] or ["access"] |
| resource_url = os.environ.get( |
| "FAST_AGENT_OAUTH_RESOURCE_URL", |
| f"http://localhost:{PORT}", |
| ) |
| return oauth_provider, oauth_scopes, resource_url |
|
|
|
|
# Produce the expanded version of the raw agent card. The helper comes from
# card_includes; presumably it inlines include directives — confirm there.
EXPANDED_RAW_CARD_FILE = materialize_expanded_card(
    RAW_CARD_FILE, workspace_root=WORKSPACE_ROOT, out_dir=EXPANDED_CARDS_DIR
)

# Single FastAgent application shared by every request. CLI parsing is turned
# off because this module is started as a server, not as a fast-agent CLI.
fast = FastAgent("hub-search-prefab", config_path=str(CONFIG_PATH), parse_cli_args=False)
fast.load_agents(EXPANDED_RAW_CARD_FILE)
|
|
# Authentication wiring. Without a recognised provider the server runs with
# no auth provider and only the CORS middleware.
_oauth_provider, _oauth_scopes, _oauth_resource_url = _get_oauth_config()
_auth_provider = None
_middleware = [*CORS_MIDDLEWARE]  # copy, so the shared constant is not mutated

# Only the Hugging Face provider is wired up; any other value leaves auth off.
if _oauth_provider == "huggingface":
    _auth_provider = _RootResourceRemoteAuthProvider(
        token_verifier=PresenceTokenVerifier(provider="huggingface", scopes=_oauth_scopes),
        authorization_servers=[AnyHttpUrl("https://huggingface.co")],
        base_url=AnyHttpUrl(_oauth_resource_url),
        scopes_supported=_oauth_scopes,
        resource_name="gen-ui",
        resource_documentation=AnyHttpUrl("https://huggingface.co/spaces/evalstate/gen-ui"),
    )
    _middleware.append(Middleware(HFAuthHeaderMiddleware))
|
|
# The MCP server instance that the routes and tools below register on.
# ``auth`` is None unless the Hugging Face OAuth provider was configured.
mcp = FastMCP("hub-search-prefab", auth=_auth_provider)
|
|
|
|
@mcp.custom_route("/", methods=["GET"])
async def root_info(request) -> PlainTextResponse:
    """Answer ``GET /`` with a short plain-text pointer to the real endpoints.

    Args:
        request: The incoming request; unused.

    Returns:
        A plain-text response naming the MCP endpoint and the OAuth
        protected-resource discovery path.
    """
    # Interpolate PATH instead of hard-coding "/mcp": the mount path is
    # configurable through MCP_PATH, and the banner must point at the
    # endpoint actually being served. Output is unchanged for the default.
    banner = (
        f"gen-ui MCP server. Use {PATH} for MCP and "
        "/.well-known/oauth-protected-resource for auth discovery."
    )
    return PlainTextResponse(banner)
|
|
|
|
async def _run_raw(query: str) -> str:
    """Send ``query`` to the raw Hub search agent (``RAW_AGENT``) and return its reply."""
    reply = await _run_agent(RAW_AGENT, query)
    return reply
|
|
|
|
|
|
def _get_request_bearer_token() -> str | None:
    """Return the token string of the current access token, if there is one.

    Returns:
        ``access_token.token`` from ``get_access_token()``, or ``None`` when
        that call yields ``None``.
    """
    current = get_access_token()
    return current.token if current is not None else None
|
|
|
|
async def _run_agent(agent_name: str, query: str) -> str:
    """Run one query against a named agent with the caller's bearer token set.

    Args:
        agent_name: Attribute name of the agent on the object yielded by
            ``fast.run()``.
        query: Message sent to the agent.

    Returns:
        Whatever the agent's ``send`` call returns.
    """
    # request_bearer_token is presumably a ContextVar (set/reset with a
    # token) — confirm in fast_agent.mcp.auth.context.
    bearer = _get_request_bearer_token()
    reset_handle = request_bearer_token.set(bearer)
    try:
        async with fast.run() as running_agents:
            target = getattr(running_agents, agent_name)
            return await target.send(query)
    finally:
        # Always restore the previous value, including on failure.
        request_bearer_token.reset(reset_handle)
|
|
|
|
|
|
def _wire_tool_result(wire: dict[str, object]) -> ToolResult:
    """Wrap a Prefab wire dict in a ``ToolResult``.

    Args:
        wire: The wire payload; passed through as ``structured_content``.

    Returns:
        A ``ToolResult`` whose text content is a fixed placeholder and whose
        structured content is ``wire``.
    """
    placeholder = TextContent(type="text", text="[Rendered Prefab UI]")
    return ToolResult(content=[placeholder], structured_content=wire)
|
|
|
|
|
|
def _render_query_wire(query: str, raw_text: str) -> dict[str, object]:
    """Turn the raw agent output for ``query`` into a Prefab wire dict.

    Parses ``raw_text`` with ``parse_runtime_payload`` and passes the result,
    together with ``query``, to ``build_runtime_wire``.
    """
    return build_runtime_wire(query, parse_runtime_payload(raw_text))
|
|
|
|
async def _build_query_wire(query: str) -> dict[str, object]:
    """Run the raw agent for ``query`` and render its output as a wire dict."""
    return _render_query_wire(query, await _run_raw(query))
|
|
|
|
|
|
def _missing_query_json() -> str:
    """Return the JSON error envelope used when no query was supplied.

    Returns:
        A JSON string with ``result`` set to null and a ``meta`` object
        carrying ``ok: false`` and the error message.
    """
    failure_meta = {"ok": False, "error": "Missing required argument: query"}
    envelope = {"result": None, "meta": failure_meta}
    return json.dumps(envelope)
|
|
|
|
@mcp.tool(app=True)
async def hub_search_prefab(query: str) -> ToolResult:
    """Run the Prefab UI service with deterministic rendering over raw Hub output."""
    # Tool boundary: any failure is printed to stderr and reported to the
    # client as an error wire instead of propagating.
    try:
        outcome = await _build_query_wire(query)
    except Exception as failure:
        traceback.print_exc()
        outcome = error_wire(str(failure))
    return _wire_tool_result(outcome)
|
|
|
|
@mcp.tool
async def hub_search_prefab_wire(query: str | None = None) -> str:
    """Return final deterministic Prefab wire JSON for a Hub query."""
    # A missing or empty query yields an error wire rather than an agent run.
    if not query:
        missing = error_wire("Missing required argument: query")
        return json.dumps(missing, ensure_ascii=False)

    # Serialisation stays inside the try so an unserialisable wire is also
    # reported as an error wire.
    try:
        return json.dumps(await _build_query_wire(query), ensure_ascii=False)
    except Exception as failure:
        traceback.print_exc()
        return json.dumps(error_wire(str(failure)), ensure_ascii=False)
|
|
|
|
@mcp.tool
async def hub_search_raw_debug(query: str | None = None) -> str:
    """Return the raw live-service payload from the raw Hub search agent."""
    if query:
        try:
            return await _run_raw(query)
        except Exception as failure:
            # Report the failure in the same envelope shape as the
            # missing-query response.
            traceback.print_exc()
            failure_meta = {"ok": False, "error": str(failure)}
            return json.dumps({"result": None, "meta": failure_meta})
    return _missing_query_json()
|
|
|
|
|
|
def main() -> None:
    """Start the MCP server on the streamable-HTTP transport.

    Binds to ``HOST``:``PORT``, serves MCP at ``PATH``, runs in stateless
    HTTP mode and installs the middleware collected in ``_middleware``.
    """
    serve_options = {
        "host": HOST,
        "port": PORT,
        "path": PATH,
        "stateless_http": True,
        "middleware": _middleware,
    }
    mcp.run("streamable-http", **serve_options)


# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|