nova-sim / scripts /test_state_stream.py
Georg
Implement Nova API jogging support and enhance UI controls
a75f112
#!/usr/bin/env python3
"""Test Nova API state stream WebSocket connection."""
import os
import sys
from pathlib import Path
# Put the directory one level above this script's folder at the front of
# sys.path so that modules located there can be imported when the script is
# run directly.
sys.path.insert(0, str(Path(__file__).parent.parent))
# Simple .env loader
def load_env_file(filepath):
    """Load ``KEY=VALUE`` pairs from a .env-style file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Each remaining line is split on the first ``=`` only, so values
    may themselves contain ``=``. An optional leading ``export `` on the key
    and one pair of matching quotes around the value are removed, so a file
    written for a shell or for a dotenv tool is read the same way.

    Every parsed pair is written to ``os.environ``, replacing any value that
    was already set for that key.

    Args:
        filepath: ``pathlib.Path`` of the file to read.

    Returns:
        A dict mapping each parsed key to its value; empty if the file does
        not exist.
    """
    if not filepath.exists():
        return {}

    env_vars = {}
    # Explicit encoding so parsing does not depend on the platform locale.
    with open(filepath, "r", encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue

            key, value = line.split("=", 1)
            key = key.strip()
            value = value.strip()

            # Accept shell-style "export KEY=value" lines.
            if key.startswith("export "):
                key = key[len("export "):].strip()

            # A line such as "=value" has no usable name, and setting an
            # empty environment variable name can raise, so skip it.
            if not key:
                continue

            # Drop one pair of matching surrounding quotes: KEY="v" -> v.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                value = value[1:-1]

            env_vars[key] = value
            os.environ[key] = value
    return env_vars
def _build_state_stream_url(instance_url, cell_id, controller_id,
                            motion_group_id, response_rate):
    """Return the WebSocket state-stream URL for one motion group."""
    # Swap the HTTP scheme for its WebSocket counterpart, then drop any
    # trailing slash so the joined path does not start with "//".
    ws_url = instance_url.replace("https://", "wss://").replace("http://", "ws://")
    ws_url = ws_url.rstrip("/")
    return (
        f"{ws_url}/api/v2/cells/{cell_id}/controllers/"
        f"{controller_id}/motion-groups/{motion_group_id}/state-stream"
        f"?response_rate={response_rate}"
    )


def main():
    """Connect to the state-stream WebSocket and read a few messages.

    Configuration is loaded from ``.env.local`` in the directory one level
    above this script's folder, then read from the ``NOVA_*`` environment
    variables. Progress and errors are printed to stdout.

    Returns:
        0 if the connection opened and the receive loop finished without an
        exception, 1 on any failure (missing ``.env.local``, missing
        ``websockets`` package, missing variables, or a connection error).
    """
    max_messages = 5

    # Load .env.local
    env_path = Path(__file__).parent.parent / ".env.local"
    if not env_path.exists():
        print(f"Error: {env_path} not found")
        return 1
    print(f"Loading environment from {env_path}")
    # Called for its side effect of populating os.environ.
    load_env_file(env_path)

    # Imported here so a missing package yields an install hint instead of
    # a bare traceback at module import time.
    try:
        from websockets.sync.client import connect
        print("✓ websockets module imported successfully")
    except ImportError as e:
        print(f"✗ Failed to import websockets: {e}")
        print("Install with: pip install websockets")
        return 1

    # Get config
    instance_url = os.getenv("NOVA_INSTANCE_URL")
    access_token = os.getenv("NOVA_ACCESS_TOKEN")
    cell_id = os.getenv("NOVA_CELL_ID", "cell")
    controller_id = os.getenv("NOVA_CONTROLLER_ID")
    motion_group_id = os.getenv("NOVA_MOTION_GROUP_ID")
    response_rate = os.getenv("NOVA_RESPONSE_RATE_MS", "200")

    required = {
        "NOVA_INSTANCE_URL": instance_url,
        "NOVA_ACCESS_TOKEN": access_token,
        "NOVA_CONTROLLER_ID": controller_id,
        "NOVA_MOTION_GROUP_ID": motion_group_id,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        print("✗ Missing required environment variables")
        # Name the offenders so the user knows what to add to .env.local.
        print(f"  Missing: {', '.join(missing)}")
        return 1

    url = _build_state_stream_url(
        instance_url, cell_id, controller_id, motion_group_id, response_rate
    )
    print("\nTesting WebSocket state stream connection...")
    print(f"URL: {url}")
    print()

    # The token travels in a header, so it is not part of the printed URL.
    headers = [("Authorization", f"Bearer {access_token}")]
    try:
        print("Connecting...")
        # The context manager closes the connection even if receiving raises.
        with connect(url, open_timeout=10, additional_headers=headers) as ws:
            print("✓ WebSocket connected successfully!")
            print(f"\nReceiving messages (will stop after {max_messages} messages)...")
            for count, message in enumerate(ws, start=1):
                # Text frames arrive as str; encode so the figure is really
                # a byte count rather than a character count.
                if isinstance(message, str):
                    size = len(message.encode("utf-8"))
                else:
                    size = len(message)
                print(f"Message {count}: {size} bytes")
                if count >= max_messages:
                    break
        print("\n✓ State stream test successful!")
        return 0
    except Exception as e:
        # Top-level boundary of a diagnostic script: report everything.
        print(f"✗ Connection failed: {type(e).__name__}: {e}")
        import traceback
        traceback.print_exc()
        return 1
# Script entry point: run the check and hand its return value to the
# interpreter as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())