| | |
| | """Test Nova API state stream WebSocket connection.""" |
| |
|
| | import os |
| | import sys |
| | from pathlib import Path |
| |
|
| | |
| | sys.path.insert(0, str(Path(__file__).parent.parent)) |
| |
|
| | |
| | def load_env_file(filepath): |
| | """Simple .env file loader.""" |
| | if not filepath.exists(): |
| | return {} |
| | env_vars = {} |
| | with open(filepath, 'r') as f: |
| | for line in f: |
| | line = line.strip() |
| | if line and not line.startswith('#') and '=' in line: |
| | key, value = line.split('=', 1) |
| | env_vars[key.strip()] = value.strip() |
| | os.environ[key.strip()] = value.strip() |
| | return env_vars |
| |
|
| |
|
| | def main(): |
| | |
| | env_path = Path(__file__).parent.parent / ".env.local" |
| | if not env_path.exists(): |
| | print(f"Error: {env_path} not found") |
| | return 1 |
| |
|
| | print(f"Loading environment from {env_path}") |
| | env_vars = load_env_file(env_path) |
| |
|
| | |
| | try: |
| | from websockets.sync.client import connect |
| | print("✓ websockets module imported successfully") |
| | except ImportError as e: |
| | print(f"✗ Failed to import websockets: {e}") |
| | print("Install with: pip install websockets") |
| | return 1 |
| |
|
| | |
| | instance_url = os.getenv("NOVA_INSTANCE_URL") |
| | access_token = os.getenv("NOVA_ACCESS_TOKEN") |
| | cell_id = os.getenv("NOVA_CELL_ID", "cell") |
| | controller_id = os.getenv("NOVA_CONTROLLER_ID") |
| | motion_group_id = os.getenv("NOVA_MOTION_GROUP_ID") |
| | response_rate = os.getenv("NOVA_RESPONSE_RATE_MS", "200") |
| |
|
| | if not all([instance_url, access_token, controller_id, motion_group_id]): |
| | print("✗ Missing required environment variables") |
| | return 1 |
| |
|
| | |
| | ws_url = instance_url.replace("https://", "wss://").replace("http://", "ws://") |
| | url = ( |
| | f"{ws_url}/api/v2/cells/{cell_id}/controllers/" |
| | f"{controller_id}/motion-groups/{motion_group_id}/state-stream" |
| | f"?response_rate={response_rate}" |
| | ) |
| |
|
| | print(f"\nTesting WebSocket state stream connection...") |
| | print(f"URL: {url}") |
| | print() |
| |
|
| | |
| | headers = [("Authorization", f"Bearer {access_token}")] |
| | try: |
| | print("Connecting...") |
| | ws = connect(url, open_timeout=10, additional_headers=headers) |
| | print("✓ WebSocket connected successfully!") |
| |
|
| | print("\nReceiving messages (will stop after 5 messages)...") |
| | count = 0 |
| | for message in ws: |
| | count += 1 |
| | print(f"Message {count}: {len(message)} bytes") |
| | if count >= 5: |
| | break |
| |
|
| | ws.close() |
| | print("\n✓ State stream test successful!") |
| | return 0 |
| |
|
| | except Exception as e: |
| | print(f"✗ Connection failed: {type(e).__name__}: {e}") |
| | import traceback |
| | traceback.print_exc() |
| | return 1 |
| |
|
| |
|
| | if __name__ == "__main__": |
| | sys.exit(main()) |
| |
|