from whisperx.alignment import (
    DEFAULT_ALIGN_MODELS_TORCH as DAMT,
    DEFAULT_ALIGN_MODELS_HF as DAMHF,
)
from whisperx.utils import TO_LANGUAGE_CODE
import whisperx
import torch
import gc
import os
import soundfile as sf
from IPython.utils import capture
from .language_configuration import EXTRA_ALIGN, INVERTED_LANGUAGES
from .logging_setup import logger
from .postprocessor import sanitize_file_name
from .utils import remove_directory_contents, run_command
import spaces
import copy
import random
import time


def random_sleep():
    """Sleep for a short random interval on ZeroGPU to stagger GPU requests."""
    if os.environ.get("ZERO_GPU") == "TRUE":
        print("Random sleep")
        sleep_time = round(random.uniform(7.2, 9.9), 1)
        time.sleep(sleep_time)


@spaces.GPU(duration=120)
def load_and_transcribe_audio(
    asr_model,
    audio,
    compute_type,
    language,
    asr_options,
    batch_size,
    segment_duration_limit,
):
    """Load a whisperx ASR model, transcribe the audio, then free the model."""
    # Load the ASR model on the selected device
    model = whisperx.load_model(
        asr_model,
        os.environ.get("SONITR_DEVICE")
        if os.environ.get("ZERO_GPU") != "TRUE"
        else "cuda",
        compute_type=compute_type,
        language=language,
        asr_options=asr_options,
    )

    # Transcribe in chunks of at most segment_duration_limit seconds
    result = model.transcribe(
        audio,
        batch_size=batch_size,
        chunk_size=segment_duration_limit,
        print_progress=True,
    )

    # Release the model and free GPU memory
    del model
    gc.collect()
    torch.cuda.empty_cache()

    return result


def load_align_and_align_segments(result, audio, DAMHF):
    """Load the alignment model for the detected language and align the segments."""
    device = (
        os.environ.get("SONITR_DEVICE")
        if os.environ.get("ZERO_GPU") != "TRUE"
        else "cuda"
    )

    # Load the wav2vec2 alignment model for the transcription language
    model_a, metadata = whisperx.load_align_model(
        language_code=result["language"],
        device=device,
        model_name=None
        if result["language"] in DAMHF.keys()
        else EXTRA_ALIGN[result["language"]],
    )

    # Align the transcribed segments with the audio
    alignment_result = whisperx.align(
        result["segments"],
        model_a,
        metadata,
        audio,
        device,
        return_char_alignments=True,
        print_progress=False,
    )

    # Release the alignment model and free GPU memory
    del model_a
    gc.collect()
    torch.cuda.empty_cache()

    return alignment_result


@spaces.GPU(duration=120)
def diarize_audio(diarize_model, audio_wav, min_speakers, max_speakers):
    """Run the diarization pipeline on the audio and return speaker segments."""
    if os.environ.get("ZERO_GPU") == "TRUE":
        # Move the pipeline model to the GPU allocated by the ZeroGPU decorator
        diarize_model.model.to(torch.device("cuda"))
    diarize_segments = diarize_model(
        audio_wav,
        min_speakers=min_speakers,
        max_speakers=max_speakers,
    )
    return diarize_segments


ASR_MODEL_OPTIONS = [
    "tiny",
    "base",
    "small",
    "medium",
    "large",
    "large-v1",
    "large-v2",
    "large-v3",
    "distil-large-v2",
    "Systran/faster-distil-whisper-large-v3",
    "tiny.en",
    "base.en",
    "small.en",
    "medium.en",
    "distil-small.en",
    "distil-medium.en",
    "OpenAI_API_Whisper",
]

COMPUTE_TYPE_GPU = [
    "default",
    "auto",
    "int8",
    "int8_float32",
    "int8_float16",
    "int8_bfloat16",
    "float16",
    "bfloat16",
    "float32",
]

COMPUTE_TYPE_CPU = [
    "default",
    "auto",
    "int8",
    "int8_float32",
    "int16",
    "float32",
]

WHISPER_MODELS_PATH = './WHISPER_MODELS'


def openai_api_whisper(
    input_audio_file,
    source_lang=None,
    chunk_duration=1800
):
    """Transcribe audio with the OpenAI Whisper API, splitting long files into chunks."""
    info = sf.info(input_audio_file)
    duration = info.duration

    output_directory = "./whisper_api_audio_parts"
    os.makedirs(output_directory, exist_ok=True)
    remove_directory_contents(output_directory)

    if duration > chunk_duration:
        # Split the audio into chunks no longer than chunk_duration seconds
        cm = f'ffmpeg -i "{input_audio_file}" -f segment -segment_time {chunk_duration} -c:a libvorbis "{output_directory}/output%03d.ogg"'
        run_command(cm)
        chunk_files = sorted(
            [f"{output_directory}/{f}" for f in os.listdir(output_directory) if f.endswith('.ogg')]
        )
    else:
        one_file = f"{output_directory}/output000.ogg"
        cm = f'ffmpeg -i "{input_audio_file}" -c:a libvorbis "{one_file}"'
        run_command(cm)
        chunk_files = [one_file]

    from openai import OpenAI
    client = OpenAI()

    # Transcribe each chunk
    segments = []
    language = source_lang if source_lang else None
    for i, chunk in enumerate(chunk_files):
        with open(chunk, "rb") as audio_file:
            transcription = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                language=language,
                response_format="verbose_json",
                timestamp_granularities=["segment"],
            )

        try:
            transcript_dict = transcription.model_dump()
        except Exception:
            transcript_dict = transcription.to_dict()

        if language is None:
            logger.info(f'Language detected: {transcript_dict["language"]}')
            language = TO_LANGUAGE_CODE[transcript_dict["language"]]

        # Offset segment timestamps by the start time of this chunk
        chunk_time = chunk_duration * i

        for seg in transcript_dict["segments"]:
            if "start" in seg.keys():
                segments.append(
                    {
                        "text": seg["text"],
                        "start": seg["start"] + chunk_time,
                        "end": seg["end"] + chunk_time,
                    }
                )

    audio = whisperx.load_audio(input_audio_file)
    result = {"segments": segments, "language": language}

    return audio, result


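# Example (sketch): using the OpenAI API backend directly. Assumes the
# OPENAI_API_KEY environment variable is set and ffmpeg is available on PATH;
# "long_interview.wav" is a placeholder path.
#
#   audio, result = openai_api_whisper("long_interview.wav", source_lang="en")
#
# Segment times returned by the API are relative to each chunk, so chunk i is
# shifted by i * chunk_duration. With the default 1800 s chunks, a segment that
# starts at 12.0 s inside the second chunk (i=1) appears at 1812.0 s in
# result["segments"].

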
def find_whisper_models():
    """Return names of folders under WHISPER_MODELS_PATH that contain a model.bin."""
    path = WHISPER_MODELS_PATH
    folders = []

    if os.path.exists(path):
        for folder in os.listdir(path):
            folder_path = os.path.join(path, folder)
            if (
                os.path.isdir(folder_path)
                and 'model.bin' in os.listdir(folder_path)
            ):
                folders.append(folder)
    return folders


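# Example layout picked up by find_whisper_models() (folder name is
# illustrative, e.g. produced by a CTranslate2 conversion):
#   ./WHISPER_MODELS/openai_whisper-medium/model.bin
#   ./WHISPER_MODELS/openai_whisper-medium/tokenizer.json
# Such a folder makes "openai_whisper-medium" selectable as a local ASR model.

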
def transcribe_speech(
    audio_wav,
    asr_model,
    compute_type,
    batch_size,
    SOURCE_LANGUAGE,
    literalize_numbers=True,
    segment_duration_limit=15,
):
    """
    Transcribe speech using a whisper model.

    Parameters:
    - audio_wav (str): Path to the audio file in WAV format.
    - asr_model (str): The whisper model to load.
    - compute_type (str): Compute type to use (e.g., 'int8', 'float16').
    - batch_size (int): Batch size for transcription.
    - SOURCE_LANGUAGE (str): Source language for transcription.
    - literalize_numbers (bool): Whether to suppress numerals so numbers
      are transcribed as words.
    - segment_duration_limit (int): Maximum duration, in seconds, of each
      transcription chunk.

    Returns:
    - Tuple containing:
        - audio: Loaded audio file.
        - result: Transcription result as a dictionary.
    """
    if asr_model == "OpenAI_API_Whisper":
        if literalize_numbers:
            logger.info(
                "OpenAI's API Whisper does not support "
                "the literalization of numbers."
            )
        return openai_api_whisper(audio_wav, SOURCE_LANGUAGE)

    # Initial prompt and language normalization for Chinese
    prompt = "以下是普通话的句子。" if SOURCE_LANGUAGE == "zh" else None
    SOURCE_LANGUAGE = (
        SOURCE_LANGUAGE if SOURCE_LANGUAGE != "zh-TW" else "zh"
    )
    asr_options = {
        "initial_prompt": prompt,
        "suppress_numerals": literalize_numbers
    }

    if asr_model not in ASR_MODEL_OPTIONS:
        # Custom model: convert a Transformers checkpoint to CTranslate2 format
        base_dir = WHISPER_MODELS_PATH
        if not os.path.exists(base_dir):
            os.makedirs(base_dir)
        model_dir = os.path.join(base_dir, sanitize_file_name(asr_model))

        if not os.path.exists(model_dir):
            from ctranslate2.converters import TransformersConverter

            quantization = "float32"

            try:
                converter = TransformersConverter(
                    asr_model,
                    low_cpu_mem_usage=True,
                    copy_files=[
                        "tokenizer_config.json", "preprocessor_config.json"
                    ]
                )
                converter.convert(
                    model_dir,
                    quantization=quantization,
                    force=False
                )
            except Exception as error:
                if "File tokenizer_config.json does not exist" in str(error):
                    converter._copy_files = [
                        "tokenizer.json", "preprocessor_config.json"
                    ]
                    converter.convert(
                        model_dir,
                        quantization=quantization,
                        force=True
                    )
                else:
                    raise error

        asr_model = model_dir
        logger.info(f"ASR Model: {str(model_dir)}")

    audio = whisperx.load_audio(audio_wav)

    result = load_and_transcribe_audio(
        asr_model,
        audio,
        compute_type,
        SOURCE_LANGUAGE,
        asr_options,
        batch_size,
        segment_duration_limit,
    )

    if result["language"] == "zh" and not prompt:
        # Without the Mandarin prompt, Chinese output is treated as Traditional
        result["language"] = "zh-TW"
        logger.info("Chinese - Traditional (zh-TW)")

    return audio, result


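# Example (sketch): transcribing a local file with a stock faster-whisper model.
# "video_audio.wav" and the language code are placeholders. A model id not
# listed in ASR_MODEL_OPTIONS is passed to ctranslate2's TransformersConverter
# and cached under WHISPER_MODELS_PATH on first use.
#
#   audio, result = transcribe_speech(
#       "video_audio.wav",
#       asr_model="large-v3",
#       compute_type="float16",
#       batch_size=8,
#       SOURCE_LANGUAGE="en",
#   )

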
def align_speech(audio, result):
    """
    Aligns speech segments based on the provided audio and result metadata.

    Parameters:
    - audio (array): The audio data in a suitable format for alignment.
    - result (dict): Metadata containing information about the segments
      and language.

    Returns:
    - result (dict): Updated metadata after aligning the segments with
      the audio. This includes character-level alignments because
      'return_char_alignments' is set to True.

    Notes:
    - This function uses language-specific models to align speech segments.
    - It performs language compatibility checks and selects the
      appropriate alignment model.
    - Cleans up memory by releasing resources after alignment.
    """
    DAMHF.update(DAMT)
    if (
        result["language"] not in DAMHF.keys()
        and result["language"] not in EXTRA_ALIGN.keys()
    ):
        logger.warning(
            "Automatic detection: Source language not compatible with align"
        )
        raise ValueError(
            f"Detected language {result['language']} is incompatible; "
            "you can select the source language to avoid this error."
        )
    if (
        result["language"] in EXTRA_ALIGN.keys()
        and EXTRA_ALIGN[result["language"]] == ""
    ):
        lang_name = (
            INVERTED_LANGUAGES[result["language"]]
            if result["language"] in INVERTED_LANGUAGES.keys()
            else result["language"]
        )
        logger.warning(
            "No compatible wav2vec2 model found "
            f"for the language '{lang_name}', skipping alignment."
        )
        return result

    random_sleep()
    result = load_align_and_align_segments(result, audio, DAMHF)

    return result


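# After align_speech, each entry in result["segments"] carries word-level
# timing (a "words" list) and, since return_char_alignments=True, character
# timing as well. Illustrative shape (values are placeholders):
#   {"text": "hello there", "start": 0.5, "end": 1.4,
#    "words": [{"word": "hello", "start": 0.5, "end": 0.9}, ...]}

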
diarization_models = {
    "pyannote_3.1": "pyannote/speaker-diarization-3.1",
    "pyannote_2.1": "pyannote/speaker-diarization@2.1",
    "disable": "",
}


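# The values above are the Hugging Face pipeline ids passed to diarize_speech()
# as model_name, e.g. diarization_models["pyannote_3.1"] selects
# pyannote/speaker-diarization-3.1; the empty string under "disable" skips
# diarization entirely.

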
def reencode_speakers(result):
    """Renumber speaker labels so they start at SPEAKER_00 in order of appearance."""
    if result["segments"][0]["speaker"] == "SPEAKER_00":
        return result

    speaker_mapping = {}
    counter = 0

    logger.debug("Reencode speakers")

    for segment in result["segments"]:
        old_speaker = segment["speaker"]
        if old_speaker not in speaker_mapping:
            speaker_mapping[old_speaker] = f"SPEAKER_{counter:02d}"
            counter += 1
        segment["speaker"] = speaker_mapping[old_speaker]

    return result


def diarize_speech(
    audio_wav,
    result,
    min_speakers,
    max_speakers,
    YOUR_HF_TOKEN,
    model_name="pyannote/speaker-diarization@2.1",
):
    """
    Performs speaker diarization on speech segments.

    Parameters:
    - audio_wav (array): Audio data in WAV format to perform speaker
      diarization.
    - result (dict): Metadata containing information about speech segments
      and alignments.
    - min_speakers (int): Minimum number of speakers expected in the audio.
    - max_speakers (int): Maximum number of speakers expected in the audio.
    - YOUR_HF_TOKEN (str): Your Hugging Face API token for model
      authentication.
    - model_name (str): Name of the speaker diarization model to be used
      (default: "pyannote/speaker-diarization@2.1").

    Returns:
    - result_diarize (dict): Updated metadata after assigning speaker
      labels to segments.

    Notes:
    - This function utilizes a speaker diarization model to label speaker
      segments in the audio.
    - It assigns speakers to word-level segments based on diarization results.
    - Cleans up memory by releasing resources after diarization.
    - If only one speaker is specified, each segment is automatically assigned
      as the first speaker, eliminating the need for diarization inference.
    """
    if max(min_speakers, max_speakers) > 1 and model_name:
        try:
            diarize_model = whisperx.DiarizationPipeline(
                model_name=model_name,
                use_auth_token=YOUR_HF_TOKEN,
                device=os.environ.get("SONITR_DEVICE"),
            )
        except Exception as error:
            error_str = str(error)
            gc.collect()
            torch.cuda.empty_cache()
            if "'NoneType' object has no attribute 'to'" in error_str:
                if model_name == diarization_models["pyannote_2.1"]:
                    raise ValueError(
                        "Accept the license agreement for using Pyannote 2.1."
                        " You need to have an account on Hugging Face and "
                        "accept the license to use the models: "
                        "https://huggingface.co/pyannote/speaker-diarization "
                        "and https://huggingface.co/pyannote/segmentation "
                        "Get your token here: "
                        "https://hf.co/settings/tokens"
                    )
                elif model_name == diarization_models["pyannote_3.1"]:
                    raise ValueError(
                        "New license for Pyannote 3.1: You need to have an "
                        "account on Hugging Face and accept the license to "
                        "use the models: "
                        "https://huggingface.co/pyannote/speaker-diarization-3.1 "
                        "and https://huggingface.co/pyannote/segmentation-3.0"
                    )
            else:
                raise error

        random_sleep()
        diarize_segments = diarize_audio(
            diarize_model, audio_wav, min_speakers, max_speakers
        )

        result_diarize = whisperx.assign_word_speakers(
            diarize_segments, result
        )

        for segment in result_diarize["segments"]:
            if "speaker" not in segment:
                segment["speaker"] = "SPEAKER_00"
                logger.warning(
                    f"No speaker detected in {segment['start']}. The TTS for "
                    f"the first speaker will be used for the segment text: "
                    f"{segment['text']}"
                )

        del diarize_model
        gc.collect()
        torch.cuda.empty_cache()
    else:
        result_diarize = result
        result_diarize["segments"] = [
            {**item, "speaker": "SPEAKER_00"}
            for item in result_diarize["segments"]
        ]
    return reencode_speakers(result_diarize)
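

# Minimal end-to-end sketch of the pipeline in this module, run only when the
# module is executed directly (e.g. with python -m). Assumptions, not part of
# the library: a local "sample.wav" file, a Hugging Face token in the HF_TOKEN
# environment variable, and SONITR_DEVICE set to the target device ("cuda" or
# "cpu").
if __name__ == "__main__":
    sample_wav = "sample.wav"  # placeholder audio path
    hf_token = os.environ.get("HF_TOKEN", "")

    # 1. Transcription
    audio, result = transcribe_speech(
        sample_wav,
        asr_model="large-v3",
        compute_type="float16",
        batch_size=8,
        SOURCE_LANGUAGE="en",
    )
    # 2. Word-level alignment
    result = align_speech(audio, result)
    # 3. Speaker diarization (skipped when max speakers is 1 or model_name is "")
    result_diarize = diarize_speech(
        sample_wav,
        result,
        min_speakers=1,
        max_speakers=2,
        YOUR_HF_TOKEN=hf_token,
        model_name=diarization_models["pyannote_3.1"],
    )
    for seg in result_diarize["segments"]:
        print(seg["speaker"], round(seg["start"], 2), seg["text"])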