Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| WebSocket Test Client — Monitor what the hub is broadcasting | |
| No external dependencies beyond websockets (already installed) | |
| Usage: | |
| python3 test_websocket.py [--subscribe|--publish] [--space ASSET_NAME] | |
| """ | |
| import asyncio | |
| import json | |
| import sys | |
| import websockets | |
| from datetime import datetime | |
async def test_subscribe(hub_url: str, idle_timeout: float = 10.0):
    """Listen to what the hub is broadcasting and print every message.

    Connects to ``{hub_url}/ws/subscribe`` and prints each received frame,
    numbered and stamped with the local wall-clock time. Frames that parse
    as JSON are pretty-printed; frames that do not are shown raw and the
    monitor keeps listening instead of aborting.

    Args:
        hub_url: Base WebSocket URL of the hub, e.g. ``ws://127.0.0.1:7860``.
        idle_timeout: Seconds to wait for the next message before giving up.

    Raises:
        SystemExit: With status 1 when no message arrives within
            ``idle_timeout`` seconds, or when connecting/receiving fails.
    """
    # NOTE(review): the "β" status glyphs below look mis-encoded in this copy
    # of the file (likely check/cross/arrow marks) — confirm against the
    # original encoding before changing them.
    print(f"[*] Connecting to hub subscriber at {hub_url}/ws/subscribe")
    try:
        async with websockets.connect(f"{hub_url}/ws/subscribe") as ws:
            print("[β] Connected! Listening for metrics updates...\n")
            count = 0
            while True:
                # Only the receive can time out, so keep the try minimal.
                try:
                    msg = await asyncio.wait_for(ws.recv(), timeout=idle_timeout)
                except asyncio.TimeoutError:
                    print(f"[!] No messages received for {idle_timeout:g} seconds...")
                    print(" β Asset spaces may not be connected yet")
                    sys.exit(1)

                count += 1
                ts = datetime.now().strftime("%H:%M:%S")
                print(f"[{ts}] Message #{count}:")

                # A malformed frame is a payload problem, not a connection
                # failure: report it and keep monitoring.
                try:
                    data = json.loads(msg)
                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    print(f" [!] Not valid JSON ({e}); raw payload: {msg!r}\n")
                    continue
                print(f" {json.dumps(data, indent=2)}\n")
    except Exception as e:
        # Top-level boundary for a CLI tool: report and exit non-zero.
        # SystemExit raised above is not an Exception, so it passes through.
        print(f"[β] Connection failed: {e}")
        print(f" Make sure hub is running and accessible at {hub_url}")
        sys.exit(1)
async def test_publish(hub_url: str, space_name: str):
    """Publish one fixed sample metrics payload to the hub as ``space_name``.

    Opens ``{hub_url}/ws/publish/{space_name}``, sends a JSON document with a
    "training" and a "voting" section, holds the connection open for five
    seconds, then closes it. Any failure is printed and the process exits
    with status 1.

    Args:
        hub_url: Base WebSocket URL of the hub.
        space_name: Name used as the last path segment of the publish URL.
    """
    print(f"[*] Connecting to hub publisher for space: {space_name}")

    # Fixed sample values; key order is kept because it shows up in the
    # serialized output.
    training_section = {
        "training_steps": 9999,
        "actor_loss": 0.123,
        "critic_loss": 0.456,
        "avn_loss": 0.789,
        "avn_accuracy": 0.95,
    }
    voting_section = {
        "dominant_signal": "BUY",
        "buy_count": 42,
        "sell_count": 18,
    }
    payload = {"training": training_section, "voting": voting_section}

    try:
        endpoint = f"{hub_url}/ws/publish/{space_name}"
        print(f"[*] Connecting to {endpoint}")
        async with websockets.connect(endpoint) as conn:
            print("[β] Connected! Sending test message...")
            await conn.send(json.dumps(payload))
            pretty_payload = json.dumps(payload, indent=2)
            print(f"[β] Sent:\n{pretty_payload}")
            # Hold the connection briefly before closing it.
            print("[*] Keeping connection open for 5 seconds...")
            await asyncio.sleep(5)
            print("[β] Done!")
    except Exception as exc:
        print(f"[β] Error: {exc}")
        sys.exit(1)
async def main():
    """Dispatch to subscribe or publish mode based on ``sys.argv``.

    ``--subscribe`` takes precedence over ``--publish``. In publish mode the
    space name is the argument following ``--space`` and falls back to
    ``TEST_ASSET`` when the flag or its value is missing. With no arguments,
    or with neither mode flag, the usage text is printed instead.
    """
    # Default hub URL (adjust if needed)
    hub_url = "ws://127.0.0.1:7860"
    argv = sys.argv
    rule = "=" * 60

    if len(argv) <= 1:
        print_usage()
        return

    if "--subscribe" in argv:
        print(rule)
        print("QUASAR Hub WebSocket Monitor (Subscribe Mode)")
        print(rule)
        await test_subscribe(hub_url)
        return

    if "--publish" not in argv:
        print_usage()
        return

    # ValueError: no --space flag; IndexError: flag given without a value.
    try:
        space_name = argv[argv.index("--space") + 1]
    except (ValueError, IndexError):
        space_name = "TEST_ASSET"

    print(rule)
    print("QUASAR Hub WebSocket Test (Publish Mode)")
    print(rule)
    await test_publish(hub_url, space_name)
def print_usage():
    """Print the command-line help text for this tool to stdout."""
    # NOTE(review): the "β" runs look like mis-encoded box-drawing and arrow
    # glyphs, and the text's indentation appears to have been lost in this
    # copy — confirm against the original file before editing the banner.
    usage_text = """
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β QUASAR WebSocket Test Tool v1.0 β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
USAGE:
# Monitor what hub is broadcasting (metrics from all spaces)
python3 test_websocket.py --subscribe
# Send a test metric to hub (publish as a space)
python3 test_websocket.py --publish --space TEST_ASSET
# Send test metrics with a different space name
python3 test_websocket.py --publish --space V100_1h
EXAMPLE WORKFLOW:
Terminal 1 (Monitor hub):
$ python3 test_websocket.py --subscribe
[β] Connected! Listening for metrics updates...
Terminal 2 (Send test data):
$ python3 test_websocket.py --publish --space V100_1h
[β] Connected! Sending test message...
[β] Sent:
{
"training": {...},
"voting": {...}
}
Terminal 1 (should see the message):
[12:34:56] Message #1:
{
"space_name": "V100_1h",
"training": {...},
"voting": {...}
}
TROUBLESHOOTING:
"Connection refused" β Hub not running on port 7860
$ curl http://127.0.0.1:7860/api/health
No messages on subscribe β Asset spaces not connected
Check if asset spaces are running and sending data
"Module not found: websockets" β Install it
$ pip install websockets
"""
    print(usage_text)
if __name__ == "__main__":
    # Subscribe mode loops until the stream goes idle, so Ctrl-C is the
    # normal way to stop it. asyncio.run() cancels the main task and then
    # re-raises KeyboardInterrupt; catch it here so the user gets a short
    # notice instead of a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n[*] Interrupted by user")
        sys.exit(130)  # 128 + SIGINT, the conventional shell exit status