""" Protocol type definitions for Nova-Sim WebSocket API. This module defines the structure of all WebSocket messages exchanged between clients and the server, including both request and response messages. """ from typing import TypedDict, Literal, Optional, Union, List # ============================================================================ # Common Types # ============================================================================ RobotType = Literal["g1", "spot", "ur5", "ur5_t_push"] SceneType = Optional[str] ControlMode = Literal["ik", "joint"] # ============================================================================ # Action Messages (Client -> Server) # ============================================================================ class ActionData(TypedDict, total=False): """Velocity-based action commands for robot control. For locomotion robots (G1, Spot): - vx: Forward/backward velocity [-1, 1] - vy: Left/right strafe velocity [-1, 1] - vyaw: Turn rate [-1, 1] For robot arms (UR5): - vx, vy, vz: Cartesian translation velocities (m/s) - vrx, vry, vrz: Cartesian rotation velocities (rad/s) - j1-j6: Joint velocities (rad/s) - gripper: Gripper position [0-255] """ # Cartesian translation velocities (m/s for UR5, normalized for locomotion) vx: float vy: float vz: float # Rotation velocity (rad/s for UR5, normalized for locomotion) vyaw: float # Cartesian rotation velocities (rad/s, UR5 only) vrx: float vry: float vrz: float # Joint velocities (rad/s, UR5 only) j1: float j2: float j3: float j4: float j5: float j6: float # Gripper position (0=open, 255=closed, UR5 only) gripper: float class ActionMessage(TypedDict): """Action command message for all robots.""" type: Literal["action"] data: ActionData class TeleopActionData(TypedDict, total=False): """Teleoperation action data (backward compatible with old teleop_command).""" vx: float vy: float vz: float # Backward compatibility: accept old dx/dy/dz format dx: float dy: float dz: float class TeleopActionMessage(TypedDict): """Teleoperation action message (UR5 keyboard control).""" type: Literal["teleop_action"] data: TeleopActionData # ============================================================================ # Other Client -> Server Messages # ============================================================================ class ResetData(TypedDict, total=False): """Reset environment data.""" seed: Optional[int] class ResetMessage(TypedDict): """Reset environment message.""" type: Literal["reset"] data: ResetData class SwitchRobotData(TypedDict): """Switch robot data.""" robot: RobotType scene: Optional[str] class SwitchRobotMessage(TypedDict): """Switch robot message.""" type: Literal["switch_robot"] data: SwitchRobotData class CameraRotateData(TypedDict): """Camera rotation data.""" action: Literal["rotate"] dx: float dy: float class CameraZoomData(TypedDict): """Camera zoom data.""" action: Literal["zoom"] dz: float class CameraPanData(TypedDict): """Camera pan data.""" action: Literal["pan"] dx: float dy: float class CameraSetDistanceData(TypedDict): """Camera set distance data.""" action: Literal["set_distance"] distance: float CameraData = Union[CameraRotateData, CameraZoomData, CameraPanData, CameraSetDistanceData] class CameraMessage(TypedDict): """Camera control message.""" type: Literal["camera"] data: CameraData class CameraFollowData(TypedDict): """Camera follow mode data.""" follow: bool class CameraFollowMessage(TypedDict): """Camera follow mode message.""" type: Literal["camera_follow"] data: CameraFollowData # ============================================================================ # UR5-Specific Client -> Server Messages # ============================================================================ class ArmTargetData(TypedDict): """Arm target position data (IK mode).""" x: float y: float z: float class ArmTargetMessage(TypedDict): """Set arm target position message (UR5, IK mode).""" type: Literal["arm_target"] data: ArmTargetData class ArmOrientationData(TypedDict): """Arm target orientation data (IK mode).""" roll: float pitch: float yaw: float class ArmOrientationMessage(TypedDict): """Set arm target orientation message (UR5, IK mode).""" type: Literal["arm_orientation"] data: ArmOrientationData class UseOrientationData(TypedDict): """Toggle orientation control data.""" enabled: bool class UseOrientationMessage(TypedDict): """Toggle orientation control message (UR5).""" type: Literal["use_orientation"] data: UseOrientationData class JointPositionsData(TypedDict): """Joint positions data (joint mode).""" positions: List[float] # Array of 6 joint angles in radians class JointPositionsMessage(TypedDict): """Set joint positions message (UR5, joint mode).""" type: Literal["joint_positions"] data: JointPositionsData class ControlModeData(TypedDict): """Control mode data.""" mode: ControlMode class ControlModeMessage(TypedDict): """Set control mode message (UR5).""" type: Literal["control_mode"] data: ControlModeData class GripperData(TypedDict): """Gripper control data.""" action: Literal["open", "close"] value: Optional[int] # 0-255, only used if action is not "open" or "close" class GripperMessage(TypedDict): """Gripper control message (UR5).""" type: Literal["gripper"] data: GripperData class NovaModeSetting(TypedDict): """Nova API mode settings.""" state_streaming: bool ik: bool class SetNovaModeData(TypedDict): """Set Nova API mode data.""" enabled: Optional[bool] # Legacy: enable/disable all Nova features # New granular settings: state_streaming: Optional[bool] ik: Optional[bool] class SetNovaModeMessage(TypedDict): """Set Nova API mode message (UR5).""" type: Literal["set_nova_mode"] data: SetNovaModeData # ============================================================================ # Client Identity & Notification Messages (Client -> Server) # ============================================================================ class ClientIdentityData(TypedDict): """Client identity data.""" client_id: str class ClientIdentityMessage(TypedDict): """Client identity handshake message.""" type: Literal["client_identity"] data: ClientIdentityData class ClientNotificationData(TypedDict, total=False): """Client notification data.""" message: str level: Literal["info", "warning", "error"] class ClientNotificationMessage(TypedDict): """Client notification message.""" type: Literal["client_notification"] data: ClientNotificationData class EpisodeControlData(TypedDict): """Episode control data.""" action: Literal["terminate", "truncate"] class EpisodeControlMessage(TypedDict): """Episode control message (trainer only).""" type: Literal["episode_control"] data: EpisodeControlData # ============================================================================ # Server -> Client Messages # ============================================================================ class Position(TypedDict): """3D position.""" x: float y: float z: float class Quaternion(TypedDict): """Quaternion orientation.""" w: float x: float y: float z: float class EulerAngles(TypedDict): """Euler angles orientation.""" roll: float pitch: float yaw: float class SceneObject(TypedDict): """Scene object information (position and orientation).""" name: str position: Position orientation: Quaternion class LocomotionObservation(TypedDict): """Observation data for locomotion robots (G1, Spot).""" position: Position orientation: Quaternion class UR5Observation(TypedDict): """Observation data for UR5 robot arm.""" end_effector: Position ee_orientation: Quaternion ee_target: Position ee_target_orientation: EulerAngles gripper: int # 0-255 joint_positions: List[float] # 6 joint angles joint_targets: List[float] # 6 target joint angles Observation = Union[LocomotionObservation, UR5Observation] class NovaApiStatus(TypedDict): """Nova API integration status.""" connected: bool state_streaming: bool ik: bool class StateData(TypedDict, total=False): """State broadcast data.""" observation: Observation steps: int reward: float teleop_action: ActionData # Current action/velocity commands connected_clients: List[str] # List of connected client IDs scene_objects: List[SceneObject] # Scene objects (position and orientation) # UR5-specific fields control_mode: ControlMode nova_api: NovaApiStatus class StateMessage(TypedDict): """State broadcast message.""" type: Literal["state"] data: StateData class ConnectedClientsData(TypedDict): """Connected clients status data.""" clients: List[str] # List of connected client IDs class ConnectedClientsMessage(TypedDict): """Connected clients broadcast message (to all clients).""" type: Literal["connected_clients"] data: ConnectedClientsData class ClientNotificationBroadcast(TypedDict): """Client notification broadcast (to all clients).""" type: Literal["client_notification"] data: ClientNotificationData # ============================================================================ # HTTP Response Types # ============================================================================ class CameraIntrinsics(TypedDict): """Camera intrinsics for pinhole camera model.""" fx: float fy: float cx: float cy: float width: int height: int fovy_degrees: float class CameraPose(TypedDict): """Camera pose in world coordinates.""" position: Position orientation: Quaternion class CameraFeed(TypedDict, total=False): """Camera feed information.""" name: str label: str url: str pose: CameraPose intrinsics: CameraIntrinsics class EnvResponse(TypedDict, total=False): """GET /env response.""" robot: RobotType scene: str has_gripper: bool control_mode: ControlMode action_space: dict # Gym space serialization observation_space: dict # Gym space serialization camera_feeds: List[CameraFeed] home_pose: List[float] # Joint angles for home position class CommandInfo(TypedDict): """Command metadata.""" name: str description: str class MetadataResponse(TypedDict): """GET /metadata response.""" robots: List[str] commands: List[CommandInfo] # ============================================================================ # Union Types for All Messages # ============================================================================ ClientMessage = Union[ ActionMessage, TeleopActionMessage, ResetMessage, SwitchRobotMessage, CameraMessage, CameraFollowMessage, ArmTargetMessage, ArmOrientationMessage, UseOrientationMessage, JointPositionsMessage, ControlModeMessage, GripperMessage, SetNovaModeMessage, ClientIdentityMessage, ClientNotificationMessage, EpisodeControlMessage, ] ServerMessage = Union[ StateMessage, ConnectedClientsMessage, ClientNotificationBroadcast, ]