| | from soni_translate.logging_setup import logger |
| | import torch |
| | import gc |
| | import numpy as np |
| | import os |
| | import shutil |
| | import warnings |
| | import threading |
| | from tqdm import tqdm |
| | from lib.infer_pack.models import ( |
| | SynthesizerTrnMs256NSFsid, |
| | SynthesizerTrnMs256NSFsid_nono, |
| | SynthesizerTrnMs768NSFsid, |
| | SynthesizerTrnMs768NSFsid_nono, |
| | ) |
| | from lib.audio import load_audio |
| | import soundfile as sf |
| | import edge_tts |
| | import asyncio |
| | from soni_translate.utils import remove_directory_contents, create_directories |
| | from scipy import signal |
| | from time import time as ttime |
| | import faiss |
| | from vci_pipeline import VC, change_rms, bh, ah |
| | import librosa |
| |
|
| | warnings.filterwarnings("ignore") |
| |
|
| |
|
class Config:
    """Pick the inference device and precision and derive window sizes.

    After construction the instance exposes ``device``, ``is_half``,
    ``n_cpu``, ``gpu_name``, ``gpu_mem`` and the four window parameters
    ``x_pad``, ``x_query``, ``x_center`` and ``x_max``.
    """

    def __init__(self, only_cpu=False):
        # Optimistic defaults (first CUDA device, half precision);
        # device_config() downgrades them to match the actual hardware.
        self.device = "cuda:0"
        self.is_half = True
        self.n_cpu = 0
        self.gpu_name = None
        self.gpu_mem = None
        window_sizes = self.device_config(only_cpu)
        self.x_pad, self.x_query, self.x_center, self.x_max = window_sizes

    def device_config(self, only_cpu) -> tuple:
        """Detect the hardware and return ``(x_pad, x_query, x_center, x_max)``.

        Updates ``device``, ``is_half``, ``gpu_name``, ``gpu_mem`` and
        ``n_cpu`` on the instance as a side effect.
        """
        if torch.cuda.is_available() and not only_cpu:
            device_index = int(self.device.split(":")[-1])
            name = torch.cuda.get_device_name(device_index)
            name_upper = name.upper()
            self.gpu_name = name

            single_precision_card = (
                ("16" in name and "V100" not in name_upper)
                or "P40" in name_upper
                or any(series in name for series in ("1060", "1070", "1080"))
            )
            if single_precision_card:
                logger.info(
                    "16/10 Series GPUs and P40 excel "
                    "in single-precision tasks."
                )
                self.is_half = False
            else:
                # The name is only retained for the single-precision cards.
                self.gpu_name = None

            total_bytes = torch.cuda.get_device_properties(
                device_index
            ).total_memory
            # Memory in GiB; the +0.4 rounds values just below a whole
            # number of GiB up before truncation.
            self.gpu_mem = int(total_bytes / (1024 ** 3) + 0.4)
        elif torch.backends.mps.is_available() and not only_cpu:
            logger.info("Supported N-card not found, using MPS for inference")
            self.device = "mps"
        else:
            logger.info("No supported N-card found, using CPU for inference")
            self.device = "cpu"
            self.is_half = False

        if self.n_cpu == 0:
            self.n_cpu = os.cpu_count()

        # GPUs with at most 4 GB get the smallest windows regardless of
        # precision; otherwise the sizes depend on half/full precision.
        if self.gpu_mem is not None and self.gpu_mem <= 4:
            window_sizes = (1, 5, 30, 32)
        elif self.is_half:
            window_sizes = (3, 10, 60, 65)
        else:
            window_sizes = (1, 6, 38, 41)

        logger.info(
            f"Config: Device is {self.device}, "
            f"half precision is {self.is_half}"
        )

        return window_sizes
| |
|
| |
|
# Remote folder the base checkpoints are fetched from.
BASE_DOWNLOAD_LINK = (
    "https://huggingface.co/r3gm/sonitranslate_voice_models/resolve/main/"
)
# Checkpoints downloaded by load_hu_bert() before the first inference.
BASE_MODELS = ["hubert_base.pt", "rmvpe.pt"]
# Directory handed to the download manager as the destination.
BASE_DIR = "."
| |
|
| |
|
def load_hu_bert(config):
    """Download the base checkpoints and return the HuBERT model.

    Every file in ``BASE_MODELS`` is requested from ``BASE_DOWNLOAD_LINK``
    through ``download_manager`` with ``BASE_DIR`` as destination, then
    ``hubert_base.pt`` is loaded with fairseq.

    Args:
        config: object providing ``device`` and ``is_half``.

    Returns:
        The first model of the loaded ensemble, moved to ``config.device``,
        cast to half or full precision and switched to eval mode.
    """
    # Imported lazily so the heavy fairseq dependency is only paid for
    # when a model is actually loaded.
    from fairseq import checkpoint_utils
    from soni_translate.utils import download_manager

    for id_model in BASE_MODELS:
        # URLs always use "/", so join explicitly instead of relying on
        # os.path.join, whose separator is platform dependent.
        model_url = BASE_DOWNLOAD_LINK.rstrip("/") + "/" + id_model
        download_manager(model_url, BASE_DIR)

    models, _, _ = checkpoint_utils.load_model_ensemble_and_task(
        ["hubert_base.pt"],
        suffix="",
    )
    hubert_model = models[0].to(config.device)
    if config.is_half:
        hubert_model = hubert_model.half()
    else:
        hubert_model = hubert_model.float()
    hubert_model.eval()

    return hubert_model
| |
|
| |
|
def load_trained_model(model_path, config):
    """Load a voice model checkpoint and build its inference pipeline.

    Args:
        model_path: path of the checkpoint file.
        config: object providing ``device`` and ``is_half``.

    Returns:
        Tuple ``(n_spk, tgt_sr, net_g, vc, cpt, version)``: the speaker
        count, the target sample rate (last entry of the checkpoint
        config), the synthesizer network, the ``VC`` pipeline, the raw
        checkpoint dict and the checkpoint version string.

    Raises:
        ValueError: if ``model_path`` is empty or the checkpoint declares a
            version other than ``"v1"`` or ``"v2"``.
    """
    if not model_path:
        raise ValueError("No model found")

    logger.info(f"Loading {model_path}")
    # NOTE(security): torch.load unpickles the file; only load checkpoints
    # that come from a trusted source.
    cpt = torch.load(model_path, map_location="cpu")
    tgt_sr = cpt["config"][-1]
    # The speaker-count slot of the config is taken from the actual
    # embedding table stored in the weights.
    cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]
    if_f0 = cpt.get("f0", 1)
    version = cpt.get("version", "v1")

    if version == "v1":
        if if_f0 == 1:
            net_g = SynthesizerTrnMs256NSFsid(
                *cpt["config"], is_half=config.is_half
            )
        else:
            net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
    elif version == "v2":
        if if_f0 == 1:
            net_g = SynthesizerTrnMs768NSFsid(
                *cpt["config"], is_half=config.is_half
            )
        else:
            net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
    else:
        # Previously this fell through and crashed with UnboundLocalError.
        raise ValueError(f"Unsupported model version: {version}")

    # enc_q is dropped before the weights are loaded (presumably it is only
    # needed for training - confirm); strict=False tolerates its absence.
    del net_g.enc_q

    net_g.load_state_dict(cpt["weight"], strict=False)
    net_g.eval().to(config.device)
    net_g = net_g.half() if config.is_half else net_g.float()

    vc = VC(tgt_sr, config)
    n_spk = cpt["config"][-3]

    return n_spk, tgt_sr, net_g, vc, cpt, version
| |
|
| |
|
class ClassVoices:
    """Convert audio files with per-tag voice models.

    A model and its parameters are registered under a tag with
    :meth:`apply_conf`. Calling the instance converts every audio file with
    the model registered for its tag; the per-file work is done by
    :meth:`infer` in worker threads.
    """

    def __init__(self, only_cpu=False):
        self.model_config = {}  # tag -> parameters (plus a "result" list)
        self.config = None  # created by the first apply_conf() call
        self.only_cpu = only_cpu
        # Defined up front so __call__ and unload_models never depend on
        # apply_conf() having created these attributes.
        self.hu_bert_model = None
        self.model_pitch_estimator = None
        self.output_list = []

    def apply_conf(
        self,
        tag="base_model",
        file_model="",
        pitch_algo="pm",
        pitch_lvl=0,
        file_index="",
        index_influence=0.66,
        respiration_median_filtering=3,
        envelope_ratio=0.25,
        consonant_breath_protection=0.33,
        resample_sr=0,
        file_pitch_algo="",
    ):
        """Register the model and conversion parameters for ``tag``.

        ``file_index`` and ``file_pitch_algo`` may be ``None``; both are
        stored as empty strings in that case. The first call also creates
        the device :class:`Config`.

        Returns:
            A confirmation message naming the tag and the model file.

        Raises:
            ValueError: if ``file_model`` is empty.
        """
        if not file_model:
            raise ValueError("Model not found")

        if file_index is None:
            file_index = ""

        if file_pitch_algo is None:
            file_pitch_algo = ""

        if not self.config:
            self.config = Config(self.only_cpu)
            self.hu_bert_model = None
            self.model_pitch_estimator = None

        self.model_config[tag] = {
            "file_model": file_model,
            "pitch_algo": pitch_algo,
            "pitch_lvl": pitch_lvl,
            "file_index": file_index,
            "index_influence": index_influence,
            "respiration_median_filtering": respiration_median_filtering,
            "envelope_ratio": envelope_ratio,
            "consonant_breath_protection": consonant_breath_protection,
            "resample_sr": resample_sr,
            "file_pitch_algo": file_pitch_algo,
        }
        return f"CONFIGURATION APPLIED FOR {tag}: {file_model}"

    @staticmethod
    def _edited_path(input_audio_path):
        """Return ``input_audio_path`` with ``_edited`` before the extension."""
        # splitext only splits at the last dot of the file name, so names
        # such as "12.34.ogg" keep their full stem.
        root, extension = os.path.splitext(input_audio_path)
        return f"{root}_edited{extension}"

    @staticmethod
    def _load_index(file_index, index_rate):
        """Return ``(index_rate, index, big_npy)`` for the retrieval index.

        When the file is missing, ``index_rate`` is 0, or reading fails,
        retrieval is disabled and ``(0, None, None)`` is returned.
        """
        if os.path.exists(file_index) and index_rate != 0:
            try:
                index = faiss.read_index(file_index)
                big_npy = index.reconstruct_n(0, index.ntotal)
                return index_rate, index, big_npy
            except Exception as error:
                logger.error(f"Index: {str(error)}")
        else:
            logger.warning("File index not found")
        return 0, None, None

    @staticmethod
    def _load_f0_file(f0_file):
        """Parse a comma-separated f0 file into a float32 array.

        Returns ``None`` when the file does not exist or cannot be parsed,
        so a half-read curve is never handed to the pipeline.
        """
        if not os.path.exists(f0_file):
            return None
        try:
            with open(f0_file, "r") as f:
                lines = f.read().strip("\n").split("\n")
            rows = [[float(i) for i in line.split(",")] for line in lines]
            return np.array(rows, dtype="float32")
        except Exception as error:
            logger.error(f"f0 file: {str(error)}")
            return None

    def infer(
        self,
        task_id,
        params,
        # loaded model
        n_spk,
        tgt_sr,
        net_g,
        pipe,
        cpt,
        version,
        if_f0,
        # retrieval index
        index_rate,
        index,
        big_npy,
        # optional external f0 curve
        inp_f0,
        # job
        input_audio_path,
        overwrite,
    ):
        """Convert one audio file and write the result to disk.

        The output path is appended to ``self.model_config[task_id]["result"]``
        and to ``self.output_list``. With ``overwrite`` the input file is
        replaced; otherwise the output is written next to it with an
        ``_edited`` suffix.

        Raises:
            ValueError: if ``input_audio_path`` does not exist.
        """
        f0_method = params["pitch_algo"]
        f0_up_key = params["pitch_lvl"]
        filter_radius = params["respiration_median_filtering"]
        resample_sr = params["resample_sr"]
        rms_mix_rate = params["envelope_ratio"]
        protect = params["consonant_breath_protection"]

        if not os.path.exists(input_audio_path):
            raise ValueError(
                "The audio file was not found or is not "
                f"a valid file: {input_audio_path}"
            )

        f0_up_key = int(f0_up_key)

        audio = load_audio(input_audio_path, 16000)

        # Scale down so the input peak does not exceed 0.95.
        audio_max = np.abs(audio).max() / 0.95
        if audio_max > 1:
            audio /= audio_max

        # Timing accumulators; pipe.vc receives and updates this list.
        times = [0, 0, 0]

        # Filter the signal with the bh/ah coefficients from vci_pipeline.
        audio = signal.filtfilt(bh, ah, audio)
        audio_pad = np.pad(
            audio, (pipe.window // 2, pipe.window // 2), mode="reflect"
        )

        # For long audio, choose cut points around every t_center samples
        # at the position where the windowed sum has the smallest magnitude.
        opt_ts = []
        if audio_pad.shape[0] > pipe.t_max:
            audio_sum = np.zeros_like(audio)
            for i in range(pipe.window):
                audio_sum += audio_pad[i:i - pipe.window]
            for t in range(pipe.t_center, audio.shape[0], pipe.t_center):
                search = np.abs(
                    audio_sum[t - pipe.t_query: t + pipe.t_query]
                )
                opt_ts.append(t - pipe.t_query + int(np.argmin(search)))

        s = 0
        audio_opt = []
        t = None
        t1 = ttime()

        sid = torch.tensor(0, device=pipe.device).unsqueeze(0).long()

        audio_pad = np.pad(audio, (pipe.t_pad, pipe.t_pad), mode="reflect")
        p_len = audio_pad.shape[0] // pipe.window

        # Pitch is only estimated for models trained with f0.
        pitch, pitchf = None, None
        if if_f0 == 1:
            pitch, pitchf = pipe.get_f0(
                input_audio_path,
                audio_pad,
                p_len,
                f0_up_key,
                f0_method,
                filter_radius,
                inp_f0,
            )
            pitch = pitch[:p_len]
            pitchf = pitchf[:p_len]
            if pipe.device == "mps":
                pitchf = pitchf.astype(np.float32)
            pitch = torch.tensor(
                pitch, device=pipe.device
            ).unsqueeze(0).long()
            pitchf = torch.tensor(
                pitchf, device=pipe.device
            ).unsqueeze(0).float()

        t2 = ttime()
        times[1] += t2 - t1

        def convert(chunk, pitch_chunk, pitchf_chunk):
            # Run the pipeline on one chunk and trim the padded borders.
            return pipe.vc(
                self.hu_bert_model,
                net_g,
                sid,
                chunk,
                pitch_chunk,
                pitchf_chunk,
                times,
                index,
                big_npy,
                index_rate,
                version,
                protect,
            )[pipe.t_pad_tgt:-pipe.t_pad_tgt]

        for t in opt_ts:
            t = t // pipe.window * pipe.window
            if if_f0 == 1:
                frames = slice(
                    s // pipe.window, (t + pipe.t_pad2) // pipe.window
                )
                pitch_slice = pitch[:, frames]
                pitchf_slice = pitchf[:, frames]
            else:
                pitch_slice = None
                pitchf_slice = None

            audio_slice = audio_pad[s:t + pipe.t_pad2 + pipe.window]
            audio_opt.append(convert(audio_slice, pitch_slice, pitchf_slice))
            s = t

        # Tail (or the whole signal when no cut point was chosen). The
        # pitch tensors are None for models without f0, so they must not
        # be sliced in that case.
        if if_f0 == 1 and t is not None:
            pitch_end_slice = pitch[:, t // pipe.window:]
            pitchf_end_slice = pitchf[:, t // pipe.window:]
        else:
            pitch_end_slice = pitch
            pitchf_end_slice = pitchf

        audio_opt.append(
            convert(audio_pad[t:], pitch_end_slice, pitchf_end_slice)
        )

        audio_opt = np.concatenate(audio_opt)
        if rms_mix_rate != 1:
            audio_opt = change_rms(
                audio, 16000, audio_opt, tgt_sr, rms_mix_rate
            )

        do_resample = resample_sr >= 16000 and tgt_sr != resample_sr
        if do_resample:
            audio_opt = librosa.resample(
                audio_opt, orig_sr=tgt_sr, target_sr=resample_sr
            )

        # Convert to int16, scaling down first if the peak exceeds 0.99.
        audio_max = np.abs(audio_opt).max() / 0.99
        max_int16 = 32768
        if audio_max > 1:
            max_int16 /= audio_max
        audio_opt = (audio_opt * max_int16).astype(np.int16)

        del pitch, pitchf, sid
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        final_sr = resample_sr if do_resample else tgt_sr

        if overwrite:
            output_audio_path = input_audio_path
        else:
            output_audio_path = self._edited_path(input_audio_path)
            logger.info(str(output_audio_path))

        sf.write(
            file=output_audio_path,
            samplerate=final_sr,
            data=audio_opt
        )

        self.model_config[task_id]["result"].append(output_audio_path)
        self.output_list.append(output_audio_path)

    def make_test(
        self,
        tts_text,
        tts_voice,
        model_path,
        index_path,
        transpose,
        f0_method,
    ):
        """Synthesize ``tts_text`` with edge-tts and convert it with a model.

        Files are written to the ``test`` folder, which is emptied first.

        Returns:
            Tuple ``(tts_edited, tts_file)``: the converted file and the
            untouched TTS output.

        Raises:
            ValueError: if the TTS request fails.
        """
        folder_test = "test"
        tag = "test_edge"
        tts_file = "test/test.wav"
        tts_edited = "test/test_edited.wav"

        create_directories(folder_test)
        remove_directory_contents(folder_test)

        if "SET_LIMIT" == os.getenv("DEMO"):
            if len(tts_text) > 60:
                tts_text = tts_text[:60]
                logger.warning("DEMO; limit to 60 characters")

        try:
            # The last "-"-separated part of tts_voice is dropped before
            # the name is passed to edge-tts.
            asyncio.run(edge_tts.Communicate(
                tts_text, "-".join(tts_voice.split('-')[:-1])
            ).save(tts_file))
        except Exception as e:
            raise ValueError(
                "No audio was received. Please change the "
                f"tts voice for {tts_voice}. Error: {str(e)}"
            ) from e

        # Convert a copy so the original TTS output is kept for comparison.
        shutil.copy(tts_file, tts_edited)

        self.apply_conf(
            tag=tag,
            file_model=model_path,
            pitch_algo=f0_method,
            pitch_lvl=transpose,
            file_index=index_path,
            index_influence=0.66,
            respiration_median_filtering=3,
            envelope_ratio=0.25,
            consonant_breath_protection=0.33,
        )

        self(
            audio_files=tts_edited,
            tag_list=tag,
            overwrite=True
        )

        return tts_edited, tts_file

    def run_threads(self, threads):
        """Start all ``threads``, wait for them, then free cached memory."""
        for thread in threads:
            thread.start()

        for thread in threads:
            thread.join()

        gc.collect()
        torch.cuda.empty_cache()

    def unload_models(self):
        """Drop the HuBERT and pitch-estimator models and free memory."""
        self.hu_bert_model = None
        self.model_pitch_estimator = None
        gc.collect()
        torch.cuda.empty_cache()

    def __call__(
        self,
        audio_files=None,
        tag_list=None,
        overwrite=False,
        parallel_workers=1,
    ):
        """Convert ``audio_files`` with the models registered for ``tag_list``.

        Args:
            audio_files: a path or a list of paths.
            tag_list: a tag or a list of tags, paired with ``audio_files``
                by position. When empty, the most recently configured tag
                is used for every file; a shorter list is padded with its
                first tag and a longer one is truncated.
            overwrite: replace each input file instead of writing an
                ``_edited`` copy.
            parallel_workers: maximum number of inference threads started
                together.

        Returns:
            List of output paths collected from the processed tags.

        Raises:
            ValueError: if no model is configured or no audio is given.
        """
        logger.info(f"Parallel workers: {str(parallel_workers)}")

        self.output_list = []

        if not self.model_config:
            raise ValueError("No model has been configured for inference")

        # Work on private lists so the caller's arguments are never mutated.
        if isinstance(audio_files, str):
            audio_files = [audio_files]
        else:
            audio_files = list(audio_files or [])
        if isinstance(tag_list, str):
            tag_list = [tag_list]
        else:
            tag_list = list(tag_list or [])

        if not audio_files:
            raise ValueError("No audio found to convert")
        if not tag_list:
            tag_list = [list(self.model_config.keys())[-1]] * len(audio_files)

        if len(audio_files) > len(tag_list):
            logger.info("Extend tag list to match audio files")
            extend_number = len(audio_files) - len(tag_list)
            tag_list.extend([tag_list[0]] * extend_number)

        if len(audio_files) < len(tag_list):
            logger.info("Cut list tags")
            tag_list = tag_list[:len(audio_files)]

        # Group the files by tag so each model is loaded only once.
        tag_file_pairs = list(zip(tag_list, audio_files))
        sorted_tag_file = sorted(tag_file_pairs, key=lambda x: x[0])

        if not self.hu_bert_model:
            self.hu_bert_model = load_hu_bert(self.config)

        cache_params = None
        threads = []
        progress_bar = tqdm(total=len(tag_list), desc="Progress")
        try:
            for id_tag, input_audio_path in sorted_tag_file:

                if id_tag not in self.model_config:
                    logger.info(
                        f"No configured model for {id_tag} "
                        f"with {input_audio_path}"
                    )
                    # Skipped files still count towards the total.
                    progress_bar.update(1)
                    continue

                # Flush the pending batch when it is full or when the next
                # file needs a different model.
                if (
                    len(threads) >= parallel_workers
                    or (cache_params != id_tag and cache_params is not None)
                ):
                    self.run_threads(threads)
                    progress_bar.update(len(threads))
                    threads = []

                if cache_params != id_tag:

                    self.model_config[id_tag]["result"] = []

                    # Drop every reference to the previous model before
                    # loading the next one so its memory can be reclaimed.
                    n_spk = tgt_sr = net_g = pipe = cpt = version = None
                    if_f0 = index_rate = index = big_npy = inp_f0 = None
                    gc.collect()
                    torch.cuda.empty_cache()

                    params = self.model_config[id_tag]

                    model_path = params["file_model"]
                    f0_method = params["pitch_algo"]
                    file_index = params["file_index"]
                    index_rate = params["index_influence"]
                    f0_file = params["file_pitch_algo"]

                    (
                        n_spk,
                        tgt_sr,
                        net_g,
                        pipe,
                        cpt,
                        version
                    ) = load_trained_model(model_path, self.config)
                    if_f0 = cpt.get("f0", 1)

                    index_rate, index, big_npy = self._load_index(
                        file_index, index_rate
                    )
                    inp_f0 = self._load_f0_file(f0_file)

                    if "rmvpe" in f0_method:
                        if not self.model_pitch_estimator:
                            from lib.rmvpe import RMVPE

                            logger.info("Loading vocal pitch estimator model")
                            self.model_pitch_estimator = RMVPE(
                                "rmvpe.pt",
                                is_half=self.config.is_half,
                                device=self.config.device
                            )

                        pipe.model_rmvpe = self.model_pitch_estimator

                    cache_params = id_tag

                thread = threading.Thread(
                    target=self.infer,
                    args=(
                        id_tag,
                        params,
                        # loaded model
                        n_spk,
                        tgt_sr,
                        net_g,
                        pipe,
                        cpt,
                        version,
                        if_f0,
                        # retrieval index
                        index_rate,
                        index,
                        big_npy,
                        # optional external f0 curve
                        inp_f0,
                        # job
                        input_audio_path,
                        overwrite,
                    )
                )

                threads.append(thread)

            # Run whatever is left in the last batch.
            if threads:
                self.run_threads(threads)
                progress_bar.update(len(threads))
        finally:
            progress_bar.close()

        # Collect results once per tag, in first-appearance order.
        final_result = []
        for tag in dict.fromkeys(tag_list):
            tag_config = self.model_config.get(tag)
            if tag_config is not None and "result" in tag_config:
                final_result.extend(tag_config["result"])

        return final_result
| |
|