""" Validation Test Script for Nova API v2 Jogging This script validates that v2 jogging message formats work correctly with a real Nova instance BEFORE migrating the nova_jogger.py module. This is Phase 0 and must pass before implementation. Prerequisites: - .env.local configured with Nova credentials - Robot in safe position - Robot controller accessible Usage: python scripts/validate_v2_jogging.py """ import json import os import sys import time from pathlib import Path from typing import Any, Dict, Optional # Add project root to path sys.path.insert(0, str(Path(__file__).parent.parent)) try: from websockets.sync.client import connect except ImportError: print("ERROR: websockets package required") print("Install with: pip install websockets") sys.exit(1) # Load environment variables def load_env_file(filepath: Path): """Load environment variables from .env file.""" if not filepath.exists(): return with open(filepath, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) os.environ[key.strip()] = value.strip() env_local_path = Path(__file__).parent.parent / '.env.local' load_env_file(env_local_path) # Get configuration from environment INSTANCE_URL = os.getenv('NOVA_INSTANCE_URL') or os.getenv('NOVA_API') ACCESS_TOKEN = os.getenv('NOVA_ACCESS_TOKEN') CELL_ID = os.getenv('NOVA_CELL_ID', 'cell') CONTROLLER_ID = os.getenv('NOVA_CONTROLLER_ID') MOTION_GROUP_ID = os.getenv('NOVA_MOTION_GROUP_ID') TCP_NAME = os.getenv('NOVA_TCP_NAME', 'Flange') # Validation results test_results = [] def check_config(): """Verify required configuration is present.""" print("=" * 70) print("Nova API v2 Jogging Validation") print("=" * 70) print() missing = [] if not INSTANCE_URL: missing.append("NOVA_INSTANCE_URL (or NOVA_API)") if not ACCESS_TOKEN: missing.append("NOVA_ACCESS_TOKEN") if not CONTROLLER_ID: missing.append("NOVA_CONTROLLER_ID") if not MOTION_GROUP_ID: missing.append("NOVA_MOTION_GROUP_ID") if missing: print("ERROR: Missing required environment variables:") for var in missing: print(f" - {var}") print() print("Please configure .env.local with Nova credentials") sys.exit(1) print("Configuration:") print(f" Instance URL: {INSTANCE_URL}") print(f" Cell ID: {CELL_ID}") print(f" Controller ID: {CONTROLLER_ID}") print(f" Motion Group ID: {MOTION_GROUP_ID}") print(f" TCP Name: {TCP_NAME}") print() def create_websocket_url() -> str: """Create the v2 jogging WebSocket URL.""" ws_url = INSTANCE_URL.replace("https://", "wss://").replace("http://", "ws://") url = f"{ws_url}/api/v2/cells/{CELL_ID}/controllers/{CONTROLLER_ID}/execution/jogging" # Add token as query parameter url += f"?token={ACCESS_TOKEN}" return url def log_test(name: str, passed: bool, message: str = "", details: Any = None): """Log test result.""" status = "✓ PASS" if passed else "✗ FAIL" print(f"[{status}] {name}") if message: print(f" {message}") if details: print(f" Details: {json.dumps(details, indent=8)}") print() test_results.append({ "test": name, "passed": passed, "message": message, "details": details }) def test_1_connect_and_initialize(): """Test 1: Connect to WebSocket and send InitializeJoggingRequest.""" print("-" * 70) print("Test 1: Connect and Initialize") print("-" * 70) try: url = create_websocket_url() print(f"Connecting to: {url[:80]}...") ws = connect(url, open_timeout=10) log_test("WebSocket Connection", True, "Successfully connected") # Send InitializeJoggingRequest init_msg = { "message_type": "InitializeJoggingRequest", "motion_group": MOTION_GROUP_ID, "tcp": TCP_NAME, "response_coordinate_system": "world" } print(f"Sending: {json.dumps(init_msg, indent=2)}") ws.send(json.dumps(init_msg)) # Wait for response print("Waiting for response...") response = ws.recv(timeout=5.0) response_data = json.loads(response) print(f"Received: {json.dumps(response_data, indent=2)}") # Check response (v2 API wraps result in {"result": {"kind": ...}}) result = response_data.get("result", response_data) if "kind" in result: if result["kind"] == "INITIALIZE_RECEIVED": log_test("InitializeJoggingRequest", True, "Received INITIALIZE_RECEIVED", result) ws.close() return True, ws else: log_test("InitializeJoggingRequest", False, f"Unexpected kind: {result['kind']}", result) ws.close() return False, None else: log_test("InitializeJoggingRequest", False, "No 'kind' field in response", response_data) ws.close() return False, None except Exception as e: log_test("Test 1", False, f"Exception: {type(e).__name__}: {e}") return False, None def test_2_joint_velocity(): """Test 2: Send JointVelocityRequest.""" print("-" * 70) print("Test 2: Joint Velocity Command") print("-" * 70) try: url = create_websocket_url() ws = connect(url, open_timeout=10) # Initialize first init_msg = { "message_type": "InitializeJoggingRequest", "motion_group": MOTION_GROUP_ID, "tcp": TCP_NAME } ws.send(json.dumps(init_msg)) response = ws.recv(timeout=5.0) init_data = json.loads(response) result = init_data.get('result', init_data) print(f"Init response: {result.get('kind')}") # Capture initial position initial_pose = init_data.get('current_pose', {}) initial_joints = initial_pose.get('joint_position', {}).get('joints', []) if initial_joints: print(f"Initial joint 0 position: {initial_joints[0]:.4f} rad") # Send joint velocity (small safe velocity on joint 0) jog_msg = { "message_type": "JointVelocityRequest", "velocity": [0.05, 0, 0, 0, 0, 0] # Very small velocity for safety } print(f"Sending: {json.dumps(jog_msg, indent=2)}") # Send a few times and track movement moved = False for i in range(5): ws.send(json.dumps(jog_msg)) time.sleep(0.1) # Check for state updates try: response = ws.recv(timeout=0.05) state_data = json.loads(response) if state_data.get('kind') == 'STATE_UPDATE': current_joints = state_data.get('current_pose', {}).get('joint_position', {}).get('joints', []) if current_joints and initial_joints: delta = abs(current_joints[0] - initial_joints[0]) if delta > 0.001: # Movement threshold moved = True print(f" Robot moved! Joint 0 delta: {delta:.4f} rad") except: pass # No response yet, continue # Stop with zeros stop_msg = { "message_type": "JointVelocityRequest", "velocity": [0, 0, 0, 0, 0, 0] } ws.send(json.dumps(stop_msg)) # Wait a bit and check final position time.sleep(0.2) try: while True: response = ws.recv(timeout=0.1) state_data = json.loads(response) if state_data.get('kind') == 'STATE_UPDATE': final_joints = state_data.get('current_pose', {}).get('joint_position', {}).get('joints', []) if final_joints and initial_joints: delta = abs(final_joints[0] - initial_joints[0]) if delta > 0.001: moved = True print(f"Final joint 0 position: {final_joints[0]:.4f} rad (moved {delta:.4f} rad)") except: pass status_msg = "Joint velocity commands sent successfully" if moved: status_msg += " - ROBOT MOVED CONFIRMED" else: status_msg += " - WARNING: No movement detected" log_test("JointVelocityRequest", True, status_msg) ws.close() return True except Exception as e: log_test("Test 2", False, f"Exception: {type(e).__name__}: {e}") return False def test_3_tcp_velocity(): """Test 3: Send TcpVelocityRequest.""" print("-" * 70) print("Test 3: TCP Velocity Command") print("-" * 70) try: url = create_websocket_url() ws = connect(url, open_timeout=10) # Initialize init_msg = { "message_type": "InitializeJoggingRequest", "motion_group": MOTION_GROUP_ID, "tcp": TCP_NAME } ws.send(json.dumps(init_msg)) response = ws.recv(timeout=5.0) init_data = json.loads(response) result = init_data.get('result', init_data) print(f"Init response: {result.get('kind')}") # Send TCP velocity (small safe velocity in X) tcp_msg = { "message_type": "TcpVelocityRequest", "translation": [5.0, 0.0, 0.0], # 5 mm/s in X "rotation": [0.0, 0.0, 0.0], "use_tool_coordinate_system": False } print(f"Sending: {json.dumps(tcp_msg, indent=2)}") # Send a few times for i in range(5): ws.send(json.dumps(tcp_msg)) time.sleep(0.1) # Stop with zeros stop_msg = { "message_type": "TcpVelocityRequest", "translation": [0.0, 0.0, 0.0], "rotation": [0.0, 0.0, 0.0], "use_tool_coordinate_system": False } ws.send(json.dumps(stop_msg)) log_test("TcpVelocityRequest", True, "TCP velocity commands sent successfully") ws.close() return True except Exception as e: log_test("Test 3", False, f"Exception: {type(e).__name__}: {e}") return False def test_4_pause(): """Test 4: Send PauseJoggingRequest.""" print("-" * 70) print("Test 4: Pause Command") print("-" * 70) try: url = create_websocket_url() ws = connect(url, open_timeout=10) # Initialize init_msg = { "message_type": "InitializeJoggingRequest", "motion_group": MOTION_GROUP_ID, "tcp": TCP_NAME } ws.send(json.dumps(init_msg)) response = ws.recv(timeout=5.0) init_data = json.loads(response) result = init_data.get('result', init_data) print(f"Init response: {result.get('kind')}") # Start movement jog_msg = { "message_type": "JointVelocityRequest", "velocity": [0.05, 0, 0, 0, 0, 0] } ws.send(json.dumps(jog_msg)) time.sleep(0.2) # Send pause pause_msg = {"message_type": "PauseJoggingRequest"} print(f"Sending: {json.dumps(pause_msg, indent=2)}") ws.send(json.dumps(pause_msg)) log_test("PauseJoggingRequest", True, "Pause command sent successfully") ws.close() return True except Exception as e: log_test("Test 4", False, f"Exception: {type(e).__name__}: {e}") return False def test_5_continuous_sending(): """Test 5: Continuous command sending at 10Hz.""" print("-" * 70) print("Test 5: Continuous Sending (10Hz)") print("-" * 70) try: url = create_websocket_url() ws = connect(url, open_timeout=10) # Initialize init_msg = { "message_type": "InitializeJoggingRequest", "motion_group": MOTION_GROUP_ID, "tcp": TCP_NAME } ws.send(json.dumps(init_msg)) response = ws.recv(timeout=5.0) init_data = json.loads(response) result = init_data.get('result', init_data) print(f"Init response: {result.get('kind')}") # Send 50 commands at 10Hz (5 seconds) jog_msg = { "message_type": "JointVelocityRequest", "velocity": [0.03, 0, 0, 0, 0, 0] } print("Sending 50 commands at 10Hz (5 seconds)...") start_time = time.time() for i in range(50): ws.send(json.dumps(jog_msg)) time.sleep(0.1) if (i + 1) % 10 == 0: print(f" Sent {i + 1}/50 commands") duration = time.time() - start_time print(f"Completed in {duration:.2f} seconds") # Stop stop_msg = { "message_type": "JointVelocityRequest", "velocity": [0, 0, 0, 0, 0, 0] } ws.send(json.dumps(stop_msg)) log_test("Continuous Sending", True, f"50 commands sent at 10Hz in {duration:.2f}s") ws.close() return True except Exception as e: log_test("Test 5", False, f"Exception: {type(e).__name__}: {e}") return False def get_robot_state(): """Get current robot state from state stream.""" try: import urllib.request url = f"{INSTANCE_URL}/api/v2/cells/{CELL_ID}/controllers/{CONTROLLER_ID}/motion-groups/{MOTION_GROUP_ID}/state" headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"} req = urllib.request.Request(url, headers=headers) with urllib.request.urlopen(req, timeout=5) as response: data = json.loads(response.read()) return data except Exception as e: print(f"Failed to get robot state: {e}") return None def test_6_multi_axis_joint(): """Test 6: Move two joints simultaneously and verify movement.""" print("-" * 70) print("Test 6: Multi-Axis Joint Movement with Verification") print("-" * 70) try: # Get initial state initial_state = get_robot_state() if not initial_state: log_test("Multi-Axis Joint Movement", False, "Could not get initial robot state") return False initial_joints = initial_state.get('joint_position', []) if not initial_joints or len(initial_joints) < 2: log_test("Multi-Axis Joint Movement", False, "Invalid initial joint data") return False print(f"Initial joints [0,1]: [{initial_joints[0]:.4f}, {initial_joints[1]:.4f}] rad") # Connect to jogging endpoint url = create_websocket_url() ws = connect(url, open_timeout=10) # Initialize init_msg = { "message_type": "InitializeJoggingRequest", "motion_group": MOTION_GROUP_ID, "tcp": TCP_NAME } ws.send(json.dumps(init_msg)) response = ws.recv(timeout=5.0) init_data = json.loads(response) result = init_data.get('result', init_data) print(f"Init response: {result.get('kind')}") # Send multi-axis joint velocity (move joints 0 and 1 simultaneously) jog_msg = { "message_type": "JointVelocityRequest", "velocity": [0.05, 0.05, 0, 0, 0, 0] # Move joints 0 and 1 } print(f"Sending: {json.dumps(jog_msg, indent=2)}") # Send commands for 1 second print("Jogging for 1 second...") for i in range(10): ws.send(json.dumps(jog_msg)) time.sleep(0.1) # Stop stop_msg = { "message_type": "JointVelocityRequest", "velocity": [0, 0, 0, 0, 0, 0] } ws.send(json.dumps(stop_msg)) ws.close() # Wait for motion to complete time.sleep(0.5) # Get final state final_state = get_robot_state() if not final_state: log_test("Multi-Axis Joint Movement", False, "Could not get final robot state") return False final_joints = final_state.get('joint_position', []) if not final_joints or len(final_joints) < 2: log_test("Multi-Axis Joint Movement", False, "Invalid final joint data") return False print(f"Final joints [0,1]: [{final_joints[0]:.4f}, {final_joints[1]:.4f}] rad") # Calculate deltas delta_j0 = abs(final_joints[0] - initial_joints[0]) delta_j1 = abs(final_joints[1] - initial_joints[1]) print(f"Delta joint 0: {delta_j0:.4f} rad") print(f"Delta joint 1: {delta_j1:.4f} rad") # Verify both joints moved movement_threshold = 0.001 # rad j0_moved = delta_j0 > movement_threshold j1_moved = delta_j1 > movement_threshold if j0_moved and j1_moved: log_test("Multi-Axis Joint Movement", True, f"Both joints moved as expected (Δj0={delta_j0:.4f}, Δj1={delta_j1:.4f})", {"delta_joint_0": delta_j0, "delta_joint_1": delta_j1}) elif j0_moved or j1_moved: log_test("Multi-Axis Joint Movement", False, f"Only one joint moved (Δj0={delta_j0:.4f}, Δj1={delta_j1:.4f})", {"delta_joint_0": delta_j0, "delta_joint_1": delta_j1}) else: log_test("Multi-Axis Joint Movement", False, "No movement detected in either joint", {"delta_joint_0": delta_j0, "delta_joint_1": delta_j1}) return j0_moved and j1_moved except Exception as e: log_test("Test 6", False, f"Exception: {type(e).__name__}: {e}") import traceback traceback.print_exc() return False def test_7_mode_switching(): """Test 6: Dynamic mode switching between joint and TCP.""" print("-" * 70) print("Test 6: Mode Switching (Joint ↔ TCP)") print("-" * 70) try: url = create_websocket_url() ws = connect(url, open_timeout=10) # Initialize init_msg = { "message_type": "InitializeJoggingRequest", "motion_group": MOTION_GROUP_ID, "tcp": TCP_NAME } ws.send(json.dumps(init_msg)) response = ws.recv(timeout=5.0) init_data = json.loads(response) result = init_data.get('result', init_data) print(f"Init response: {result.get('kind')}") # Start with joint jogging print("Phase 1: Joint jogging...") joint_msg = { "message_type": "JointVelocityRequest", "velocity": [0.05, 0, 0, 0, 0, 0] } for i in range(10): ws.send(json.dumps(joint_msg)) time.sleep(0.1) # Switch to TCP jogging print("Phase 2: Switching to TCP jogging...") tcp_msg = { "message_type": "TcpVelocityRequest", "translation": [5.0, 0.0, 0.0], "rotation": [0.0, 0.0, 0.0], "use_tool_coordinate_system": False } for i in range(10): ws.send(json.dumps(tcp_msg)) time.sleep(0.1) # Switch back to joint print("Phase 3: Switching back to joint jogging...") joint_msg2 = { "message_type": "JointVelocityRequest", "velocity": [0, 0, 0, 0, 0, 0.05] # Different joint } for i in range(10): ws.send(json.dumps(joint_msg2)) time.sleep(0.1) # Stop stop_msg = { "message_type": "JointVelocityRequest", "velocity": [0, 0, 0, 0, 0, 0] } ws.send(json.dumps(stop_msg)) log_test("Mode Switching", True, "Successfully switched between joint and TCP modes") ws.close() return True except Exception as e: log_test("Test 6", False, f"Exception: {type(e).__name__}: {e}") return False def print_summary(): """Print test summary.""" print() print("=" * 70) print("VALIDATION SUMMARY") print("=" * 70) print() passed = sum(1 for r in test_results if r["passed"]) total = len(test_results) print(f"Tests passed: {passed}/{total}") print() if passed == total: print("✓ ALL TESTS PASSED") print() print("The v2 API message formats have been validated.") print("You can now proceed with Phase 1 (implementation).") else: print("✗ SOME TESTS FAILED") print() print("Please review the failures above before proceeding.") print("Do NOT proceed to Phase 1 until all tests pass!") print() print("=" * 70) # Write results to file results_file = Path(__file__).parent / "v2_jogging_validation_results.json" with open(results_file, 'w') as f: json.dump({ "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "passed": passed, "total": total, "all_passed": passed == total, "results": test_results }, f, indent=2) print(f"Results saved to: {results_file}") def main(): """Run all validation tests.""" import sys check_config() # Check for --auto flag to skip confirmation auto_mode = "--auto" in sys.argv if not auto_mode: print("WARNING: This will move the robot!") print("Ensure the robot is in a safe position before proceeding.") print() response = input("Continue? [y/N]: ") if response.lower() != 'y': print("Aborted.") return else: print("Running in AUTO mode (skipping safety confirmation)") print() # Run tests test_1_connect_and_initialize() test_2_joint_velocity() test_3_tcp_velocity() test_4_pause() test_5_continuous_sending() test_6_multi_axis_joint() test_7_mode_switching() # Print summary print_summary() if __name__ == "__main__": main()