nova-sim / scripts /validate_v2_jogging.py
Georg
Enhance multi-axis homing and Nova API v2 integration
59625d9
"""
Validation Test Script for Nova API v2 Jogging
This script validates that v2 jogging message formats work correctly with a real Nova instance
BEFORE migrating the nova_jogger.py module. This is Phase 0 and must pass before implementation.
Prerequisites:
- .env.local configured with Nova credentials
- Robot in safe position
- Robot controller accessible
Usage:
python scripts/validate_v2_jogging.py
"""
import json
import os
import sys
import time
from pathlib import Path
from typing import Any, Dict, Optional
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
from websockets.sync.client import connect
except ImportError:
print("ERROR: websockets package required")
print("Install with: pip install websockets")
sys.exit(1)
# Load environment variables
def load_env_file(filepath: Path):
"""Load environment variables from .env file."""
if not filepath.exists():
return
with open(filepath, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
os.environ[key.strip()] = value.strip()
env_local_path = Path(__file__).parent.parent / '.env.local'
load_env_file(env_local_path)
# Get configuration from environment
INSTANCE_URL = os.getenv('NOVA_INSTANCE_URL') or os.getenv('NOVA_API')
ACCESS_TOKEN = os.getenv('NOVA_ACCESS_TOKEN')
CELL_ID = os.getenv('NOVA_CELL_ID', 'cell')
CONTROLLER_ID = os.getenv('NOVA_CONTROLLER_ID')
MOTION_GROUP_ID = os.getenv('NOVA_MOTION_GROUP_ID')
TCP_NAME = os.getenv('NOVA_TCP_NAME', 'Flange')
# Validation results
test_results = []
def check_config():
"""Verify required configuration is present."""
print("=" * 70)
print("Nova API v2 Jogging Validation")
print("=" * 70)
print()
missing = []
if not INSTANCE_URL:
missing.append("NOVA_INSTANCE_URL (or NOVA_API)")
if not ACCESS_TOKEN:
missing.append("NOVA_ACCESS_TOKEN")
if not CONTROLLER_ID:
missing.append("NOVA_CONTROLLER_ID")
if not MOTION_GROUP_ID:
missing.append("NOVA_MOTION_GROUP_ID")
if missing:
print("ERROR: Missing required environment variables:")
for var in missing:
print(f" - {var}")
print()
print("Please configure .env.local with Nova credentials")
sys.exit(1)
print("Configuration:")
print(f" Instance URL: {INSTANCE_URL}")
print(f" Cell ID: {CELL_ID}")
print(f" Controller ID: {CONTROLLER_ID}")
print(f" Motion Group ID: {MOTION_GROUP_ID}")
print(f" TCP Name: {TCP_NAME}")
print()
def create_websocket_url() -> str:
"""Create the v2 jogging WebSocket URL."""
ws_url = INSTANCE_URL.replace("https://", "wss://").replace("http://", "ws://")
url = f"{ws_url}/api/v2/cells/{CELL_ID}/controllers/{CONTROLLER_ID}/execution/jogging"
# Add token as query parameter
url += f"?token={ACCESS_TOKEN}"
return url
def log_test(name: str, passed: bool, message: str = "", details: Any = None):
"""Log test result."""
status = "✓ PASS" if passed else "✗ FAIL"
print(f"[{status}] {name}")
if message:
print(f" {message}")
if details:
print(f" Details: {json.dumps(details, indent=8)}")
print()
test_results.append({
"test": name,
"passed": passed,
"message": message,
"details": details
})
def test_1_connect_and_initialize():
"""Test 1: Connect to WebSocket and send InitializeJoggingRequest."""
print("-" * 70)
print("Test 1: Connect and Initialize")
print("-" * 70)
try:
url = create_websocket_url()
print(f"Connecting to: {url[:80]}...")
ws = connect(url, open_timeout=10)
log_test("WebSocket Connection", True, "Successfully connected")
# Send InitializeJoggingRequest
init_msg = {
"message_type": "InitializeJoggingRequest",
"motion_group": MOTION_GROUP_ID,
"tcp": TCP_NAME,
"response_coordinate_system": "world"
}
print(f"Sending: {json.dumps(init_msg, indent=2)}")
ws.send(json.dumps(init_msg))
# Wait for response
print("Waiting for response...")
response = ws.recv(timeout=5.0)
response_data = json.loads(response)
print(f"Received: {json.dumps(response_data, indent=2)}")
# Check response (v2 API wraps result in {"result": {"kind": ...}})
result = response_data.get("result", response_data)
if "kind" in result:
if result["kind"] == "INITIALIZE_RECEIVED":
log_test("InitializeJoggingRequest", True, "Received INITIALIZE_RECEIVED", result)
ws.close()
return True, ws
else:
log_test("InitializeJoggingRequest", False, f"Unexpected kind: {result['kind']}", result)
ws.close()
return False, None
else:
log_test("InitializeJoggingRequest", False, "No 'kind' field in response", response_data)
ws.close()
return False, None
except Exception as e:
log_test("Test 1", False, f"Exception: {type(e).__name__}: {e}")
return False, None
def test_2_joint_velocity():
"""Test 2: Send JointVelocityRequest."""
print("-" * 70)
print("Test 2: Joint Velocity Command")
print("-" * 70)
try:
url = create_websocket_url()
ws = connect(url, open_timeout=10)
# Initialize first
init_msg = {
"message_type": "InitializeJoggingRequest",
"motion_group": MOTION_GROUP_ID,
"tcp": TCP_NAME
}
ws.send(json.dumps(init_msg))
response = ws.recv(timeout=5.0)
init_data = json.loads(response)
result = init_data.get('result', init_data)
print(f"Init response: {result.get('kind')}")
# Capture initial position
initial_pose = init_data.get('current_pose', {})
initial_joints = initial_pose.get('joint_position', {}).get('joints', [])
if initial_joints:
print(f"Initial joint 0 position: {initial_joints[0]:.4f} rad")
# Send joint velocity (small safe velocity on joint 0)
jog_msg = {
"message_type": "JointVelocityRequest",
"velocity": [0.05, 0, 0, 0, 0, 0] # Very small velocity for safety
}
print(f"Sending: {json.dumps(jog_msg, indent=2)}")
# Send a few times and track movement
moved = False
for i in range(5):
ws.send(json.dumps(jog_msg))
time.sleep(0.1)
# Check for state updates
try:
response = ws.recv(timeout=0.05)
state_data = json.loads(response)
if state_data.get('kind') == 'STATE_UPDATE':
current_joints = state_data.get('current_pose', {}).get('joint_position', {}).get('joints', [])
if current_joints and initial_joints:
delta = abs(current_joints[0] - initial_joints[0])
if delta > 0.001: # Movement threshold
moved = True
print(f" Robot moved! Joint 0 delta: {delta:.4f} rad")
except:
pass # No response yet, continue
# Stop with zeros
stop_msg = {
"message_type": "JointVelocityRequest",
"velocity": [0, 0, 0, 0, 0, 0]
}
ws.send(json.dumps(stop_msg))
# Wait a bit and check final position
time.sleep(0.2)
try:
while True:
response = ws.recv(timeout=0.1)
state_data = json.loads(response)
if state_data.get('kind') == 'STATE_UPDATE':
final_joints = state_data.get('current_pose', {}).get('joint_position', {}).get('joints', [])
if final_joints and initial_joints:
delta = abs(final_joints[0] - initial_joints[0])
if delta > 0.001:
moved = True
print(f"Final joint 0 position: {final_joints[0]:.4f} rad (moved {delta:.4f} rad)")
except:
pass
status_msg = "Joint velocity commands sent successfully"
if moved:
status_msg += " - ROBOT MOVED CONFIRMED"
else:
status_msg += " - WARNING: No movement detected"
log_test("JointVelocityRequest", True, status_msg)
ws.close()
return True
except Exception as e:
log_test("Test 2", False, f"Exception: {type(e).__name__}: {e}")
return False
def test_3_tcp_velocity():
"""Test 3: Send TcpVelocityRequest."""
print("-" * 70)
print("Test 3: TCP Velocity Command")
print("-" * 70)
try:
url = create_websocket_url()
ws = connect(url, open_timeout=10)
# Initialize
init_msg = {
"message_type": "InitializeJoggingRequest",
"motion_group": MOTION_GROUP_ID,
"tcp": TCP_NAME
}
ws.send(json.dumps(init_msg))
response = ws.recv(timeout=5.0)
init_data = json.loads(response)
result = init_data.get('result', init_data)
print(f"Init response: {result.get('kind')}")
# Send TCP velocity (small safe velocity in X)
tcp_msg = {
"message_type": "TcpVelocityRequest",
"translation": [5.0, 0.0, 0.0], # 5 mm/s in X
"rotation": [0.0, 0.0, 0.0],
"use_tool_coordinate_system": False
}
print(f"Sending: {json.dumps(tcp_msg, indent=2)}")
# Send a few times
for i in range(5):
ws.send(json.dumps(tcp_msg))
time.sleep(0.1)
# Stop with zeros
stop_msg = {
"message_type": "TcpVelocityRequest",
"translation": [0.0, 0.0, 0.0],
"rotation": [0.0, 0.0, 0.0],
"use_tool_coordinate_system": False
}
ws.send(json.dumps(stop_msg))
log_test("TcpVelocityRequest", True, "TCP velocity commands sent successfully")
ws.close()
return True
except Exception as e:
log_test("Test 3", False, f"Exception: {type(e).__name__}: {e}")
return False
def test_4_pause():
"""Test 4: Send PauseJoggingRequest."""
print("-" * 70)
print("Test 4: Pause Command")
print("-" * 70)
try:
url = create_websocket_url()
ws = connect(url, open_timeout=10)
# Initialize
init_msg = {
"message_type": "InitializeJoggingRequest",
"motion_group": MOTION_GROUP_ID,
"tcp": TCP_NAME
}
ws.send(json.dumps(init_msg))
response = ws.recv(timeout=5.0)
init_data = json.loads(response)
result = init_data.get('result', init_data)
print(f"Init response: {result.get('kind')}")
# Start movement
jog_msg = {
"message_type": "JointVelocityRequest",
"velocity": [0.05, 0, 0, 0, 0, 0]
}
ws.send(json.dumps(jog_msg))
time.sleep(0.2)
# Send pause
pause_msg = {"message_type": "PauseJoggingRequest"}
print(f"Sending: {json.dumps(pause_msg, indent=2)}")
ws.send(json.dumps(pause_msg))
log_test("PauseJoggingRequest", True, "Pause command sent successfully")
ws.close()
return True
except Exception as e:
log_test("Test 4", False, f"Exception: {type(e).__name__}: {e}")
return False
def test_5_continuous_sending():
"""Test 5: Continuous command sending at 10Hz."""
print("-" * 70)
print("Test 5: Continuous Sending (10Hz)")
print("-" * 70)
try:
url = create_websocket_url()
ws = connect(url, open_timeout=10)
# Initialize
init_msg = {
"message_type": "InitializeJoggingRequest",
"motion_group": MOTION_GROUP_ID,
"tcp": TCP_NAME
}
ws.send(json.dumps(init_msg))
response = ws.recv(timeout=5.0)
init_data = json.loads(response)
result = init_data.get('result', init_data)
print(f"Init response: {result.get('kind')}")
# Send 50 commands at 10Hz (5 seconds)
jog_msg = {
"message_type": "JointVelocityRequest",
"velocity": [0.03, 0, 0, 0, 0, 0]
}
print("Sending 50 commands at 10Hz (5 seconds)...")
start_time = time.time()
for i in range(50):
ws.send(json.dumps(jog_msg))
time.sleep(0.1)
if (i + 1) % 10 == 0:
print(f" Sent {i + 1}/50 commands")
duration = time.time() - start_time
print(f"Completed in {duration:.2f} seconds")
# Stop
stop_msg = {
"message_type": "JointVelocityRequest",
"velocity": [0, 0, 0, 0, 0, 0]
}
ws.send(json.dumps(stop_msg))
log_test("Continuous Sending", True, f"50 commands sent at 10Hz in {duration:.2f}s")
ws.close()
return True
except Exception as e:
log_test("Test 5", False, f"Exception: {type(e).__name__}: {e}")
return False
def get_robot_state():
"""Get current robot state from state stream."""
try:
import urllib.request
url = f"{INSTANCE_URL}/api/v2/cells/{CELL_ID}/controllers/{CONTROLLER_ID}/motion-groups/{MOTION_GROUP_ID}/state"
headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=5) as response:
data = json.loads(response.read())
return data
except Exception as e:
print(f"Failed to get robot state: {e}")
return None
def test_6_multi_axis_joint():
"""Test 6: Move two joints simultaneously and verify movement."""
print("-" * 70)
print("Test 6: Multi-Axis Joint Movement with Verification")
print("-" * 70)
try:
# Get initial state
initial_state = get_robot_state()
if not initial_state:
log_test("Multi-Axis Joint Movement", False, "Could not get initial robot state")
return False
initial_joints = initial_state.get('joint_position', [])
if not initial_joints or len(initial_joints) < 2:
log_test("Multi-Axis Joint Movement", False, "Invalid initial joint data")
return False
print(f"Initial joints [0,1]: [{initial_joints[0]:.4f}, {initial_joints[1]:.4f}] rad")
# Connect to jogging endpoint
url = create_websocket_url()
ws = connect(url, open_timeout=10)
# Initialize
init_msg = {
"message_type": "InitializeJoggingRequest",
"motion_group": MOTION_GROUP_ID,
"tcp": TCP_NAME
}
ws.send(json.dumps(init_msg))
response = ws.recv(timeout=5.0)
init_data = json.loads(response)
result = init_data.get('result', init_data)
print(f"Init response: {result.get('kind')}")
# Send multi-axis joint velocity (move joints 0 and 1 simultaneously)
jog_msg = {
"message_type": "JointVelocityRequest",
"velocity": [0.05, 0.05, 0, 0, 0, 0] # Move joints 0 and 1
}
print(f"Sending: {json.dumps(jog_msg, indent=2)}")
# Send commands for 1 second
print("Jogging for 1 second...")
for i in range(10):
ws.send(json.dumps(jog_msg))
time.sleep(0.1)
# Stop
stop_msg = {
"message_type": "JointVelocityRequest",
"velocity": [0, 0, 0, 0, 0, 0]
}
ws.send(json.dumps(stop_msg))
ws.close()
# Wait for motion to complete
time.sleep(0.5)
# Get final state
final_state = get_robot_state()
if not final_state:
log_test("Multi-Axis Joint Movement", False, "Could not get final robot state")
return False
final_joints = final_state.get('joint_position', [])
if not final_joints or len(final_joints) < 2:
log_test("Multi-Axis Joint Movement", False, "Invalid final joint data")
return False
print(f"Final joints [0,1]: [{final_joints[0]:.4f}, {final_joints[1]:.4f}] rad")
# Calculate deltas
delta_j0 = abs(final_joints[0] - initial_joints[0])
delta_j1 = abs(final_joints[1] - initial_joints[1])
print(f"Delta joint 0: {delta_j0:.4f} rad")
print(f"Delta joint 1: {delta_j1:.4f} rad")
# Verify both joints moved
movement_threshold = 0.001 # rad
j0_moved = delta_j0 > movement_threshold
j1_moved = delta_j1 > movement_threshold
if j0_moved and j1_moved:
log_test("Multi-Axis Joint Movement", True,
f"Both joints moved as expected (Δj0={delta_j0:.4f}, Δj1={delta_j1:.4f})",
{"delta_joint_0": delta_j0, "delta_joint_1": delta_j1})
elif j0_moved or j1_moved:
log_test("Multi-Axis Joint Movement", False,
f"Only one joint moved (Δj0={delta_j0:.4f}, Δj1={delta_j1:.4f})",
{"delta_joint_0": delta_j0, "delta_joint_1": delta_j1})
else:
log_test("Multi-Axis Joint Movement", False,
"No movement detected in either joint",
{"delta_joint_0": delta_j0, "delta_joint_1": delta_j1})
return j0_moved and j1_moved
except Exception as e:
log_test("Test 6", False, f"Exception: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
return False
def test_7_mode_switching():
"""Test 6: Dynamic mode switching between joint and TCP."""
print("-" * 70)
print("Test 6: Mode Switching (Joint ↔ TCP)")
print("-" * 70)
try:
url = create_websocket_url()
ws = connect(url, open_timeout=10)
# Initialize
init_msg = {
"message_type": "InitializeJoggingRequest",
"motion_group": MOTION_GROUP_ID,
"tcp": TCP_NAME
}
ws.send(json.dumps(init_msg))
response = ws.recv(timeout=5.0)
init_data = json.loads(response)
result = init_data.get('result', init_data)
print(f"Init response: {result.get('kind')}")
# Start with joint jogging
print("Phase 1: Joint jogging...")
joint_msg = {
"message_type": "JointVelocityRequest",
"velocity": [0.05, 0, 0, 0, 0, 0]
}
for i in range(10):
ws.send(json.dumps(joint_msg))
time.sleep(0.1)
# Switch to TCP jogging
print("Phase 2: Switching to TCP jogging...")
tcp_msg = {
"message_type": "TcpVelocityRequest",
"translation": [5.0, 0.0, 0.0],
"rotation": [0.0, 0.0, 0.0],
"use_tool_coordinate_system": False
}
for i in range(10):
ws.send(json.dumps(tcp_msg))
time.sleep(0.1)
# Switch back to joint
print("Phase 3: Switching back to joint jogging...")
joint_msg2 = {
"message_type": "JointVelocityRequest",
"velocity": [0, 0, 0, 0, 0, 0.05] # Different joint
}
for i in range(10):
ws.send(json.dumps(joint_msg2))
time.sleep(0.1)
# Stop
stop_msg = {
"message_type": "JointVelocityRequest",
"velocity": [0, 0, 0, 0, 0, 0]
}
ws.send(json.dumps(stop_msg))
log_test("Mode Switching", True, "Successfully switched between joint and TCP modes")
ws.close()
return True
except Exception as e:
log_test("Test 6", False, f"Exception: {type(e).__name__}: {e}")
return False
def print_summary():
"""Print test summary."""
print()
print("=" * 70)
print("VALIDATION SUMMARY")
print("=" * 70)
print()
passed = sum(1 for r in test_results if r["passed"])
total = len(test_results)
print(f"Tests passed: {passed}/{total}")
print()
if passed == total:
print("✓ ALL TESTS PASSED")
print()
print("The v2 API message formats have been validated.")
print("You can now proceed with Phase 1 (implementation).")
else:
print("✗ SOME TESTS FAILED")
print()
print("Please review the failures above before proceeding.")
print("Do NOT proceed to Phase 1 until all tests pass!")
print()
print("=" * 70)
# Write results to file
results_file = Path(__file__).parent / "v2_jogging_validation_results.json"
with open(results_file, 'w') as f:
json.dump({
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"passed": passed,
"total": total,
"all_passed": passed == total,
"results": test_results
}, f, indent=2)
print(f"Results saved to: {results_file}")
def main():
"""Run all validation tests."""
import sys
check_config()
# Check for --auto flag to skip confirmation
auto_mode = "--auto" in sys.argv
if not auto_mode:
print("WARNING: This will move the robot!")
print("Ensure the robot is in a safe position before proceeding.")
print()
response = input("Continue? [y/N]: ")
if response.lower() != 'y':
print("Aborted.")
return
else:
print("Running in AUTO mode (skipping safety confirmation)")
print()
# Run tests
test_1_connect_and_initialize()
test_2_joint_velocity()
test_3_tcp_velocity()
test_4_pause()
test_5_continuous_sending()
test_6_multi_axis_joint()
test_7_mode_switching()
# Print summary
print_summary()
if __name__ == "__main__":
main()