| """ |
| Main Pipeline Orchestrator for Multilingual Audio Intelligence System |
| |
| This module provides the complete end-to-end pipeline orchestration, |
| integrating audio preprocessing, speaker diarization, speech recognition, |
| neural machine translation, and output formatting into a unified system. |
| |
| Key Features: |
| - Complete end-to-end pipeline execution |
| - Performance monitoring and benchmarking |
| - Robust error handling and recovery |
| - Progress tracking for long operations |
| - Multiple output format generation |
| - Command-line interface for batch processing |
| - Integration with all system modules |
| |
| Usage: |
| python main.py input_audio.wav --output-dir results/ |
| python main.py audio.mp3 --format json --translate-to en |
    python main.py test_audio.wav --benchmark --verbose
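
Programmatic usage (a minimal sketch; "interview.wav" is a placeholder path):
    from main import AudioIntelligencePipeline
    pipeline = AudioIntelligencePipeline(whisper_model_size="small", target_language="en")
    results = pipeline.process_audio("interview.wav")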
| |
| Dependencies: All src modules, argparse, logging |
| """ |
|
|
| import os |
| import sys |
| import logging |
| import argparse |
| import time |
| from pathlib import Path |
| from typing import Union, Dict, List, Optional, Any |
| import json |
|
|
| |
# Make this module's directory importable so the sibling src modules below
# resolve when the file is run directly as a script.
current_dir = os.path.dirname(os.path.abspath(__file__))
| sys.path.insert(0, current_dir) |
|
|
| |
| from audio_processor import AudioProcessor |
| from speaker_diarizer import SpeakerDiarizer, SpeakerSegment |
| from speech_recognizer import SpeechRecognizer, TranscriptionSegment |
| from translator import NeuralTranslator, TranslationResult |
| from output_formatter import OutputFormatter, ProcessedSegment |
| from speaker_verifier import SpeakerVerifier |
| from noise_reduction import NoiseReducer |
| from utils import ( |
| performance_monitor, ProgressTracker, validate_audio_file, |
| get_system_info, format_duration, ensure_directory, get_file_info, |
| safe_filename |
| ) |
| from quality_control import quality_controller |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
| ) |
| logger = logging.getLogger(__name__) |
|
|
|
|
| class AudioIntelligencePipeline: |
| """ |
| Complete multilingual audio intelligence pipeline. |
| |
| Orchestrates the entire workflow from raw audio input to structured, |
| multilingual output with speaker attribution and translations. |
| """ |
| |
| def __init__(self, |
| whisper_model_size: str = "small", |
| target_language: str = "en", |
| device: Optional[str] = None, |
| hf_token: Optional[str] = None, |
| output_dir: Optional[str] = None): |
| """ |
| Initialize the complete audio intelligence pipeline. |
| |
| Args: |
| whisper_model_size (str): Whisper model size for ASR |
| target_language (str): Target language for translation |
| device (str, optional): Device to run on ('cpu', 'cuda', 'auto') |
| hf_token (str, optional): Hugging Face token for gated models |
| output_dir (str, optional): Directory for output files |
| """ |
| self.whisper_model_size = whisper_model_size |
| self.target_language = target_language |
| self.device = device |
| self.hf_token = hf_token |
| self.output_dir = Path(output_dir) if output_dir else Path("./results") |
| |
| |
| ensure_directory(self.output_dir) |
| |
| |
| self.audio_processor = None |
| self.speaker_diarizer = None |
| self.speech_recognizer = None |
| self.translator = None |
| self.output_formatter = None |
| |
| |
| self.speaker_verifier = None |
| self.noise_reducer = None |
| |
| |
| self.total_processing_time = 0 |
| self.component_times = {} |
| |
| |
| self.demo_mode = False |
| |
| logger.info(f"Initialized AudioIntelligencePipeline:") |
| logger.info(f" - Whisper model: {whisper_model_size}") |
| logger.info(f" - Target language: {target_language}") |
| logger.info(f" - Device: {device or 'auto'}") |
| logger.info(f" - Output directory: {self.output_dir}") |
| |
| def enable_demo_mode(self, enabled: bool = True): |
| """Enable demo mode with quality filtering.""" |
| self.demo_mode = enabled |
| logger.info(f"Demo mode: {'enabled' if enabled else 'disabled'}") |
| |
| def _initialize_components(self): |
| """Lazy initialization of pipeline components.""" |
| if self.audio_processor is None: |
| logger.info("Initializing AudioProcessor...") |
| self.audio_processor = AudioProcessor() |
| |
| if self.speaker_diarizer is None: |
| logger.info("Initializing SpeakerDiarizer...") |
| self.speaker_diarizer = SpeakerDiarizer( |
| hf_token=self.hf_token, |
| device=self.device |
| ) |
| |
| if self.speech_recognizer is None: |
| logger.info("Initializing SpeechRecognizer...") |
| self.speech_recognizer = SpeechRecognizer( |
| model_size=self.whisper_model_size, |
| device=self.device |
| ) |
| |
| if self.translator is None: |
| logger.info("Initializing Enhanced NeuralTranslator...") |
| self.translator = NeuralTranslator( |
| target_language=self.target_language, |
| device=self.device, |
| enable_google_api=True, |
| google_api_key=None |
| ) |
| |
| if self.output_formatter is None: |
| self.output_formatter = OutputFormatter() |
| |
| |
| if self.speaker_verifier is None: |
| logger.info("Initializing SpeakerVerifier...") |
| self.speaker_verifier = SpeakerVerifier( |
| device=self.device, |
| cache_dir=str(self.output_dir / "model_cache") |
| ) |
| |
| if self.noise_reducer is None: |
| logger.info("Initializing NoiseReducer...") |
| self.noise_reducer = NoiseReducer( |
| device=self.device, |
| cache_dir=str(self.output_dir / "model_cache") |
| ) |
| |
    def process_audio(self,
                      audio_file: Union[str, Path],
                      output_dir: Optional[Path] = None,
                      save_outputs: bool = True,
                      output_formats: Optional[List[str]] = None) -> Dict[str, Any]:
| """ |
| Process audio file through complete pipeline. |
| |
| Args: |
| audio_file (Union[str, Path]): Path to input audio file |
| output_dir (Path, optional): Output directory for results |
| save_outputs (bool): Whether to save outputs to files |
| output_formats (List[str], optional): Formats to generate |
| |
| Returns: |
| Dict[str, Any]: Complete processing results and metadata |
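
        Example (a minimal sketch; "interview.wav" is a placeholder):
            results = pipeline.process_audio("interview.wav", output_formats=["json", "srt"])
            stats = results['processing_stats']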
| """ |
| if output_dir is None: |
| output_dir = self.output_dir |
| |
| start_time = time.time() |
| audio_path = Path(audio_file) |
| |
| if output_formats is None: |
| output_formats = ['json', 'srt', 'text', 'summary'] |
| |
| logger.info(f"Starting audio processing pipeline for: {audio_path.name}") |
| |
| |
| validation = validate_audio_file(audio_path) |
| if not validation['valid']: |
| raise ValueError(f"Invalid audio file: {validation['error']}") |
| |
| |
| self._initialize_components() |
| |
| try: |
| |
| progress = ProgressTracker(6, f"Processing {audio_path.name}") |
| |
| |
| progress.update() |
| logger.info("Step 1/6: Audio preprocessing and noise reduction...") |
| with performance_monitor("audio_preprocessing") as metrics: |
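                # Decode and resample the input; files detected as noisy get an
                # enhancement pass first so clean recordings are left untouched.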
| |
| is_noisy = self.noise_reducer.is_noisy_audio(str(audio_path)) |
| if is_noisy: |
| logger.info("Detected noisy audio, applying enhancement...") |
| enhanced_path = self.noise_reducer.enhance_audio(str(audio_path)) |
| processed_audio, sample_rate = self.audio_processor.process_audio(enhanced_path) |
| else: |
| processed_audio, sample_rate = self.audio_processor.process_audio(str(audio_path)) |
| |
| audio_metadata = self.audio_processor.get_audio_info(str(audio_path)) |
| |
| self.component_times['audio_preprocessing'] = metrics.duration |
| logger.info(f"Audio preprocessed: {processed_audio.shape}, {sample_rate}Hz") |
| |
| |
| progress.update() |
| logger.info("Step 2/6: Speaker diarization...") |
| with performance_monitor("speaker_diarization") as metrics: |
| speaker_segments = self.speaker_diarizer.diarize(processed_audio, sample_rate) |
| |
| self.component_times['speaker_diarization'] = metrics.duration |
| logger.info(f"Identified {len(set(seg.speaker_id for seg in speaker_segments))} speakers " |
| f"in {len(speaker_segments)} segments") |
| |
| |
| progress.update() |
| logger.info("Step 3/6: Speech recognition...") |
| with performance_monitor("speech_recognition") as metrics: |
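                # Run recognition over each diarized span with word timestamps;
                # the recognizer expects plain (start, end, speaker_id) tuples.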
| |
| speaker_tuples = [(seg.start_time, seg.end_time, seg.speaker_id) |
| for seg in speaker_segments] |
| transcription_segments = self.speech_recognizer.transcribe_segments( |
| processed_audio, sample_rate, speaker_tuples, word_timestamps=True |
| ) |
| |
| self.component_times['speech_recognition'] = metrics.duration |
| languages_detected = set(seg.language for seg in transcription_segments) |
| logger.info(f"Transcribed {len(transcription_segments)} segments, " |
| f"languages: {', '.join(languages_detected)}") |
| |
| |
| progress.update() |
| logger.info("Step 4/6: Neural machine translation...") |
| with performance_monitor("translation") as metrics: |
                # Translate segments one-by-one in transcription order so that
                # downstream index-based matching in _combine_results stays
                # aligned with the speaker and transcription segments.
                translation_results = []
                for seg in transcription_segments:
                    if seg.language == self.target_language:
                        # Already in the target language; pass through unchanged
                        result = TranslationResult(
                            original_text=seg.text,
                            translated_text=seg.text,
                            source_language=seg.language,
                            target_language=self.target_language,
                            confidence=1.0,
                            model_used="identity"
                        )
                    elif hasattr(self.translator, 'translate_text_hybrid'):
                        # Use the enhanced hybrid path when the translator provides it
                        result = self.translator.translate_text_hybrid(
                            seg.text, seg.language, self.target_language)
                    else:
                        # Fall back to the standard neural translation path
                        result = self.translator.translate_text(
                            seg.text, seg.language, self.target_language)
                    translation_results.append(result)
| |
| self.component_times['translation'] = metrics.duration |
| logger.info(f"Translated {len(translation_results)} text segments") |
| |
| |
| progress.update() |
| logger.info("Step 5/6: Speaker verification...") |
| with performance_monitor("speaker_verification") as metrics: |
| |
| verification_results = {} |
| for speaker_id in set(seg.speaker_id for seg in speaker_segments): |
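                    # Use the first segment attributed to this speaker as the
                    # reference sample for verification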
| |
| speaker_segment = next(seg for seg in speaker_segments if seg.speaker_id == speaker_id) |
| verification = self.speaker_verifier.identify_speaker( |
| str(audio_path), |
| speaker_segment.start_time, |
| speaker_segment.end_time |
| ) |
| verification_results[speaker_id] = verification |
| |
| self.component_times['speaker_verification'] = metrics.duration |
| logger.info(f"Speaker verification completed for {len(verification_results)} speakers") |
| |
| |
| progress.update() |
| logger.info("Step 6/6: Output formatting...") |
| with performance_monitor("output_formatting") as metrics: |
| |
| processed_segments = self._combine_results( |
| speaker_segments, transcription_segments, translation_results |
| ) |
| |
| |
                if self.demo_mode:
| processed_segments = quality_controller.filter_results_for_demo(processed_segments) |
| logger.info("Applied demo quality filtering") |
| |
| |
                # Re-bind the formatter to this input file's name
                self.output_formatter = OutputFormatter(audio_path.name)
| all_outputs = self.output_formatter.format_all_outputs( |
| processed_segments, |
| audio_metadata, |
| self.component_times |
| ) |
| |
| self.component_times['output_formatting'] = metrics.duration |
| progress.finish() |
| |
| |
| self.total_processing_time = time.time() - start_time |
| |
| |
            if save_outputs:
                saved_files = self._save_outputs(all_outputs, audio_path, output_formats, output_dir)
            else:
                saved_files = {}
| |
| |
| results = { |
| 'success': True, |
| 'input_file': str(audio_path), |
| 'audio_metadata': audio_metadata, |
| 'processing_stats': { |
| 'total_time': self.total_processing_time, |
| 'component_times': self.component_times, |
| 'num_speakers': len(set(seg.speaker_id for seg in processed_segments)), |
| 'num_segments': len(processed_segments), |
| 'languages_detected': list(languages_detected), |
| 'total_speech_duration': sum(seg.duration for seg in processed_segments) |
| }, |
            'ps6_features': {
                'speaker_verification': verification_results,
                'noise_reduction_applied': is_noisy,
                'snr_estimation': self.noise_reducer.estimate_snr(str(audio_path)) if self.noise_reducer else None
            },
| 'outputs': all_outputs, |
| 'saved_files': saved_files, |
| 'processed_segments': processed_segments |
| } |
| |
| logger.info(f"Pipeline completed successfully in {format_duration(self.total_processing_time)}") |
| return results |
| |
| except Exception as e: |
| logger.error(f"Pipeline failed: {str(e)}") |
| raise |
| |
| def _combine_results(self, |
| speaker_segments: List[SpeakerSegment], |
| transcription_segments: List[TranscriptionSegment], |
| translation_results: List[TranslationResult]) -> List[ProcessedSegment]: |
| """Combine results from all pipeline stages into unified segments.""" |
| processed_segments = [] |
| |
| |
| for i, speaker_seg in enumerate(speaker_segments): |
| |
| transcription_seg = None |
| if i < len(transcription_segments): |
| transcription_seg = transcription_segments[i] |
| |
| |
| translation_result = None |
| if i < len(translation_results): |
| translation_result = translation_results[i] |
| |
| |
| processed_segment = ProcessedSegment( |
| start_time=speaker_seg.start_time, |
| end_time=speaker_seg.end_time, |
| speaker_id=speaker_seg.speaker_id, |
| original_text=transcription_seg.text if transcription_seg else "", |
| original_language=transcription_seg.language if transcription_seg else "unknown", |
| translated_text=translation_result.translated_text if translation_result else "", |
| confidence_diarization=speaker_seg.confidence, |
| confidence_transcription=transcription_seg.confidence if transcription_seg else 0.0, |
| confidence_translation=translation_result.confidence if translation_result else 0.0, |
| word_timestamps=transcription_seg.word_timestamps if transcription_seg else None, |
| model_info={ |
| 'diarization_model': 'pyannote/speaker-diarization-3.1', |
| 'transcription_model': f'faster-whisper-{self.whisper_model_size}', |
| 'translation_model': translation_result.model_used if translation_result else 'none' |
| } |
| ) |
| |
| processed_segments.append(processed_segment) |
| |
| return processed_segments |
| |
    def _save_outputs(self,
                      outputs: Dict[str, str],
                      audio_path: Path,
                      formats: List[str],
                      output_dir: Optional[Path] = None) -> Dict[str, str]:
        """Save output files to disk in the requested directory."""
        if output_dir is None:
            output_dir = self.output_dir
        ensure_directory(output_dir)
        saved_files = {}
        base_filename = safe_filename(audio_path.stem)
| |
        # Map logical format names to file extensions; the translated SRT is
        # tagged with the target language (e.g. "talk.en.srt" for English).
        format_extensions = {
            'json': 'json',
            'srt': 'srt',
            'srt_original': 'srt',
            'srt_translated': f'{self.target_language}.srt',
            'text': 'txt',
            'csv': 'csv',
            'timeline': 'timeline.json',
            'summary': 'summary.txt'
        }
| |
| for format_name in formats: |
| if format_name in outputs: |
| extension = format_extensions.get(format_name, 'txt') |
| filename = f"{base_filename}.{extension}" |
                filepath = output_dir / filename
| |
| try: |
| with open(filepath, 'w', encoding='utf-8') as f: |
| f.write(outputs[format_name]) |
| |
| saved_files[format_name] = str(filepath) |
| logger.info(f"Saved {format_name} output to: {filepath}") |
| |
| except Exception as e: |
| logger.error(f"Failed to save {format_name} output: {e}") |
| |
| return saved_files |
| |
| def benchmark_system(self, test_audio_path: str) -> Dict[str, Any]: |
| """Run system benchmark on test audio.""" |
| logger.info("Running system benchmark...") |
| |
| system_info = get_system_info() |
| |
| |
| iterations = 3 |
| benchmark_results = [] |
| |
| for i in range(iterations): |
| logger.info(f"Benchmark iteration {i+1}/{iterations}") |
| try: |
| result = self.process_audio(test_audio_path, save_outputs=False) |
| benchmark_results.append(result['processing_stats']) |
| except Exception as e: |
| logger.error(f"Benchmark iteration {i+1} failed: {e}") |
| continue |
| |
| if not benchmark_results: |
| return {'error': 'All benchmark iterations failed'} |
| |
| |
| avg_times = {} |
| for component in benchmark_results[0]['component_times']: |
| avg_times[component] = sum(r['component_times'][component] for r in benchmark_results) / len(benchmark_results) |
| |
| avg_total_time = sum(r['total_time'] for r in benchmark_results) / len(benchmark_results) |
| |
| return { |
| 'system_info': system_info, |
| 'test_file': test_audio_path, |
| 'iterations': len(benchmark_results), |
| 'average_times': avg_times, |
| 'average_total_time': avg_total_time, |
| 'all_iterations': benchmark_results |
| } |
|
|
|
|
| def main(): |
| """Command-line interface for the audio intelligence pipeline.""" |
| parser = argparse.ArgumentParser( |
| description="Multilingual Audio Intelligence System", |
| formatter_class=argparse.RawDescriptionHelpFormatter, |
| epilog=""" |
| Examples: |
| python main.py audio.wav # Process with defaults |
| python main.py audio.mp3 --output-dir ./out # Custom output directory |
| python main.py audio.flac --translate-to es # Translate to Spanish |
| python main.py --benchmark test.wav # Run performance benchmark |
| python main.py audio.ogg --format json text # Generate specific formats |
| """ |
| ) |
| |
| |
| parser.add_argument("audio_file", nargs='?', help="Path to input audio file") |
| |
| |
| parser.add_argument("--whisper-model", choices=["tiny", "small", "medium", "large"], |
| default="small", help="Whisper model size (default: small)") |
| parser.add_argument("--translate-to", default="en", |
| help="Target language for translation (default: en)") |
| parser.add_argument("--device", choices=["cpu", "cuda", "auto"], default="auto", |
| help="Device to run on (default: auto)") |
| parser.add_argument("--hf-token", help="Hugging Face token for gated models") |
| |
| |
| parser.add_argument("--output-dir", "-o", default="./results", |
| help="Output directory (default: ./results)") |
| parser.add_argument("--format", nargs='+', |
| choices=["json", "srt", "text", "csv", "timeline", "summary", "all"], |
| default=["json", "srt", "text", "summary"], |
| help="Output formats to generate") |
| parser.add_argument("--no-save", action="store_true", |
| help="Don't save outputs to files") |
| |
| |
| parser.add_argument("--benchmark", action="store_true", |
| help="Run performance benchmark") |
| parser.add_argument("--system-info", action="store_true", |
| help="Show system information and exit") |
| parser.add_argument("--verbose", "-v", action="store_true", |
| help="Enable verbose logging") |
| parser.add_argument("--quiet", "-q", action="store_true", |
| help="Suppress non-error output") |
| |
| args = parser.parse_args() |
| |
| |
| if args.verbose: |
| logging.getLogger().setLevel(logging.DEBUG) |
| elif args.quiet: |
| logging.getLogger().setLevel(logging.ERROR) |
| |
| |
| if args.system_info: |
| system_info = get_system_info() |
| print("\n=== SYSTEM INFORMATION ===") |
| for key, value in system_info.items(): |
| print(f"{key}: {value}") |
| return |
| |
| |
| if not args.audio_file: |
| parser.error("Audio file is required (unless using --system-info)") |
| |
| audio_path = Path(args.audio_file) |
| if not audio_path.exists(): |
| parser.error(f"Audio file not found: {audio_path}") |
| |
| try: |
| |
| pipeline = AudioIntelligencePipeline( |
| whisper_model_size=args.whisper_model, |
| target_language=args.translate_to, |
| device=args.device, |
| hf_token=args.hf_token, |
| output_dir=args.output_dir |
| ) |
| |
| if args.benchmark: |
| |
| print(f"\n=== RUNNING BENCHMARK ON {audio_path.name} ===") |
| benchmark_results = pipeline.benchmark_system(str(audio_path)) |
| |
| if 'error' in benchmark_results: |
| print(f"Benchmark failed: {benchmark_results['error']}") |
| return 1 |
| |
| print(f"\nBenchmark Results ({benchmark_results['iterations']} iterations):") |
| print(f"Average total time: {format_duration(benchmark_results['average_total_time'])}") |
| print("\nComponent breakdown:") |
| for component, avg_time in benchmark_results['average_times'].items(): |
| print(f" {component}: {format_duration(avg_time)}") |
| |
| print(f"\nSystem: {benchmark_results['system_info']['platform']}") |
| print(f"GPU: {benchmark_results['system_info']['gpu_info']}") |
| |
| else: |
| |
| output_formats = args.format |
| if 'all' in output_formats: |
| output_formats = ['json', 'srt_original', 'srt_translated', 'text', 'csv', 'timeline', 'summary'] |
| |
| results = pipeline.process_audio( |
| str(audio_path), |
| save_outputs=not args.no_save, |
| output_formats=output_formats |
| ) |
| |
| |
| stats = results['processing_stats'] |
| print(f"\n=== PROCESSING COMPLETE ===") |
| print(f"File: {audio_path.name}") |
| print(f"Total time: {format_duration(stats['total_time'])}") |
| print(f"Speakers: {stats['num_speakers']}") |
| print(f"Segments: {stats['num_segments']}") |
| print(f"Languages: {', '.join(stats['languages_detected'])}") |
| print(f"Speech duration: {format_duration(stats['total_speech_duration'])}") |
| |
| if results['saved_files']: |
| print(f"\nOutput files saved to: {args.output_dir}") |
| for format_name, filepath in results['saved_files'].items(): |
| print(f" {format_name}: {Path(filepath).name}") |
| |
| if not args.quiet: |
| |
| segments = results['processed_segments'][:3] |
| print(f"\nSample output (first {len(segments)} segments):") |
| for i, seg in enumerate(segments, 1): |
| speaker = seg.speaker_id.replace("SPEAKER_", "Speaker ") |
| time_str = f"{seg.start_time:.1f}s-{seg.end_time:.1f}s" |
| print(f" #{i} [{time_str}] {speaker} ({seg.original_language}):") |
| print(f" Original: {seg.original_text}") |
| if seg.original_language != args.translate_to: |
| print(f" Translated: {seg.translated_text}") |
| |
| if len(results['processed_segments']) > 3: |
| print(f" ... and {len(results['processed_segments']) - 3} more segments") |
| |
| return 0 |
| |
| except KeyboardInterrupt: |
| print("\nProcessing interrupted by user") |
| return 1 |
| except Exception as e: |
| logger.error(f"Processing failed: {str(e)}") |
| if args.verbose: |
| import traceback |
| traceback.print_exc() |
| return 1 |
|
|
|
|
| if __name__ == "__main__": |
| sys.exit(main()) |