docker-speech2text / startup_script.py
petergits
2nd checking push speech2text
c08e928
#!/usr/bin/env python3
"""
Startup script to run both Whisper HTTP service and MCP service
"""
import asyncio
import subprocess
import sys
import time
import signal
import os
from pathlib import Path
class ServiceManager:
    """Launch, monitor, and shut down the Whisper HTTP and MCP child services.

    Each service is a script in the current working directory that is run
    as a child process of the current Python interpreter.
    """

    def __init__(self):
        # Popen handles of the child processes; None until a start is attempted.
        self.whisper_process = None
        self.mcp_process = None
        # True while the monitoring loop in run() should keep going.
        self.running = False

    async def _start_process(self, attr, script, label, startup_delay):
        """Start *script* as a child process and confirm it survives startup.

        Args:
            attr: Name of the instance attribute that receives the Popen handle.
            script: Script file name, looked up in the current directory.
            label: Human-readable service name used in console messages.
            startup_delay: Seconds to wait before checking the child is alive.

        Returns:
            True if the child is still running after the delay, else False.
        """
        print(f"πŸ”„ Starting {label}...")
        if not Path(script).exists():
            print(f"❌ {script} not found in current directory")
            return False
        try:
            # NOTE(review): stdout/stderr are piped but never drained while the
            # service runs; a very chatty child may eventually block on a full
            # pipe buffer. Confirm whether the output should go to a file.
            process = subprocess.Popen(
                [sys.executable, script],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            # Register the handle before sleeping so that a signal arriving
            # during startup can still stop the child via stop_services().
            setattr(self, attr, process)
            await asyncio.sleep(startup_delay)
            if process.poll() is None:
                print(f"βœ… {label} started successfully")
                return True
            # The child already exited: collect and show what it printed.
            stdout, stderr = process.communicate()
            print(f"❌ {label} failed to start:")
            print(f"STDOUT: {stdout}")
            print(f"STDERR: {stderr}")
            return False
        except Exception as e:
            print(f"❌ Error starting {label}: {e}")
            return False

    async def start_whisper_service(self):
        """Start the Whisper HTTP service.

        Returns:
            True if the service process is running after a 3 second grace
            period, False otherwise.
        """
        return await self._start_process(
            "whisper_process", "whisper_http_wrapper.py", "Whisper HTTP service", 3
        )

    async def start_mcp_service(self):
        """Start the MCP service.

        Returns:
            True if the service process is running after a 2 second grace
            period, False otherwise.
        """
        return await self._start_process(
            "mcp_process", "mcp_speech_service.py", "MCP Speech service", 2
        )

    @staticmethod
    async def _probe(url):
        """Return True if a GET on *url* answers with HTTP 200 within 5 seconds."""
        import aiohttp

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url, timeout=aiohttp.ClientTimeout(total=5)
                ) as response:
                    return response.status == 200
        except Exception:
            # Connection errors and timeouts mean "not healthy". Catching
            # Exception (not a bare except) lets task cancellation and
            # KeyboardInterrupt/SystemExit propagate.
            return False

    async def check_services(self):
        """Check if services are still running.

        Returns:
            A ``(whisper_ok, mcp_ok)`` tuple of booleans from the HTTP probes.
        """
        whisper_ok = await self._probe("http://localhost:8000/health")
        mcp_ok = await self._probe("http://localhost:8081/status")
        return whisper_ok, mcp_ok

    @staticmethod
    def _stop_process(process, label):
        """Terminate *process* if it is running; kill it after a 5 second wait."""
        if process is None or process.poll() is not None:
            return
        print(f" Stopping {label} service...")
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            # Reap the killed child so it does not linger as a zombie.
            process.wait()

    def stop_services(self):
        """Stop all services (MCP first, then Whisper)."""
        print("\nπŸ›‘ Stopping services...")
        self._stop_process(self.mcp_process, "MCP")
        self._stop_process(self.whisper_process, "Whisper")
        print("βœ… Services stopped")

    def signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM: stop the services and exit with status 0."""
        print(f"\nπŸ“‘ Received signal {signum}")
        self.running = False
        self.stop_services()
        sys.exit(0)

    async def run(self):
        """Run both services and monitor them until interrupted.

        Starts Whisper, then MCP. If either fails to start, whatever was
        started is stopped and the method returns. Otherwise both services
        are health-checked every 10 seconds and an unhealthy one is restarted.
        """
        # Set up signal handlers
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        print("πŸš€ Starting MCP Speech-to-Text Services")
        print("=" * 50)
        # Start Whisper service first
        if not await self.start_whisper_service():
            print("❌ Failed to start Whisper service. Exiting.")
            return
        # Start MCP service
        if not await self.start_mcp_service():
            print("❌ Failed to start MCP service. Stopping Whisper service.")
            self.stop_services()
            return
        print("\nβœ… Both services are running!")
        print("πŸ“‹ Service URLs:")
        print(" β€’ Whisper HTTP: http://localhost:8000")
        print(" β€’ MCP Service: http://localhost:8081")
        print("\nπŸ§ͺ You can now run: python testClient.py")
        print("⏹️ Press Ctrl+C to stop all services")
        self.running = True
        # Monitor services
        try:
            while self.running:
                await asyncio.sleep(10)  # Check every 10 seconds
                whisper_ok, mcp_ok = await self.check_services()
                if not whisper_ok:
                    print("⚠️ Whisper service appears to be down")
                if not mcp_ok:
                    print("⚠️ MCP service appears to be down")
                if whisper_ok and mcp_ok:
                    continue
                print("πŸ”„ Attempting to restart services...")
                # Stop only the unhealthy service(s). Stopping both while
                # restarting just one would leave the healthy one dead.
                if not whisper_ok:
                    self._stop_process(self.whisper_process, "Whisper")
                if not mcp_ok:
                    self._stop_process(self.mcp_process, "MCP")
                await asyncio.sleep(2)
                if not whisper_ok:
                    await self.start_whisper_service()
                if not mcp_ok:
                    await self.start_mcp_service()
        except KeyboardInterrupt:
            pass
        finally:
            self.stop_services()
async def main():
    """Entry coroutine: print usage for ``--help``, otherwise run the manager."""
    wants_help = sys.argv[1:2] == ["--help"]
    if not wants_help:
        await ServiceManager().run()
        return

    help_lines = (
        "MCP Speech-to-Text Service Manager",
        "Usage: python startup.py",
        "\nThis script will:",
        "1. Start the Whisper HTTP service on port 8000",
        "2. Start the MCP Speech service on port 8081",
        "3. Monitor both services and restart if needed",
        "4. Stop all services when you press Ctrl+C",
    )
    for line in help_lines:
        print(line)
if __name__ == "__main__":
    # Script entry point: drive the async main() to completion. A Ctrl+C that
    # surfaces here as KeyboardInterrupt ends the program with a farewell
    # message instead of a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nπŸ‘‹ Goodbye!")