| | """ |
| | Protocol type definitions for Nova-Sim WebSocket API. |
| | |
| | This module defines the structure of all WebSocket messages exchanged between |
| | clients and the server, including both request and response messages. |
| | """ |
| |
|
| | from typing import TypedDict, Literal, Optional, Union, List |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | RobotType = Literal["g1", "spot", "ur5", "ur5_t_push"] |
| | SceneType = Optional[str] |
| | ControlMode = Literal["ik", "joint"] |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class ActionData(TypedDict, total=False): |
| | """Velocity-based action commands for robot control. |
| | |
| | For locomotion robots (G1, Spot): |
| | - vx: Forward/backward velocity [-1, 1] |
| | - vy: Left/right strafe velocity [-1, 1] |
| | - vyaw: Turn rate [-1, 1] |
| | |
| | For robot arms (UR5): |
| | - vx, vy, vz: Cartesian translation velocities (m/s) |
| | - vrx, vry, vrz: Cartesian rotation velocities (rad/s) |
| | - j1-j6: Joint velocities (rad/s) |
| | - gripper: Gripper position [0-255] |
| | """ |
| | |
| | vx: float |
| | vy: float |
| | vz: float |
| |
|
| | |
| | vyaw: float |
| |
|
| | |
| | vrx: float |
| | vry: float |
| | vrz: float |
| |
|
| | |
| | j1: float |
| | j2: float |
| | j3: float |
| | j4: float |
| | j5: float |
| | j6: float |
| |
|
| | |
| | gripper: float |
| |
|
| |
|
| | class ActionMessage(TypedDict): |
| | """Action command message for all robots.""" |
| | type: Literal["action"] |
| | data: ActionData |
| |
|
| |
|
| | class TeleopActionData(TypedDict, total=False): |
| | """Teleoperation action data (backward compatible with old teleop_command).""" |
| | vx: float |
| | vy: float |
| | vz: float |
| | |
| | dx: float |
| | dy: float |
| | dz: float |
| |
|
| |
|
| | class TeleopActionMessage(TypedDict): |
| | """Teleoperation action message (UR5 keyboard control).""" |
| | type: Literal["teleop_action"] |
| | data: TeleopActionData |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class ResetData(TypedDict, total=False): |
| | """Reset environment data.""" |
| | seed: Optional[int] |
| |
|
| |
|
| | class ResetMessage(TypedDict): |
| | """Reset environment message.""" |
| | type: Literal["reset"] |
| | data: ResetData |
| |
|
| |
|
| | class SwitchRobotData(TypedDict): |
| | """Switch robot data.""" |
| | robot: RobotType |
| | scene: Optional[str] |
| |
|
| |
|
| | class SwitchRobotMessage(TypedDict): |
| | """Switch robot message.""" |
| | type: Literal["switch_robot"] |
| | data: SwitchRobotData |
| |
|
| |
|
| | class CameraRotateData(TypedDict): |
| | """Camera rotation data.""" |
| | action: Literal["rotate"] |
| | dx: float |
| | dy: float |
| |
|
| |
|
| | class CameraZoomData(TypedDict): |
| | """Camera zoom data.""" |
| | action: Literal["zoom"] |
| | dz: float |
| |
|
| |
|
| | class CameraPanData(TypedDict): |
| | """Camera pan data.""" |
| | action: Literal["pan"] |
| | dx: float |
| | dy: float |
| |
|
| |
|
| | class CameraSetDistanceData(TypedDict): |
| | """Camera set distance data.""" |
| | action: Literal["set_distance"] |
| | distance: float |
| |
|
| |
|
| | CameraData = Union[CameraRotateData, CameraZoomData, CameraPanData, CameraSetDistanceData] |
| |
|
| |
|
| | class CameraMessage(TypedDict): |
| | """Camera control message.""" |
| | type: Literal["camera"] |
| | data: CameraData |
| |
|
| |
|
| | class CameraFollowData(TypedDict): |
| | """Camera follow mode data.""" |
| | follow: bool |
| |
|
| |
|
| | class CameraFollowMessage(TypedDict): |
| | """Camera follow mode message.""" |
| | type: Literal["camera_follow"] |
| | data: CameraFollowData |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class ArmTargetData(TypedDict): |
| | """Arm target position data (IK mode).""" |
| | x: float |
| | y: float |
| | z: float |
| |
|
| |
|
| | class ArmTargetMessage(TypedDict): |
| | """Set arm target position message (UR5, IK mode).""" |
| | type: Literal["arm_target"] |
| | data: ArmTargetData |
| |
|
| |
|
| | class ArmOrientationData(TypedDict): |
| | """Arm target orientation data (IK mode).""" |
| | roll: float |
| | pitch: float |
| | yaw: float |
| |
|
| |
|
| | class ArmOrientationMessage(TypedDict): |
| | """Set arm target orientation message (UR5, IK mode).""" |
| | type: Literal["arm_orientation"] |
| | data: ArmOrientationData |
| |
|
| |
|
| | class UseOrientationData(TypedDict): |
| | """Toggle orientation control data.""" |
| | enabled: bool |
| |
|
| |
|
| | class UseOrientationMessage(TypedDict): |
| | """Toggle orientation control message (UR5).""" |
| | type: Literal["use_orientation"] |
| | data: UseOrientationData |
| |
|
| |
|
| | class JointPositionsData(TypedDict): |
| | """Joint positions data (joint mode).""" |
| | positions: List[float] |
| |
|
| |
|
| | class JointPositionsMessage(TypedDict): |
| | """Set joint positions message (UR5, joint mode).""" |
| | type: Literal["joint_positions"] |
| | data: JointPositionsData |
| |
|
| |
|
| | class ControlModeData(TypedDict): |
| | """Control mode data.""" |
| | mode: ControlMode |
| |
|
| |
|
| | class ControlModeMessage(TypedDict): |
| | """Set control mode message (UR5).""" |
| | type: Literal["control_mode"] |
| | data: ControlModeData |
| |
|
| |
|
| | class GripperData(TypedDict): |
| | """Gripper control data.""" |
| | action: Literal["open", "close"] |
| | value: Optional[int] |
| |
|
| |
|
| | class GripperMessage(TypedDict): |
| | """Gripper control message (UR5).""" |
| | type: Literal["gripper"] |
| | data: GripperData |
| |
|
| |
|
| | class NovaModeSetting(TypedDict): |
| | """Nova API mode settings.""" |
| | state_streaming: bool |
| | ik: bool |
| |
|
| |
|
| | class SetNovaModeData(TypedDict): |
| | """Set Nova API mode data.""" |
| | enabled: Optional[bool] |
| | |
| | state_streaming: Optional[bool] |
| | ik: Optional[bool] |
| |
|
| |
|
| | class SetNovaModeMessage(TypedDict): |
| | """Set Nova API mode message (UR5).""" |
| | type: Literal["set_nova_mode"] |
| | data: SetNovaModeData |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class ClientIdentityData(TypedDict): |
| | """Client identity data.""" |
| | client_id: str |
| |
|
| |
|
| | class ClientIdentityMessage(TypedDict): |
| | """Client identity handshake message.""" |
| | type: Literal["client_identity"] |
| | data: ClientIdentityData |
| |
|
| |
|
| | class ClientNotificationData(TypedDict, total=False): |
| | """Client notification data.""" |
| | message: str |
| | level: Literal["info", "warning", "error"] |
| |
|
| |
|
| | class ClientNotificationMessage(TypedDict): |
| | """Client notification message.""" |
| | type: Literal["client_notification"] |
| | data: ClientNotificationData |
| |
|
| |
|
| | class EpisodeControlData(TypedDict): |
| | """Episode control data.""" |
| | action: Literal["terminate", "truncate"] |
| |
|
| |
|
| | class EpisodeControlMessage(TypedDict): |
| | """Episode control message (trainer only).""" |
| | type: Literal["episode_control"] |
| | data: EpisodeControlData |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class Position(TypedDict): |
| | """3D position.""" |
| | x: float |
| | y: float |
| | z: float |
| |
|
| |
|
| | class Quaternion(TypedDict): |
| | """Quaternion orientation.""" |
| | w: float |
| | x: float |
| | y: float |
| | z: float |
| |
|
| |
|
| | class EulerAngles(TypedDict): |
| | """Euler angles orientation.""" |
| | roll: float |
| | pitch: float |
| | yaw: float |
| |
|
| |
|
| | class SceneObject(TypedDict): |
| | """Scene object information (position and orientation).""" |
| | name: str |
| | position: Position |
| | orientation: Quaternion |
| |
|
| |
|
| | class LocomotionObservation(TypedDict): |
| | """Observation data for locomotion robots (G1, Spot).""" |
| | position: Position |
| | orientation: Quaternion |
| |
|
| |
|
| | class UR5Observation(TypedDict): |
| | """Observation data for UR5 robot arm.""" |
| | end_effector: Position |
| | ee_orientation: Quaternion |
| | ee_target: Position |
| | ee_target_orientation: EulerAngles |
| | gripper: int |
| | joint_positions: List[float] |
| | joint_targets: List[float] |
| |
|
| |
|
| | Observation = Union[LocomotionObservation, UR5Observation] |
| |
|
| |
|
| | class NovaApiStatus(TypedDict): |
| | """Nova API integration status.""" |
| | connected: bool |
| | state_streaming: bool |
| | ik: bool |
| |
|
| |
|
| | class StateData(TypedDict, total=False): |
| | """State broadcast data.""" |
| | observation: Observation |
| | steps: int |
| | reward: float |
| | teleop_action: ActionData |
| | connected_clients: List[str] |
| | scene_objects: List[SceneObject] |
| |
|
| | |
| | control_mode: ControlMode |
| | nova_api: NovaApiStatus |
| |
|
| |
|
| | class StateMessage(TypedDict): |
| | """State broadcast message.""" |
| | type: Literal["state"] |
| | data: StateData |
| |
|
| |
|
| | class ConnectedClientsData(TypedDict): |
| | """Connected clients status data.""" |
| | clients: List[str] |
| |
|
| |
|
| | class ConnectedClientsMessage(TypedDict): |
| | """Connected clients broadcast message (to all clients).""" |
| | type: Literal["connected_clients"] |
| | data: ConnectedClientsData |
| |
|
| |
|
| | class ClientNotificationBroadcast(TypedDict): |
| | """Client notification broadcast (to all clients).""" |
| | type: Literal["client_notification"] |
| | data: ClientNotificationData |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class CameraIntrinsics(TypedDict): |
| | """Camera intrinsics for pinhole camera model.""" |
| | fx: float |
| | fy: float |
| | cx: float |
| | cy: float |
| | width: int |
| | height: int |
| | fovy_degrees: float |
| |
|
| |
|
| | class CameraPose(TypedDict): |
| | """Camera pose in world coordinates.""" |
| | position: Position |
| | orientation: Quaternion |
| |
|
| |
|
| | class CameraFeed(TypedDict, total=False): |
| | """Camera feed information.""" |
| | name: str |
| | label: str |
| | url: str |
| | pose: CameraPose |
| | intrinsics: CameraIntrinsics |
| |
|
| |
|
| | class EnvResponse(TypedDict, total=False): |
| | """GET /env response.""" |
| | robot: RobotType |
| | scene: str |
| | has_gripper: bool |
| | control_mode: ControlMode |
| | action_space: dict |
| | observation_space: dict |
| | camera_feeds: List[CameraFeed] |
| | home_pose: List[float] |
| |
|
| |
|
| | class CommandInfo(TypedDict): |
| | """Command metadata.""" |
| | name: str |
| | description: str |
| |
|
| |
|
| | class MetadataResponse(TypedDict): |
| | """GET /metadata response.""" |
| | robots: List[str] |
| | commands: List[CommandInfo] |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | ClientMessage = Union[ |
| | ActionMessage, |
| | TeleopActionMessage, |
| | ResetMessage, |
| | SwitchRobotMessage, |
| | CameraMessage, |
| | CameraFollowMessage, |
| | ArmTargetMessage, |
| | ArmOrientationMessage, |
| | UseOrientationMessage, |
| | JointPositionsMessage, |
| | ControlModeMessage, |
| | GripperMessage, |
| | SetNovaModeMessage, |
| | ClientIdentityMessage, |
| | ClientNotificationMessage, |
| | EpisodeControlMessage, |
| | ] |
| |
|
| | ServerMessage = Union[ |
| | StateMessage, |
| | ConnectedClientsMessage, |
| | ClientNotificationBroadcast, |
| | ] |
| |
|