| """ |
| FastAPI REST API for Computer-Using Agent |
| Provides HTTP endpoints for agent control and interaction |
| """ |
|
|
| from fastapi import FastAPI, HTTPException, WebSocket |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel |
| from typing import Optional, Dict, Any |
| import asyncio |
| from loguru import logger |
|
|
| from .cua_agent import ComputerUsingAgent |
|
|
| |
# ASGI application object; all routes below are registered on it.
app = FastAPI(
    version="1.0.0",
    title="Computer-Using Agent API",
    description="REST API for controlling the computer-using agent",
)

# NOTE(review): wildcard origins together with allow_credentials=True is a
# combination the FastAPI CORS documentation says must not be used (origins,
# methods and headers should be listed explicitly when credentials are
# allowed). Left unchanged here because the legitimate front-end origins are
# not known from this file -- confirm and tighten.
_cors_settings: Dict[str, Any] = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)

# Single agent instance shared by every endpoint in this module.
agent = ComputerUsingAgent()
|
|
| |
class TaskRequest(BaseModel):
    """Request body for POST /agent/execute."""

    # Text handed to the agent as the task to carry out.
    task: str
| |
class TaskResponse(BaseModel):
    """Response body for POST /agent/execute."""

    # Outcome flag and accompanying text for the executed task.
    success: bool
    message: str
    # Optional screenshot string; None when the result carries none.
    screenshot: Optional[str] = None
    # The task the result belongs to.
    task: str
|
|
class StatusResponse(BaseModel):
    """Response body for GET /agent/status."""

    status: str
    # Explicit None default, consistent with TaskResponse.screenshot. Without
    # it, whether this key may be omitted from the status mapping depends on
    # the installed Pydantic major version (implicitly optional in v1,
    # required in v2).
    current_task: Optional[str] = None
    display: str
    active_window: Dict[str, Any]
|
|
class ScreenshotResponse(BaseModel):
    """Response body for POST /agent/screenshot."""

    # Base64 screenshot string as produced by the agent.
    screenshot: str
    # ISO-8601 string for the moment the response was built.
    timestamp: str
|
|
| |
|
|
@app.get("/")
async def root():
    """Describe the service and list its main routes"""
    # Route index advertised to clients; insertion order is preserved in the
    # JSON response.
    route_index = {
        "status": "/agent/status",
        "execute": "/agent/execute",
        "screenshot": "/agent/screenshot",
        "stop": "/agent/stop",
        "docs": "/docs",
    }
    service_info = {
        "name": "Computer-Using Agent API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": route_index,
    }
    return service_info
|
|
@app.get("/health")
async def health_check():
    """Liveness probe: always reports a healthy status"""
    # Static payload; the agent itself is not consulted.
    liveness = {"status": "healthy"}
    return liveness
|
|
@app.get("/agent/status", response_model=StatusResponse)
async def get_status():
    """
    Report the agent's current status

    Builds a StatusResponse from the mapping returned by agent.get_status().

    Raises:
        HTTPException: 500, with the error text as detail, if the status
            cannot be read or does not fit the response model
    """
    try:
        snapshot = agent.get_status()
        payload = StatusResponse(**snapshot)
    except Exception as exc:
        logger.error(f"Error getting status: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    return payload
|
|
# Lazily created single-worker pool for agent tasks (see _get_task_executor).
_task_executor = None


def _get_task_executor():
    """Return the shared single-worker thread pool that runs agent tasks."""
    global _task_executor
    if _task_executor is None:
        # Local import, in keeping with the other function-scope imports in
        # this module.
        from concurrent.futures import ThreadPoolExecutor

        # One worker keeps task runs strictly one-at-a-time, which is what
        # callers got implicitly while the call blocked the event loop.
        _task_executor = ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="agent-task"
        )
    return _task_executor


@app.post("/agent/execute", response_model=TaskResponse)
async def execute_task(request: TaskRequest):
    """
    Execute a task using the computer-using agent

    agent.execute_task is a synchronous call, so it is run on a worker
    thread rather than directly in this coroutine. Calling it inline would
    block the event loop for the whole task, leaving /agent/stop,
    /agent/status and the WebSocket stream unserved until it finished.

    Args:
        request: Task request with natural language description

    Returns:
        TaskResponse built from the mapping the agent returns

    Raises:
        HTTPException: 500, with the error text as detail, if the task
            fails or its result does not fit the response model
    """
    try:
        logger.info(f"Received task: {request.task}")
        loop = asyncio.get_running_loop()
        # NOTE(review): assumes agent.execute_task is safe to call from a
        # thread other than the event-loop thread -- confirm against the
        # agent implementation.
        result = await loop.run_in_executor(
            _get_task_executor(), agent.execute_task, request.task
        )
        return TaskResponse(**result)
    except Exception as e:
        logger.error(f"Error executing task: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
@app.post("/agent/screenshot", response_model=ScreenshotResponse)
async def capture_screenshot():
    """
    Capture a screenshot of the desktop

    Returns:
        Screenshot as base64-encoded PNG, plus an ISO-format timestamp

    Raises:
        HTTPException: 500 if the agent raises, or if it returns an empty
            result ("Failed to capture screenshot")
    """
    import datetime

    try:
        screenshot_b64 = agent.get_screenshot_base64()

        if screenshot_b64:
            return ScreenshotResponse(
                screenshot=screenshot_b64,
                timestamp=datetime.datetime.now().isoformat()
            )
    except Exception as e:
        logger.error(f"Error capturing screenshot: {e}")
        raise HTTPException(status_code=500, detail=str(e))

    # Empty result. This is raised outside the try block on purpose: raised
    # inside it, the HTTPException would be caught by the generic handler
    # above and re-wrapped, replacing its detail with the exception's string
    # form.
    logger.error("Failed to capture screenshot")
    raise HTTPException(status_code=500, detail="Failed to capture screenshot")
|
|
@app.post("/agent/stop")
async def stop_agent():
    """
    Ask the agent to stop its current task

    Returns:
        Confirmation payload with a message and the "stopped" status

    Raises:
        HTTPException: 500, with the error text as detail, if agent.stop()
            fails
    """
    try:
        agent.stop()
    except Exception as exc:
        logger.error(f"Error stopping agent: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))

    confirmation = {"message": "Agent stopped", "status": "stopped"}
    return confirmation
|
|
@app.websocket("/ws/agent")
async def websocket_endpoint(websocket: WebSocket):
    """
    WebSocket endpoint for real-time agent updates

    Accepts the connection, then sends the agent status as JSON every two
    seconds until the client disconnects or an error occurs.

    Args:
        websocket: The incoming WebSocket connection
    """
    # Local import, in keeping with the other function-scope imports in this
    # module.
    from fastapi import WebSocketDisconnect

    await websocket.accept()
    logger.info("WebSocket client connected")

    try:
        while True:
            status = agent.get_status()
            await websocket.send_json(status)
            await asyncio.sleep(2)
    except WebSocketDisconnect:
        # The client going away is the normal way this loop ends; it is not
        # an error, so it is not logged as one. The disconnect itself is
        # logged in the finally clause.
        pass
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
    finally:
        logger.info("WebSocket client disconnected")
|
|
| |
@app.on_event("startup")
async def startup_event():
    """Prepare the runtime environment when the application starts"""
    import os

    logger.info("Agent API starting up")

    # Make sure the log directory exists; an existing directory is fine.
    log_dir = "/app/logs"
    os.makedirs(log_dir, exist_ok=True)
|
|
@app.on_event("shutdown")
async def shutdown_event():
    """Stop the agent when the application shuts down"""
    logger.info("Agent API shutting down")
    # Same stop call the /agent/stop endpoint makes.
    agent.stop()
|
|
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces,
    # port 8000.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)
|
|