| | """ |
| | Validation Test Script for Nova API v2 Jogging |
| | |
| | This script validates that v2 jogging message formats work correctly with a real Nova instance |
| | BEFORE migrating the nova_jogger.py module. This is Phase 0 and must pass before implementation. |
| | |
| | Prerequisites: |
| | - .env.local configured with Nova credentials |
| | - Robot in safe position |
| | - Robot controller accessible |
| | |
| | Usage: |
| | python scripts/validate_v2_jogging.py |
| | """ |
| |
|
| | import json |
| | import os |
| | import sys |
| | import time |
| | from pathlib import Path |
| | from typing import Any, Dict, Optional |
| |
|
| | |
| | sys.path.insert(0, str(Path(__file__).parent.parent)) |
| |
|
| | try: |
| | from websockets.sync.client import connect |
| | except ImportError: |
| | print("ERROR: websockets package required") |
| | print("Install with: pip install websockets") |
| | sys.exit(1) |
| |
|
| | |
| | def load_env_file(filepath: Path): |
| | """Load environment variables from .env file.""" |
| | if not filepath.exists(): |
| | return |
| | with open(filepath, 'r') as f: |
| | for line in f: |
| | line = line.strip() |
| | if line and not line.startswith('#') and '=' in line: |
| | key, value = line.split('=', 1) |
| | os.environ[key.strip()] = value.strip() |
| |
|
| | env_local_path = Path(__file__).parent.parent / '.env.local' |
| | load_env_file(env_local_path) |
| |
|
| | |
| | INSTANCE_URL = os.getenv('NOVA_INSTANCE_URL') or os.getenv('NOVA_API') |
| | ACCESS_TOKEN = os.getenv('NOVA_ACCESS_TOKEN') |
| | CELL_ID = os.getenv('NOVA_CELL_ID', 'cell') |
| | CONTROLLER_ID = os.getenv('NOVA_CONTROLLER_ID') |
| | MOTION_GROUP_ID = os.getenv('NOVA_MOTION_GROUP_ID') |
| | TCP_NAME = os.getenv('NOVA_TCP_NAME', 'Flange') |
| |
|
| | |
| | test_results = [] |
| |
|
| | def check_config(): |
| | """Verify required configuration is present.""" |
| | print("=" * 70) |
| | print("Nova API v2 Jogging Validation") |
| | print("=" * 70) |
| | print() |
| |
|
| | missing = [] |
| | if not INSTANCE_URL: |
| | missing.append("NOVA_INSTANCE_URL (or NOVA_API)") |
| | if not ACCESS_TOKEN: |
| | missing.append("NOVA_ACCESS_TOKEN") |
| | if not CONTROLLER_ID: |
| | missing.append("NOVA_CONTROLLER_ID") |
| | if not MOTION_GROUP_ID: |
| | missing.append("NOVA_MOTION_GROUP_ID") |
| |
|
| | if missing: |
| | print("ERROR: Missing required environment variables:") |
| | for var in missing: |
| | print(f" - {var}") |
| | print() |
| | print("Please configure .env.local with Nova credentials") |
| | sys.exit(1) |
| |
|
| | print("Configuration:") |
| | print(f" Instance URL: {INSTANCE_URL}") |
| | print(f" Cell ID: {CELL_ID}") |
| | print(f" Controller ID: {CONTROLLER_ID}") |
| | print(f" Motion Group ID: {MOTION_GROUP_ID}") |
| | print(f" TCP Name: {TCP_NAME}") |
| | print() |
| |
|
| | def create_websocket_url() -> str: |
| | """Create the v2 jogging WebSocket URL.""" |
| | ws_url = INSTANCE_URL.replace("https://", "wss://").replace("http://", "ws://") |
| | url = f"{ws_url}/api/v2/cells/{CELL_ID}/controllers/{CONTROLLER_ID}/execution/jogging" |
| | |
| | url += f"?token={ACCESS_TOKEN}" |
| | return url |
| |
|
| | def log_test(name: str, passed: bool, message: str = "", details: Any = None): |
| | """Log test result.""" |
| | status = "✓ PASS" if passed else "✗ FAIL" |
| | print(f"[{status}] {name}") |
| | if message: |
| | print(f" {message}") |
| | if details: |
| | print(f" Details: {json.dumps(details, indent=8)}") |
| | print() |
| |
|
| | test_results.append({ |
| | "test": name, |
| | "passed": passed, |
| | "message": message, |
| | "details": details |
| | }) |
| |
|
| | def test_1_connect_and_initialize(): |
| | """Test 1: Connect to WebSocket and send InitializeJoggingRequest.""" |
| | print("-" * 70) |
| | print("Test 1: Connect and Initialize") |
| | print("-" * 70) |
| |
|
| | try: |
| | url = create_websocket_url() |
| | print(f"Connecting to: {url[:80]}...") |
| |
|
| | ws = connect(url, open_timeout=10) |
| | log_test("WebSocket Connection", True, "Successfully connected") |
| |
|
| | |
| | init_msg = { |
| | "message_type": "InitializeJoggingRequest", |
| | "motion_group": MOTION_GROUP_ID, |
| | "tcp": TCP_NAME, |
| | "response_coordinate_system": "world" |
| | } |
| | print(f"Sending: {json.dumps(init_msg, indent=2)}") |
| | ws.send(json.dumps(init_msg)) |
| |
|
| | |
| | print("Waiting for response...") |
| | response = ws.recv(timeout=5.0) |
| | response_data = json.loads(response) |
| | print(f"Received: {json.dumps(response_data, indent=2)}") |
| |
|
| | |
| | result = response_data.get("result", response_data) |
| | if "kind" in result: |
| | if result["kind"] == "INITIALIZE_RECEIVED": |
| | log_test("InitializeJoggingRequest", True, "Received INITIALIZE_RECEIVED", result) |
| | ws.close() |
| | return True, ws |
| | else: |
| | log_test("InitializeJoggingRequest", False, f"Unexpected kind: {result['kind']}", result) |
| | ws.close() |
| | return False, None |
| | else: |
| | log_test("InitializeJoggingRequest", False, "No 'kind' field in response", response_data) |
| | ws.close() |
| | return False, None |
| |
|
| | except Exception as e: |
| | log_test("Test 1", False, f"Exception: {type(e).__name__}: {e}") |
| | return False, None |
| |
|
| | def test_2_joint_velocity(): |
| | """Test 2: Send JointVelocityRequest.""" |
| | print("-" * 70) |
| | print("Test 2: Joint Velocity Command") |
| | print("-" * 70) |
| |
|
| | try: |
| | url = create_websocket_url() |
| | ws = connect(url, open_timeout=10) |
| |
|
| | |
| | init_msg = { |
| | "message_type": "InitializeJoggingRequest", |
| | "motion_group": MOTION_GROUP_ID, |
| | "tcp": TCP_NAME |
| | } |
| | ws.send(json.dumps(init_msg)) |
| | response = ws.recv(timeout=5.0) |
| | init_data = json.loads(response) |
| | result = init_data.get('result', init_data) |
| | print(f"Init response: {result.get('kind')}") |
| |
|
| | |
| | initial_pose = init_data.get('current_pose', {}) |
| | initial_joints = initial_pose.get('joint_position', {}).get('joints', []) |
| | if initial_joints: |
| | print(f"Initial joint 0 position: {initial_joints[0]:.4f} rad") |
| |
|
| | |
| | jog_msg = { |
| | "message_type": "JointVelocityRequest", |
| | "velocity": [0.05, 0, 0, 0, 0, 0] |
| | } |
| | print(f"Sending: {json.dumps(jog_msg, indent=2)}") |
| |
|
| | |
| | moved = False |
| | for i in range(5): |
| | ws.send(json.dumps(jog_msg)) |
| | time.sleep(0.1) |
| | |
| | try: |
| | response = ws.recv(timeout=0.05) |
| | state_data = json.loads(response) |
| | if state_data.get('kind') == 'STATE_UPDATE': |
| | current_joints = state_data.get('current_pose', {}).get('joint_position', {}).get('joints', []) |
| | if current_joints and initial_joints: |
| | delta = abs(current_joints[0] - initial_joints[0]) |
| | if delta > 0.001: |
| | moved = True |
| | print(f" Robot moved! Joint 0 delta: {delta:.4f} rad") |
| | except: |
| | pass |
| |
|
| | |
| | stop_msg = { |
| | "message_type": "JointVelocityRequest", |
| | "velocity": [0, 0, 0, 0, 0, 0] |
| | } |
| | ws.send(json.dumps(stop_msg)) |
| |
|
| | |
| | time.sleep(0.2) |
| | try: |
| | while True: |
| | response = ws.recv(timeout=0.1) |
| | state_data = json.loads(response) |
| | if state_data.get('kind') == 'STATE_UPDATE': |
| | final_joints = state_data.get('current_pose', {}).get('joint_position', {}).get('joints', []) |
| | if final_joints and initial_joints: |
| | delta = abs(final_joints[0] - initial_joints[0]) |
| | if delta > 0.001: |
| | moved = True |
| | print(f"Final joint 0 position: {final_joints[0]:.4f} rad (moved {delta:.4f} rad)") |
| | except: |
| | pass |
| |
|
| | status_msg = "Joint velocity commands sent successfully" |
| | if moved: |
| | status_msg += " - ROBOT MOVED CONFIRMED" |
| | else: |
| | status_msg += " - WARNING: No movement detected" |
| |
|
| | log_test("JointVelocityRequest", True, status_msg) |
| | ws.close() |
| | return True |
| |
|
| | except Exception as e: |
| | log_test("Test 2", False, f"Exception: {type(e).__name__}: {e}") |
| | return False |
| |
|
| | def test_3_tcp_velocity(): |
| | """Test 3: Send TcpVelocityRequest.""" |
| | print("-" * 70) |
| | print("Test 3: TCP Velocity Command") |
| | print("-" * 70) |
| |
|
| | try: |
| | url = create_websocket_url() |
| | ws = connect(url, open_timeout=10) |
| |
|
| | |
| | init_msg = { |
| | "message_type": "InitializeJoggingRequest", |
| | "motion_group": MOTION_GROUP_ID, |
| | "tcp": TCP_NAME |
| | } |
| | ws.send(json.dumps(init_msg)) |
| | response = ws.recv(timeout=5.0) |
| | init_data = json.loads(response) |
| | result = init_data.get('result', init_data) |
| | print(f"Init response: {result.get('kind')}") |
| |
|
| | |
| | tcp_msg = { |
| | "message_type": "TcpVelocityRequest", |
| | "translation": [5.0, 0.0, 0.0], |
| | "rotation": [0.0, 0.0, 0.0], |
| | "use_tool_coordinate_system": False |
| | } |
| | print(f"Sending: {json.dumps(tcp_msg, indent=2)}") |
| |
|
| | |
| | for i in range(5): |
| | ws.send(json.dumps(tcp_msg)) |
| | time.sleep(0.1) |
| |
|
| | |
| | stop_msg = { |
| | "message_type": "TcpVelocityRequest", |
| | "translation": [0.0, 0.0, 0.0], |
| | "rotation": [0.0, 0.0, 0.0], |
| | "use_tool_coordinate_system": False |
| | } |
| | ws.send(json.dumps(stop_msg)) |
| |
|
| | log_test("TcpVelocityRequest", True, "TCP velocity commands sent successfully") |
| | ws.close() |
| | return True |
| |
|
| | except Exception as e: |
| | log_test("Test 3", False, f"Exception: {type(e).__name__}: {e}") |
| | return False |
| |
|
| | def test_4_pause(): |
| | """Test 4: Send PauseJoggingRequest.""" |
| | print("-" * 70) |
| | print("Test 4: Pause Command") |
| | print("-" * 70) |
| |
|
| | try: |
| | url = create_websocket_url() |
| | ws = connect(url, open_timeout=10) |
| |
|
| | |
| | init_msg = { |
| | "message_type": "InitializeJoggingRequest", |
| | "motion_group": MOTION_GROUP_ID, |
| | "tcp": TCP_NAME |
| | } |
| | ws.send(json.dumps(init_msg)) |
| | response = ws.recv(timeout=5.0) |
| | init_data = json.loads(response) |
| | result = init_data.get('result', init_data) |
| | print(f"Init response: {result.get('kind')}") |
| |
|
| | |
| | jog_msg = { |
| | "message_type": "JointVelocityRequest", |
| | "velocity": [0.05, 0, 0, 0, 0, 0] |
| | } |
| | ws.send(json.dumps(jog_msg)) |
| | time.sleep(0.2) |
| |
|
| | |
| | pause_msg = {"message_type": "PauseJoggingRequest"} |
| | print(f"Sending: {json.dumps(pause_msg, indent=2)}") |
| | ws.send(json.dumps(pause_msg)) |
| |
|
| | log_test("PauseJoggingRequest", True, "Pause command sent successfully") |
| | ws.close() |
| | return True |
| |
|
| | except Exception as e: |
| | log_test("Test 4", False, f"Exception: {type(e).__name__}: {e}") |
| | return False |
| |
|
| | def test_5_continuous_sending(): |
| | """Test 5: Continuous command sending at 10Hz.""" |
| | print("-" * 70) |
| | print("Test 5: Continuous Sending (10Hz)") |
| | print("-" * 70) |
| |
|
| | try: |
| | url = create_websocket_url() |
| | ws = connect(url, open_timeout=10) |
| |
|
| | |
| | init_msg = { |
| | "message_type": "InitializeJoggingRequest", |
| | "motion_group": MOTION_GROUP_ID, |
| | "tcp": TCP_NAME |
| | } |
| | ws.send(json.dumps(init_msg)) |
| | response = ws.recv(timeout=5.0) |
| | init_data = json.loads(response) |
| | result = init_data.get('result', init_data) |
| | print(f"Init response: {result.get('kind')}") |
| |
|
| | |
| | jog_msg = { |
| | "message_type": "JointVelocityRequest", |
| | "velocity": [0.03, 0, 0, 0, 0, 0] |
| | } |
| |
|
| | print("Sending 50 commands at 10Hz (5 seconds)...") |
| | start_time = time.time() |
| | for i in range(50): |
| | ws.send(json.dumps(jog_msg)) |
| | time.sleep(0.1) |
| | if (i + 1) % 10 == 0: |
| | print(f" Sent {i + 1}/50 commands") |
| | duration = time.time() - start_time |
| | print(f"Completed in {duration:.2f} seconds") |
| |
|
| | |
| | stop_msg = { |
| | "message_type": "JointVelocityRequest", |
| | "velocity": [0, 0, 0, 0, 0, 0] |
| | } |
| | ws.send(json.dumps(stop_msg)) |
| |
|
| | log_test("Continuous Sending", True, f"50 commands sent at 10Hz in {duration:.2f}s") |
| | ws.close() |
| | return True |
| |
|
| | except Exception as e: |
| | log_test("Test 5", False, f"Exception: {type(e).__name__}: {e}") |
| | return False |
| |
|
| | def get_robot_state(): |
| | """Get current robot state from state stream.""" |
| | try: |
| | import urllib.request |
| | url = f"{INSTANCE_URL}/api/v2/cells/{CELL_ID}/controllers/{CONTROLLER_ID}/motion-groups/{MOTION_GROUP_ID}/state" |
| | headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"} |
| | req = urllib.request.Request(url, headers=headers) |
| | with urllib.request.urlopen(req, timeout=5) as response: |
| | data = json.loads(response.read()) |
| | return data |
| | except Exception as e: |
| | print(f"Failed to get robot state: {e}") |
| | return None |
| |
|
| | def test_6_multi_axis_joint(): |
| | """Test 6: Move two joints simultaneously and verify movement.""" |
| | print("-" * 70) |
| | print("Test 6: Multi-Axis Joint Movement with Verification") |
| | print("-" * 70) |
| |
|
| | try: |
| | |
| | initial_state = get_robot_state() |
| | if not initial_state: |
| | log_test("Multi-Axis Joint Movement", False, "Could not get initial robot state") |
| | return False |
| |
|
| | initial_joints = initial_state.get('joint_position', []) |
| | if not initial_joints or len(initial_joints) < 2: |
| | log_test("Multi-Axis Joint Movement", False, "Invalid initial joint data") |
| | return False |
| |
|
| | print(f"Initial joints [0,1]: [{initial_joints[0]:.4f}, {initial_joints[1]:.4f}] rad") |
| |
|
| | |
| | url = create_websocket_url() |
| | ws = connect(url, open_timeout=10) |
| |
|
| | |
| | init_msg = { |
| | "message_type": "InitializeJoggingRequest", |
| | "motion_group": MOTION_GROUP_ID, |
| | "tcp": TCP_NAME |
| | } |
| | ws.send(json.dumps(init_msg)) |
| | response = ws.recv(timeout=5.0) |
| | init_data = json.loads(response) |
| | result = init_data.get('result', init_data) |
| | print(f"Init response: {result.get('kind')}") |
| |
|
| | |
| | jog_msg = { |
| | "message_type": "JointVelocityRequest", |
| | "velocity": [0.05, 0.05, 0, 0, 0, 0] |
| | } |
| | print(f"Sending: {json.dumps(jog_msg, indent=2)}") |
| |
|
| | |
| | print("Jogging for 1 second...") |
| | for i in range(10): |
| | ws.send(json.dumps(jog_msg)) |
| | time.sleep(0.1) |
| |
|
| | |
| | stop_msg = { |
| | "message_type": "JointVelocityRequest", |
| | "velocity": [0, 0, 0, 0, 0, 0] |
| | } |
| | ws.send(json.dumps(stop_msg)) |
| | ws.close() |
| |
|
| | |
| | time.sleep(0.5) |
| |
|
| | |
| | final_state = get_robot_state() |
| | if not final_state: |
| | log_test("Multi-Axis Joint Movement", False, "Could not get final robot state") |
| | return False |
| |
|
| | final_joints = final_state.get('joint_position', []) |
| | if not final_joints or len(final_joints) < 2: |
| | log_test("Multi-Axis Joint Movement", False, "Invalid final joint data") |
| | return False |
| |
|
| | print(f"Final joints [0,1]: [{final_joints[0]:.4f}, {final_joints[1]:.4f}] rad") |
| |
|
| | |
| | delta_j0 = abs(final_joints[0] - initial_joints[0]) |
| | delta_j1 = abs(final_joints[1] - initial_joints[1]) |
| |
|
| | print(f"Delta joint 0: {delta_j0:.4f} rad") |
| | print(f"Delta joint 1: {delta_j1:.4f} rad") |
| |
|
| | |
| | movement_threshold = 0.001 |
| | j0_moved = delta_j0 > movement_threshold |
| | j1_moved = delta_j1 > movement_threshold |
| |
|
| | if j0_moved and j1_moved: |
| | log_test("Multi-Axis Joint Movement", True, |
| | f"Both joints moved as expected (Δj0={delta_j0:.4f}, Δj1={delta_j1:.4f})", |
| | {"delta_joint_0": delta_j0, "delta_joint_1": delta_j1}) |
| | elif j0_moved or j1_moved: |
| | log_test("Multi-Axis Joint Movement", False, |
| | f"Only one joint moved (Δj0={delta_j0:.4f}, Δj1={delta_j1:.4f})", |
| | {"delta_joint_0": delta_j0, "delta_joint_1": delta_j1}) |
| | else: |
| | log_test("Multi-Axis Joint Movement", False, |
| | "No movement detected in either joint", |
| | {"delta_joint_0": delta_j0, "delta_joint_1": delta_j1}) |
| |
|
| | return j0_moved and j1_moved |
| |
|
| | except Exception as e: |
| | log_test("Test 6", False, f"Exception: {type(e).__name__}: {e}") |
| | import traceback |
| | traceback.print_exc() |
| | return False |
| |
|
| | def test_7_mode_switching(): |
| | """Test 6: Dynamic mode switching between joint and TCP.""" |
| | print("-" * 70) |
| | print("Test 6: Mode Switching (Joint ↔ TCP)") |
| | print("-" * 70) |
| |
|
| | try: |
| | url = create_websocket_url() |
| | ws = connect(url, open_timeout=10) |
| |
|
| | |
| | init_msg = { |
| | "message_type": "InitializeJoggingRequest", |
| | "motion_group": MOTION_GROUP_ID, |
| | "tcp": TCP_NAME |
| | } |
| | ws.send(json.dumps(init_msg)) |
| | response = ws.recv(timeout=5.0) |
| | init_data = json.loads(response) |
| | result = init_data.get('result', init_data) |
| | print(f"Init response: {result.get('kind')}") |
| |
|
| | |
| | print("Phase 1: Joint jogging...") |
| | joint_msg = { |
| | "message_type": "JointVelocityRequest", |
| | "velocity": [0.05, 0, 0, 0, 0, 0] |
| | } |
| | for i in range(10): |
| | ws.send(json.dumps(joint_msg)) |
| | time.sleep(0.1) |
| |
|
| | |
| | print("Phase 2: Switching to TCP jogging...") |
| | tcp_msg = { |
| | "message_type": "TcpVelocityRequest", |
| | "translation": [5.0, 0.0, 0.0], |
| | "rotation": [0.0, 0.0, 0.0], |
| | "use_tool_coordinate_system": False |
| | } |
| | for i in range(10): |
| | ws.send(json.dumps(tcp_msg)) |
| | time.sleep(0.1) |
| |
|
| | |
| | print("Phase 3: Switching back to joint jogging...") |
| | joint_msg2 = { |
| | "message_type": "JointVelocityRequest", |
| | "velocity": [0, 0, 0, 0, 0, 0.05] |
| | } |
| | for i in range(10): |
| | ws.send(json.dumps(joint_msg2)) |
| | time.sleep(0.1) |
| |
|
| | |
| | stop_msg = { |
| | "message_type": "JointVelocityRequest", |
| | "velocity": [0, 0, 0, 0, 0, 0] |
| | } |
| | ws.send(json.dumps(stop_msg)) |
| |
|
| | log_test("Mode Switching", True, "Successfully switched between joint and TCP modes") |
| | ws.close() |
| | return True |
| |
|
| | except Exception as e: |
| | log_test("Test 6", False, f"Exception: {type(e).__name__}: {e}") |
| | return False |
| |
|
| | def print_summary(): |
| | """Print test summary.""" |
| | print() |
| | print("=" * 70) |
| | print("VALIDATION SUMMARY") |
| | print("=" * 70) |
| | print() |
| |
|
| | passed = sum(1 for r in test_results if r["passed"]) |
| | total = len(test_results) |
| |
|
| | print(f"Tests passed: {passed}/{total}") |
| | print() |
| |
|
| | if passed == total: |
| | print("✓ ALL TESTS PASSED") |
| | print() |
| | print("The v2 API message formats have been validated.") |
| | print("You can now proceed with Phase 1 (implementation).") |
| | else: |
| | print("✗ SOME TESTS FAILED") |
| | print() |
| | print("Please review the failures above before proceeding.") |
| | print("Do NOT proceed to Phase 1 until all tests pass!") |
| |
|
| | print() |
| | print("=" * 70) |
| |
|
| | |
| | results_file = Path(__file__).parent / "v2_jogging_validation_results.json" |
| | with open(results_file, 'w') as f: |
| | json.dump({ |
| | "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), |
| | "passed": passed, |
| | "total": total, |
| | "all_passed": passed == total, |
| | "results": test_results |
| | }, f, indent=2) |
| | print(f"Results saved to: {results_file}") |
| |
|
| | def main(): |
| | """Run all validation tests.""" |
| | import sys |
| | check_config() |
| |
|
| | |
| | auto_mode = "--auto" in sys.argv |
| |
|
| | if not auto_mode: |
| | print("WARNING: This will move the robot!") |
| | print("Ensure the robot is in a safe position before proceeding.") |
| | print() |
| | response = input("Continue? [y/N]: ") |
| | if response.lower() != 'y': |
| | print("Aborted.") |
| | return |
| | else: |
| | print("Running in AUTO mode (skipping safety confirmation)") |
| |
|
| | print() |
| |
|
| | |
| | test_1_connect_and_initialize() |
| | test_2_joint_velocity() |
| | test_3_tcp_velocity() |
| | test_4_pause() |
| | test_5_continuous_sending() |
| | test_6_multi_axis_joint() |
| | test_7_mode_switching() |
| |
|
| | |
| | print_summary() |
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|