#!/usr/bin/env python3
"""
Startup script to run both Whisper HTTP service and MCP service
"""

import asyncio
import os
import signal
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional

# Endpoints probed by the monitor loop to decide whether a service is alive.
WHISPER_HEALTH_URL = "http://localhost:8000/health"
MCP_STATUS_URL = "http://localhost:8081/status"

# Granularity (seconds) at which a freshly launched child is polled for an
# early exit during its startup grace period.
_POLL_INTERVAL = 0.1


class ServiceManager:
    """Launch, monitor, and stop the Whisper HTTP and MCP Speech child processes.

    Both services are started as subprocesses of the current interpreter from
    scripts expected in the current working directory.
    """

    def __init__(self):
        # Popen handles of the children; None until the first launch attempt.
        self.whisper_process: Optional[subprocess.Popen] = None
        self.mcp_process: Optional[subprocess.Popen] = None
        # True while the monitor loop in run() should keep iterating.
        self.running = False

    @staticmethod
    def _launch(script, label):
        """Spawn ``script`` with the current interpreter.

        Returns the ``Popen`` handle, or ``None`` (after printing the reason)
        when the script is missing or the process cannot be created.
        """
        print(f"🔄 Starting {label} service...")

        if not Path(script).exists():
            print(f"❌ {script} not found in current directory")
            return None

        try:
            # NOTE(review): stdout/stderr are piped so they can be shown when
            # startup fails, but nothing drains them while the service runs;
            # a child that writes more than the OS pipe buffer will block.
            return subprocess.Popen(
                [sys.executable, script],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
        except (OSError, subprocess.SubprocessError) as e:
            print(f"❌ Error starting {label} service: {e}")
            return None

    @staticmethod
    async def _exited_within(process, timeout):
        """Return True if ``process`` terminates within ``timeout`` seconds.

        Polls instead of sleeping for the whole period so that a child that
        dies immediately is reported without waiting out the grace period.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while process.poll() is None:
            remaining = deadline - loop.time()
            if remaining <= 0:
                return False
            await asyncio.sleep(min(_POLL_INTERVAL, remaining))
        return True

    async def _confirm_started(self, process, label, startup_delay):
        """Report whether ``process`` survived its startup grace period.

        On early exit the child's captured stdout/stderr are printed and
        False is returned; otherwise a success message is printed and True
        is returned.
        """
        if await self._exited_within(process, startup_delay):
            stdout, stderr = process.communicate()
            print(f"❌ {label} service failed to start:")
            print(f"STDOUT: {stdout}")
            print(f"STDERR: {stderr}")
            return False

        print(f"✅ {label} service started successfully")
        return True

    @staticmethod
    def _stop_process(process, label):
        """Terminate one child if it is still running, escalating to kill.

        Does nothing when ``process`` is None or has already exited.
        """
        if process is None or process.poll() is not None:
            return

        print(f" Stopping {label} service...")
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()  # reap the child so it does not linger as a zombie

        # Release the pipe file descriptors held for this child.
        for stream in (process.stdout, process.stderr):
            if stream is not None:
                stream.close()

    async def start_whisper_service(self):
        """Start the Whisper HTTP service.

        Returns True if ``whisper_http_wrapper.py`` was launched and is still
        running after a 3 second grace period, False otherwise.
        """
        process = self._launch("whisper_http_wrapper.py", "Whisper HTTP")
        if process is None:
            return False
        # Publish the handle before waiting so a signal arriving during the
        # grace period can still stop the child.
        self.whisper_process = process
        return await self._confirm_started(process, "Whisper HTTP", 3)

    async def start_mcp_service(self):
        """Start the MCP service.

        Returns True if ``mcp_speech_service.py`` was launched and is still
        running after a 2 second grace period, False otherwise.
        """
        process = self._launch("mcp_speech_service.py", "MCP Speech")
        if process is None:
            return False
        self.mcp_process = process
        return await self._confirm_started(process, "MCP Speech", 2)

    async def check_services(self):
        """Check if services are still running.

        Issues an HTTP GET against each service's health endpoint and returns
        a ``(whisper_ok, mcp_ok)`` tuple of booleans; a service counts as up
        only when it answers with status 200 within 5 seconds.
        """
        # Imported here, as in the original, so the module itself can be
        # loaded without the third-party dependency.
        import aiohttp

        timeout = aiohttp.ClientTimeout(total=5)

        async def is_up(session, url):
            try:
                async with session.get(url, timeout=timeout) as response:
                    return response.status == 200
            except Exception:
                # Any probe failure (refused connection, timeout, bad reply)
                # means "down". Unlike a bare ``except``, this lets task
                # cancellation and KeyboardInterrupt propagate.
                return False

        async with aiohttp.ClientSession() as session:
            whisper_ok = await is_up(session, WHISPER_HEALTH_URL)
            mcp_ok = await is_up(session, MCP_STATUS_URL)

        return whisper_ok, mcp_ok

    def stop_services(self):
        """Stop all services (MCP first, then Whisper)."""
        print("\n🛑 Stopping services...")
        self._stop_process(self.mcp_process, "MCP")
        self._stop_process(self.whisper_process, "Whisper")
        print("✅ Services stopped")

    def signal_handler(self, signum, frame):
        """Handle Ctrl+C / SIGTERM: stop the children and exit with status 0."""
        print(f"\n📡 Received signal {signum}")
        self.running = False
        self.stop_services()
        sys.exit(0)

    async def run(self):
        """Run both services and supervise them until interrupted.

        Starts Whisper, then MCP; aborts if either fails to start. Afterwards
        probes both every 10 seconds and restarts whichever one is down.
        """
        # Set up signal handlers
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        print("🚀 Starting MCP Speech-to-Text Services")
        print("=" * 50)

        # Start Whisper service first
        if not await self.start_whisper_service():
            print("❌ Failed to start Whisper service. Exiting.")
            return

        # Start MCP service
        if not await self.start_mcp_service():
            print("❌ Failed to start MCP service. Stopping Whisper service.")
            self.stop_services()
            return

        print("\n✅ Both services are running!")
        print("📋 Service URLs:")
        print(" • Whisper HTTP: http://localhost:8000")
        print(" • MCP Service: http://localhost:8081")
        print("\n🧪 You can now run: python testClient.py")
        print("⏹️ Press Ctrl+C to stop all services")

        self.running = True

        # Monitor services
        try:
            while self.running:
                await asyncio.sleep(10)  # Check every 10 seconds

                whisper_ok, mcp_ok = await self.check_services()

                if not whisper_ok:
                    print("⚠️ Whisper service appears to be down")
                if not mcp_ok:
                    print("⚠️ MCP service appears to be down")
                if whisper_ok and mcp_ok:
                    continue

                print("🔄 Attempting to restart services...")
                # Only touch the service that failed its probe; stopping both
                # would take down a healthy service that is never restarted.
                if not whisper_ok:
                    self._stop_process(self.whisper_process, "Whisper")
                if not mcp_ok:
                    self._stop_process(self.mcp_process, "MCP")
                await asyncio.sleep(2)

                if not whisper_ok:
                    await self.start_whisper_service()
                if not mcp_ok:
                    await self.start_mcp_service()
        except KeyboardInterrupt:
            pass
        finally:
            # Safe to call even if signal_handler already stopped the
            # children: processes that have exited are skipped.
            self.stop_services()


async def main():
    """Main function: print usage for ``--help``, otherwise run the manager."""
    if len(sys.argv) > 1 and sys.argv[1] == "--help":
        print("MCP Speech-to-Text Service Manager")
        print("Usage: python startup.py")
        print("\nThis script will:")
        print("1. Start the Whisper HTTP service on port 8000")
        print("2. Start the MCP Speech service on port 8081")
        print("3. Monitor both services and restart if needed")
        print("4. Stop all services when you press Ctrl+C")
        return

    manager = ServiceManager()
    await manager.run()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n👋 Goodbye!")