| """
|
| Durable Objects integration for OpenManus
|
| Provides interface to Cloudflare Durable Objects operations
|
| """
|
|
|
import asyncio
import inspect
import json
import time
from typing import Any, Dict, List, Optional
from urllib.parse import quote, urlencode

from app.logger import logger

from .client import CloudflareClient, CloudflareError
|
|
|
|
|
class DurableObjects:
    """Client for the routes that front Cloudflare Durable Objects.

    All REST calls are delegated to the injected client with
    ``use_worker=True``. Agent-session routes live under
    ``do/agent/<session_id>/...`` and chat-room routes under
    ``do/chat/<room_id>/...``.

    Caller-supplied identifiers are percent-encoded before being placed in a
    path or query string, so values containing ``/``, ``?``, ``&``, ``#`` or
    spaces cannot change which route or parameters are requested. JSON bodies
    and returned dictionaries always carry the raw, unencoded values.

    Mutating calls return ``{"success": True, <ids>, **response}``; the
    client's response is merged last, so it wins on key collisions.
    Read-only calls return the client's response unchanged.

    Every REST method logs and re-raises ``CloudflareError``.
    """

    def __init__(self, client: "CloudflareClient"):
        """Store the client used for every request.

        Args:
            client: Object exposing awaitable ``get``/``post`` methods and a
                ``worker_url`` attribute.
        """
        self.client = client

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _segment(value: Any) -> str:
        """Percent-encode *value* for use as a single URL path segment."""
        # safe="" so that "/" is encoded too and cannot split the segment.
        return quote(str(value), safe="")

    @staticmethod
    def _query(**params: Any) -> str:
        """Build a percent-encoded query string, keeping keyword order."""
        # quote_via=quote encodes spaces as %20 rather than "+".
        return urlencode(params, quote_via=quote)

    async def _get(self, path: str, action: str) -> Dict[str, Any]:
        """GET *path* with ``use_worker=True``; log and re-raise on failure.

        ``action`` completes the log line ``"Failed to <action>: <error>"``.
        """
        try:
            return await self.client.get(path, use_worker=True)
        except CloudflareError as e:
            logger.error(f"Failed to {action}: {e}")
            raise

    async def _post(
        self, path: str, payload: Dict[str, Any], action: str
    ) -> Dict[str, Any]:
        """POST *payload* to *path* with ``use_worker=True``.

        Logs ``"Failed to <action>: <error>"`` and re-raises on
        ``CloudflareError``.
        """
        try:
            return await self.client.post(path, data=payload, use_worker=True)
        except CloudflareError as e:
            logger.error(f"Failed to {action}: {e}")
            raise

    def _websocket_base_url(self) -> str:
        """Return ``client.worker_url`` with a ``ws``/``wss`` scheme.

        Raises:
            CloudflareError: If ``client.worker_url`` is empty or unset.
        """
        worker_url = self.client.worker_url
        if not worker_url:
            raise CloudflareError("Worker URL not configured")

        # Swap only the leading scheme; an "http://" occurring later in the
        # URL must not be rewritten.
        for http_scheme, ws_scheme in (("https://", "wss://"), ("http://", "ws://")):
            if worker_url.startswith(http_scheme):
                worker_url = ws_scheme + worker_url[len(http_scheme):]
                break

        # Paths are appended with a leading "/", so drop any trailing one to
        # avoid "//do/...".
        return worker_url.rstrip("/")

    # ------------------------------------------------------------------
    # Agent sessions
    # ------------------------------------------------------------------

    async def create_agent_session(
        self, session_id: str, user_id: str, metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Create a new agent session.

        Args:
            session_id: Identifier of the session to start.
            user_id: Identifier of the owning user.
            metadata: Optional extra data; an empty dict is sent when omitted.

        Returns:
            ``success``, ``session_id`` and ``user_id`` merged with the
            client's response.

        Raises:
            CloudflareError: Propagated from the client after being logged.
        """
        session_data = {
            "sessionId": session_id,
            "userId": user_id,
            "metadata": metadata or {},
        }
        response = await self._post(
            f"do/agent/{self._segment(session_id)}/start",
            session_data,
            "create agent session",
        )
        return {
            "success": True,
            "session_id": session_id,
            "user_id": user_id,
            **response,
        }

    async def get_agent_session_status(self, session_id: str) -> Dict[str, Any]:
        """Get agent session status.

        Returns:
            The client's response, unchanged.

        Raises:
            CloudflareError: Propagated from the client after being logged.
        """
        return await self._get(
            f"do/agent/{self._segment(session_id)}/status"
            f"?{self._query(sessionId=session_id)}",
            "get agent session status",
        )

    async def update_agent_session(
        self, session_id: str, updates: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Update agent session.

        Args:
            session_id: Identifier of the session to update.
            updates: Sent verbatim under the ``updates`` key.

        Returns:
            ``success`` and ``session_id`` merged with the client's response.

        Raises:
            CloudflareError: Propagated from the client after being logged.
        """
        response = await self._post(
            f"do/agent/{self._segment(session_id)}/update",
            {"sessionId": session_id, "updates": updates},
            "update agent session",
        )
        return {"success": True, "session_id": session_id, **response}

    async def stop_agent_session(self, session_id: str) -> Dict[str, Any]:
        """Stop agent session.

        Returns:
            ``success`` and ``session_id`` merged with the client's response.

        Raises:
            CloudflareError: Propagated from the client after being logged.
        """
        response = await self._post(
            f"do/agent/{self._segment(session_id)}/stop",
            {"sessionId": session_id},
            "stop agent session",
        )
        return {"success": True, "session_id": session_id, **response}

    async def add_agent_message(
        self, session_id: str, message: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Add a message to agent session.

        A ``timestamp`` (current Unix time in whole seconds) is supplied as a
        default; a ``timestamp`` key already present in *message* overrides
        it because *message* is merged last.

        Returns:
            ``success`` and ``session_id`` merged with the client's response.

        Raises:
            CloudflareError: Propagated from the client after being logged.
        """
        message_data = {
            "sessionId": session_id,
            "message": {"timestamp": int(time.time()), **message},
        }
        response = await self._post(
            f"do/agent/{self._segment(session_id)}/messages",
            message_data,
            "add agent message",
        )
        return {"success": True, "session_id": session_id, **response}

    async def get_agent_messages(
        self, session_id: str, limit: int = 50, offset: int = 0
    ) -> Dict[str, Any]:
        """Get agent session messages.

        Args:
            session_id: Identifier of the session to read.
            limit: Sent as the ``limit`` query parameter.
            offset: Sent as the ``offset`` query parameter.

        Returns:
            The client's response, unchanged.

        Raises:
            CloudflareError: Propagated from the client after being logged.
        """
        query = self._query(sessionId=session_id, limit=limit, offset=offset)
        return await self._get(
            f"do/agent/{self._segment(session_id)}/messages?{query}",
            "get agent messages",
        )

    # ------------------------------------------------------------------
    # Chat rooms
    # ------------------------------------------------------------------

    async def join_chat_room(
        self,
        room_id: str,
        user_id: str,
        username: str,
        room_config: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Join a chat room.

        Args:
            room_id: Identifier of the room.
            user_id: Identifier of the joining user.
            username: Display name sent with the join request.
            room_config: Optional configuration; an empty dict is sent when
                omitted.

        Returns:
            ``success``, ``room_id`` and ``user_id`` merged with the client's
            response.

        Raises:
            CloudflareError: Propagated from the client after being logged.
        """
        join_data = {
            "userId": user_id,
            "username": username,
            "roomConfig": room_config or {},
        }
        response = await self._post(
            f"do/chat/{self._segment(room_id)}/join", join_data, "join chat room"
        )
        return {"success": True, "room_id": room_id, "user_id": user_id, **response}

    async def leave_chat_room(self, room_id: str, user_id: str) -> Dict[str, Any]:
        """Leave a chat room.

        Returns:
            ``success``, ``room_id`` and ``user_id`` merged with the client's
            response.

        Raises:
            CloudflareError: Propagated from the client after being logged.
        """
        response = await self._post(
            f"do/chat/{self._segment(room_id)}/leave",
            {"userId": user_id},
            "leave chat room",
        )
        return {"success": True, "room_id": room_id, "user_id": user_id, **response}

    async def get_chat_room_info(self, room_id: str) -> Dict[str, Any]:
        """Get chat room information.

        Returns:
            The client's response, unchanged.

        Raises:
            CloudflareError: Propagated from the client after being logged.
        """
        return await self._get(
            f"do/chat/{self._segment(room_id)}/info", "get chat room info"
        )

    async def send_chat_message(
        self,
        room_id: str,
        user_id: str,
        username: str,
        content: str,
        message_type: str = "text",
    ) -> Dict[str, Any]:
        """Send a message to chat room.

        Args:
            room_id: Identifier of the room.
            user_id: Identifier of the sender.
            username: Display name of the sender.
            content: Message body.
            message_type: Sent as ``messageType``; defaults to ``"text"``.

        Returns:
            ``success`` and ``room_id`` merged with the client's response.

        Raises:
            CloudflareError: Propagated from the client after being logged.
        """
        message_data = {
            "userId": user_id,
            "username": username,
            "content": content,
            "messageType": message_type,
        }
        response = await self._post(
            f"do/chat/{self._segment(room_id)}/messages",
            message_data,
            "send chat message",
        )
        return {"success": True, "room_id": room_id, **response}

    async def get_chat_messages(
        self, room_id: str, limit: int = 50, offset: int = 0
    ) -> Dict[str, Any]:
        """Get chat room messages.

        Args:
            room_id: Identifier of the room.
            limit: Sent as the ``limit`` query parameter.
            offset: Sent as the ``offset`` query parameter.

        Returns:
            The client's response, unchanged.

        Raises:
            CloudflareError: Propagated from the client after being logged.
        """
        query = self._query(limit=limit, offset=offset)
        return await self._get(
            f"do/chat/{self._segment(room_id)}/messages?{query}",
            "get chat messages",
        )

    async def get_chat_participants(self, room_id: str) -> Dict[str, Any]:
        """Get chat room participants.

        Returns:
            The client's response, unchanged.

        Raises:
            CloudflareError: Propagated from the client after being logged.
        """
        return await self._get(
            f"do/chat/{self._segment(room_id)}/participants",
            "get chat participants",
        )

    # ------------------------------------------------------------------
    # WebSocket URLs
    # ------------------------------------------------------------------

    def get_agent_websocket_url(self, session_id: str, user_id: str) -> str:
        """Get WebSocket URL for agent session.

        Returns:
            ``<ws base>/do/agent/<session_id>?sessionId=...&userId=...`` with
            the identifiers percent-encoded.

        Raises:
            CloudflareError: If the client has no ``worker_url``.
        """
        base_url = self._websocket_base_url()
        query = self._query(sessionId=session_id, userId=user_id)
        return f"{base_url}/do/agent/{self._segment(session_id)}?{query}"

    def get_chat_websocket_url(self, room_id: str, user_id: str, username: str) -> str:
        """Get WebSocket URL for chat room.

        Returns:
            ``<ws base>/do/chat/<room_id>?userId=...&username=...`` with the
            identifiers percent-encoded.

        Raises:
            CloudflareError: If the client has no ``worker_url``.
        """
        base_url = self._websocket_base_url()
        query = self._query(userId=user_id, username=username)
        return f"{base_url}/do/chat/{self._segment(room_id)}?{query}"
|
|
|
|
|
class DurableObjectsWebSocket:
    """Helper class for WebSocket connections to Durable Objects.

    Outgoing messages are JSON objects with ``type``, ``payload`` and
    ``timestamp`` keys. Incoming messages are parsed as JSON and dispatched
    by their ``type`` key to the handlers registered with
    ``add_message_handler``.

    Usable as an async context manager: entering connects, exiting
    disconnects.
    """

    def __init__(self, url: str):
        """Remember *url*; no connection is opened until ``connect()``."""
        self.url = url
        self.websocket = None
        self.connected = False
        # Maps a message type to the list of handlers registered for it.
        self.message_handlers = {}
        # Strong reference to the background reader. The event loop only
        # keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._reader_task = None

    async def connect(self):
        """Connect to WebSocket and start the background reader task.

        Raises:
            CloudflareError: If importing ``websockets`` or opening the
                connection fails; the original exception is chained.
        """
        try:
            # Imported at call time; only connect() needs this package.
            import websockets

            self.websocket = await websockets.connect(self.url)
        except Exception as e:
            logger.error(f"Failed to connect to WebSocket: {e}")
            raise CloudflareError(f"WebSocket connection failed: {e}") from e

        self.connected = True
        logger.info(f"Connected to Durable Object WebSocket: {self.url}")
        self._reader_task = asyncio.create_task(self._message_loop())

    async def disconnect(self):
        """Disconnect from WebSocket and stop the background reader.

        Does nothing to the socket when it is not currently connected.
        """
        if self.websocket and self.connected:
            try:
                await self.websocket.close()
            finally:
                # Mark the connection closed even if close() itself fails.
                self.connected = False
            logger.info("Disconnected from Durable Object WebSocket")

        reader, self._reader_task = self._reader_task, None
        # Skip cancellation when disconnect() is running inside the reader
        # task itself (i.e. called from a handler); cancelling the current
        # task would abort the caller.
        if (
            reader is not None
            and not reader.done()
            and reader is not asyncio.current_task()
        ):
            reader.cancel()

    async def send_message(self, message_type: str, payload: Dict[str, Any]):
        """Send message via WebSocket.

        Args:
            message_type: Value of the ``type`` key.
            payload: Value of the ``payload`` key; must be JSON-serialisable.

        Raises:
            CloudflareError: If not connected, or if serialising or sending
                fails (the original exception is chained).
        """
        if not self.connected or not self.websocket:
            raise CloudflareError("WebSocket not connected")

        message = {
            "type": message_type,
            "payload": payload,
            "timestamp": int(time.time()),
        }

        try:
            await self.websocket.send(json.dumps(message))
        except Exception as e:
            logger.error(f"Failed to send WebSocket message: {e}")
            raise CloudflareError(f"Failed to send message: {e}") from e

    def add_message_handler(self, message_type: str, handler):
        """Add a message handler for specific message types.

        Args:
            message_type: Value of the incoming message's ``type`` key.
            handler: Callable taking the decoded message. It may be a plain
                function or return an awaitable, which is then awaited.
                Non-callable values are ignored at dispatch time.
        """
        self.message_handlers.setdefault(message_type, []).append(handler)

    async def _dispatch(self, data):
        """Invoke every handler registered for ``data["type"]``.

        A failing handler is logged and does not stop the remaining ones.
        """
        message_type = data.get("type")
        if message_type not in self.message_handlers:
            return

        # Iterate over a snapshot so handlers may register further handlers.
        for handler in tuple(self.message_handlers[message_type]):
            if not callable(handler):
                continue
            try:
                result = handler(data)
                # Covers coroutine functions as well as wrappers (partials,
                # callable objects) that return an awaitable.
                if inspect.isawaitable(result):
                    await result
            except Exception as e:
                logger.error(f"Message handler error: {e}")

    async def _message_loop(self):
        """Handle incoming WebSocket messages until the connection ends.

        Malformed or unprocessable messages are logged and skipped.
        ``connected`` is reset however the loop ends: clean close, error or
        cancellation.
        """
        try:
            async for message in self.websocket:
                try:
                    await self._dispatch(json.loads(message))
                except json.JSONDecodeError as e:
                    logger.error(f"Failed to parse WebSocket message: {e}")
                except Exception as e:
                    logger.error(f"WebSocket message processing error: {e}")
        except Exception as e:
            logger.error(f"WebSocket message loop error: {e}")
        finally:
            self.connected = False

    async def __aenter__(self):
        """Connect and return this helper."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Disconnect; exceptions from the ``async with`` body propagate."""
        await self.disconnect()
|
|
|