| import asyncio |
| import json |
| import websockets |
| from typing import Dict, Any, Set |
| import subprocess |
| import shlex |
| from queue import Queue |
| import threading |
| import time |
| import logging |
| from datetime import datetime |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class TerminalStreamManager: |
| """Manages real-time terminal streaming with WebSocket connections.""" |
| |
| def __init__(self): |
| self.clients: Set[websockets.WebSocketServerProtocol] = set() |
| self.command_queue = Queue() |
| self.is_running = False |
| self.current_process = None |
| self.server = None |
| self.server_thread = None |
| self.loop = None |
| |
| async def register_client(self, websocket): |
| """Register a new WebSocket client.""" |
| self.clients.add(websocket) |
| await websocket.send(json.dumps({ |
| 'type': 'connected', |
| 'message': '🚀 Terminal connected successfully', |
| 'timestamp': datetime.now().isoformat() |
| })) |
| logger.info(f"Terminal client connected. Total clients: {len(self.clients)}") |
| |
| async def unregister_client(self, websocket): |
| """Unregister a WebSocket client.""" |
| self.clients.discard(websocket) |
| logger.info(f"Terminal client disconnected. Total clients: {len(self.clients)}") |
| |
| async def broadcast(self, message: Dict[str, Any]): |
| """Broadcast message to all connected clients.""" |
| if self.clients: |
| disconnected = set() |
| message['timestamp'] = datetime.now().isoformat() |
| |
| for client in self.clients: |
| try: |
| await client.send(json.dumps(message)) |
| except websockets.exceptions.ConnectionClosed: |
| disconnected.add(client) |
| except Exception as e: |
| logger.error(f"Error broadcasting to client: {e}") |
| disconnected.add(client) |
| |
| |
| for client in disconnected: |
| self.clients.discard(client) |
| |
| async def execute_command(self, command: str): |
| """Execute a command and stream output in real-time.""" |
| await self.broadcast({ |
| 'type': 'command_start', |
| 'command': command, |
| 'message': f'$ {command}' |
| }) |
| |
| try: |
| |
| safe_command = shlex.split(command) |
| |
| self.current_process = subprocess.Popen( |
| safe_command, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| text=True, |
| bufsize=1, |
| universal_newlines=True |
| ) |
| |
| |
| while True: |
| |
| if self.current_process.poll() is not None: |
| |
| remaining_stdout = self.current_process.stdout.read() |
| remaining_stderr = self.current_process.stderr.read() |
| |
| if remaining_stdout: |
| await self.broadcast({ |
| 'type': 'output', |
| 'data': remaining_stdout, |
| 'stream': 'stdout' |
| }) |
| |
| if remaining_stderr: |
| await self.broadcast({ |
| 'type': 'output', |
| 'data': remaining_stderr, |
| 'stream': 'stderr' |
| }) |
| |
| break |
| |
| |
| try: |
| |
| import select |
| ready, _, _ = select.select([self.current_process.stdout, self.current_process.stderr], [], [], 0.1) |
| |
| for stream in ready: |
| if stream == self.current_process.stdout: |
| line = stream.readline() |
| if line: |
| await self.broadcast({ |
| 'type': 'output', |
| 'data': line, |
| 'stream': 'stdout' |
| }) |
| elif stream == self.current_process.stderr: |
| line = stream.readline() |
| if line: |
| await self.broadcast({ |
| 'type': 'output', |
| 'data': line, |
| 'stream': 'stderr' |
| }) |
| except: |
| |
| await asyncio.sleep(0.1) |
| |
| |
| await self.broadcast({ |
| 'type': 'command_complete', |
| 'exit_code': self.current_process.returncode, |
| 'message': f'Process exited with code {self.current_process.returncode}' |
| }) |
| |
| except Exception as e: |
| await self.broadcast({ |
| 'type': 'error', |
| 'data': str(e), |
| 'stream': 'system' |
| }) |
| finally: |
| self.current_process = None |
| |
| async def handle_client(self, websocket, path): |
| """Handle WebSocket client connections.""" |
| await self.register_client(websocket) |
| try: |
| async for message in websocket: |
| try: |
| data = json.loads(message) |
| |
| if data.get('type') == 'command': |
| command = data.get('command', '').strip() |
| if command: |
| await self.execute_command(command) |
| |
| elif data.get('type') == 'interrupt': |
| if self.current_process: |
| self.current_process.terminate() |
| await self.broadcast({ |
| 'type': 'interrupted', |
| 'message': 'Process interrupted by user' |
| }) |
| |
| except json.JSONDecodeError: |
| await websocket.send(json.dumps({ |
| 'type': 'error', |
| 'message': 'Invalid JSON message' |
| })) |
| |
| except websockets.exceptions.ConnectionClosed: |
| pass |
| finally: |
| await self.unregister_client(websocket) |
| |
| def stop_server(self): |
| """Stop the WebSocket server gracefully.""" |
| if self.server: |
| logger.info("Stopping terminal WebSocket server...") |
| self.is_running = False |
| |
| |
| if self.clients: |
| import asyncio |
| try: |
| loop = asyncio.get_event_loop() |
| for client in self.clients.copy(): |
| try: |
| loop.create_task(client.close()) |
| except Exception as e: |
| logger.warning(f"Error closing client connection: {e}") |
| self.clients.clear() |
| except Exception as e: |
| logger.warning(f"Error closing client connections: {e}") |
| |
| |
| if self.current_process: |
| try: |
| self.current_process.terminate() |
| self.current_process = None |
| except Exception as e: |
| logger.warning(f"Error terminating process: {e}") |
| |
| |
| try: |
| if hasattr(self.server, 'close'): |
| self.server.close() |
| |
| |
| if self.loop and self.loop.is_running(): |
| self.loop.call_soon_threadsafe(self.loop.stop) |
| |
| logger.info("Terminal WebSocket server stopped") |
| except Exception as e: |
| logger.error(f"Error stopping WebSocket server: {e}") |
| else: |
| logger.info("Terminal WebSocket server was not running") |
|
|
| |
| terminal_manager = TerminalStreamManager() |
|
|
| async def start_websocket_server(host='localhost', port=8765): |
| """Start the WebSocket server for terminal streaming.""" |
| logger.info(f"Starting terminal WebSocket server on {host}:{port}") |
| |
| async def handler(websocket, path): |
| await terminal_manager.handle_client(websocket, path) |
| |
| server = await websockets.serve(handler, host, port) |
| terminal_manager.server = server |
| terminal_manager.is_running = True |
| return server |
|
|
| def run_websocket_server(): |
| """Run WebSocket server in a separate thread.""" |
| def start_server(): |
| loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(loop) |
| terminal_manager.loop = loop |
| |
| try: |
| server = loop.run_until_complete(start_websocket_server()) |
| logger.info("Terminal WebSocket server started successfully") |
| loop.run_forever() |
| except Exception as e: |
| logger.error(f"Error starting WebSocket server: {e}") |
| finally: |
| logger.info("WebSocket server loop ended") |
| |
| thread = threading.Thread(target=start_server, daemon=True) |
| terminal_manager.server_thread = thread |
| thread.start() |
| return thread |