| import argparse |
| import gc |
| import hashlib |
| import json |
| import os |
| import shlex |
| import subprocess |
| from contextlib import suppress |
| from urllib.parse import urlparse, parse_qs |
|
|
| import gradio as gr |
| import librosa |
| import numpy as np |
| import soundfile as sf |
| import sox |
| import yt_dlp |
| from pedalboard import Pedalboard, Reverb, Compressor, HighpassFilter |
| from pedalboard.io import AudioFile |
| from pydub import AudioSegment |
| from audio_separator.separator import Separator |
| from rvc import Config, load_hubert, get_vc, rvc_infer |
|
|
| |
# Project root: two directory levels above this file's absolute path.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Working directories, all resolved directly under the project root.
mdxnet_models_dir, rvc_models_dir, output_dir = (
    os.path.join(BASE_DIR, subdir)
    for subdir in ('mdxnet_models', 'rvc_models', 'song_output')
)
|
|
|
|
def get_youtube_video_id(url, ignore_playlist=True):
    """
    Extract the YouTube video ID from various URL formats.

    Examples:
        http://youtu.be/SA2iWivDJiE
        http://www.youtube.com/watch?v=_oPAwA_Udwc&feature=feedu
        http://www.youtube.com/watch/SA2iWivDJiE
        http://www.youtube.com/embed/SA2iWivDJiE
        http://www.youtube.com/v/SA2iWivDJiE?version=3&hl=en_US

    Args:
        url: The URL to inspect.
        ignore_playlist: When False and a youtube.com URL carries a ``list``
            query parameter, that playlist ID is returned instead of the
            video ID.

    Returns:
        The extracted ID string, or None when the host is not a recognised
        YouTube host, the path has an unsupported shape, or the ID is empty.
    """
    parsed_url = urlparse(url)
    hostname = (parsed_url.hostname or '').lower()
    path = parsed_url.path

    if hostname == 'youtu.be':
        # Short links carry the ID as the whole path; an empty path is no ID.
        return path.lstrip('/') or None

    if hostname in {'www.youtube.com', 'youtube.com', 'music.youtube.com', 'm.youtube.com'}:
        query = parse_qs(parsed_url.query)
        if not ignore_playlist and 'list' in query:
            return query['list'][0]
        if path == '/watch':
            return query.get('v', [None])[0]
        # '/<prefix>/<id>' splits into ['', '<prefix>', '<id>', ...], so the
        # ID is always element 2 (element 1 is the prefix itself).
        if path.startswith(('/watch/', '/embed/', '/v/')):
            return path.split('/')[2] or None

    return None
|
|
|
|
def yt_download(link):
    """
    Download the audio from a YouTube link as an mp3 file.

    The output template is the bare video title, so the file is written
    relative to wherever yt-dlp resolves relative paths (normally the
    current working directory).

    Args:
        link: URL passed to yt-dlp.

    Returns:
        The filename yt-dlp derives from the ``%(title)s.mp3`` template.

    Raises:
        RuntimeError: If yt-dlp could not extract/download the link.
    """
    ydl_opts = {
        'format': 'bestaudio',
        'outtmpl': '%(title)s',
        'nocheckcertificate': True,
        'ignoreerrors': True,
        'no_warnings': True,
        'quiet': True,
        'extractaudio': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3'
        }],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        result = ydl.extract_info(link, download=True)
        # With 'ignoreerrors' enabled a failed extraction yields None instead
        # of raising; fail here with a clear message rather than letting
        # prepare_filename() crash on a None info dict.
        if result is None:
            raise RuntimeError(f'Failed to download audio from {link}.')
        download_path = ydl.prepare_filename(result, outtmpl='%(title)s.mp3')
    return download_path
|
|
|
|
def display_progress(message, percent, is_webui, progress=None):
    """
    Report pipeline progress.

    When running under the web UI and a progress callback was supplied, the
    callback receives ``percent`` and the message as ``desc``; in every other
    case the message is printed to stdout.
    """
    if not is_webui or progress is None:
        print(message)
        return
    progress(percent, desc=message)
|
|
|
|
def raise_exception(error_msg, is_webui):
    """
    Always raise: a plain Exception on the command line, gr.Error in the web UI.
    """
    if not is_webui:
        raise Exception(error_msg)
    raise gr.Error(error_msg)
|
|
|
|
def get_rvc_model(voice_model, is_webui):
    """
    Locate the model (.pth) and optional index (.index) file of an RVC voice.

    The files are looked up in ``rvc_models_dir/<voice_model>``. If several
    files share an extension, the one listed last by ``os.listdir`` is used.

    Returns:
        (model_path, index_path); index_path is '' when no index file exists.

    Raises (via raise_exception):
        When the model directory or the .pth file is missing.
    """
    model_dir = os.path.join(rvc_models_dir, voice_model)
    if not os.path.exists(model_dir):
        raise_exception(f'Model directory {model_dir} does not exist.', is_webui)

    # Later directory entries overwrite earlier ones with the same extension.
    found = {}
    for entry in os.listdir(model_dir):
        suffix = os.path.splitext(entry)[1]
        if suffix in ('.pth', '.index'):
            found[suffix] = entry

    pth_name = found.get('.pth')
    if pth_name is None:
        error_msg = f'No model file exists in {model_dir}.'
        raise_exception(error_msg, is_webui)

    index_name = found.get('.index')
    model_path = os.path.join(model_dir, pth_name)
    if index_name:
        index_path = os.path.join(model_dir, index_name)
    else:
        index_path = ''
    return model_path, index_path
|
|
|
|
def separation_uvr(filename, output):
    """
    Run the three separation stages using different pre-trained models.

    Stage 1 splits the song into instrumental and vocals, stage 2 splits the
    vocals into "no reverb" and "reverb" parts, and stage 3 splits the
    de-reverbed vocals into backup and lead vocals. After each stage the two
    files reported by ``separator.separate`` are moved to fixed names inside
    ``output``; this code relies on the order in which each model lists its
    two outputs (index 0 / index 1 as used below).

    The stage-2 "reverb" file is deleted at the end; the stage-1 vocals file
    is left in ``output``.

    Args:
        filename: Path of the song to separate.
        output: Directory that receives all produced files.

    Returns a tuple of four file paths:
        - vocals_no_reverb: The vocals after initial de-echo/de-reverb (used as intermediate vocals)
        - instrumental_path: The separated instrumental audio
        - main_vocals_dereverb: The lead vocals after final de-reverb processing
        - backup_vocals: The backup vocals extracted in the final stage
    """
    separator = Separator(output_dir=output)
    base_name = os.path.splitext(os.path.basename(filename))[0]

    instrumental_path = os.path.join(output, f'{base_name}_Instrumental.wav')
    initial_vocals = os.path.join(output, f'{base_name}_Vocals.wav')
    vocals_no_reverb = os.path.join(output, f'{base_name}_Vocals (No Reverb).wav')
    vocals_reverb = os.path.join(output, f'{base_name}_Vocals (Reverb).wav')
    main_vocals_dereverb = os.path.join(output, f'{base_name}_Vocals_Main_DeReverb.wav')
    backup_vocals = os.path.join(output, f'{base_name}_Vocals_Backup.wav')

    # os.replace (rather than os.rename) overwrites an existing destination on
    # every platform, so re-running separation in a folder that already holds
    # results from an earlier run does not fail on Windows.
    separator.load_model(model_filename='model_bs_roformer_ep_317_sdr_12.9755.ckpt')
    voc_inst = separator.separate(filename)
    os.replace(os.path.join(output, voc_inst[0]), instrumental_path)
    os.replace(os.path.join(output, voc_inst[1]), initial_vocals)

    separator.load_model(model_filename='UVR-DeEcho-DeReverb.pth')
    voc_no_reverb = separator.separate(initial_vocals)
    os.replace(os.path.join(output, voc_no_reverb[0]), vocals_no_reverb)
    os.replace(os.path.join(output, voc_no_reverb[1]), vocals_reverb)

    separator.load_model(model_filename='mel_band_roformer_karaoke_aufr33_viperx_sdr_10.1956.ckpt')
    voc_split = separator.separate(vocals_no_reverb)
    os.replace(os.path.join(output, voc_split[0]), backup_vocals)
    os.replace(os.path.join(output, voc_split[1]), main_vocals_dereverb)

    # The isolated reverb tail is not used by anything downstream.
    if os.path.exists(vocals_reverb):
        os.remove(vocals_reverb)

    return vocals_no_reverb, instrumental_path, main_vocals_dereverb, backup_vocals
|
|
|
|
def get_audio_paths(song_dir):
    """
    Search the given directory for the expected separated audio files.

    Files are recognised by filename suffix. The original song path is not
    looked up on disk: it is derived from the instrumental file's name by
    dropping the ``_Instrumental`` suffix, and only that trailing suffix of
    the filename is removed (directory names and other occurrences of the
    text inside the title are left untouched).

    Returns:
        orig_song_path, instrumentals_path, main_vocals_dereverb_path, backup_vocals_path
        Each entry is None when no matching file was found.
    """
    instrumental_suffix = '_Instrumental.wav'

    orig_song_path = None
    instrumentals_path = None
    main_vocals_dereverb_path = None
    backup_vocals_path = None

    for file in os.listdir(song_dir):
        if file.endswith(instrumental_suffix):
            instrumentals_path = os.path.join(song_dir, file)
            orig_name = file[:-len(instrumental_suffix)] + '.wav'
            orig_song_path = os.path.join(song_dir, orig_name)
        elif file.endswith('_Vocals_Main_DeReverb.wav'):
            main_vocals_dereverb_path = os.path.join(song_dir, file)
        elif file.endswith('_Vocals_Backup.wav'):
            backup_vocals_path = os.path.join(song_dir, file)

    return orig_song_path, instrumentals_path, main_vocals_dereverb_path, backup_vocals_path
|
|
|
|
def convert_to_stereo(audio_path):
    """
    Convert the given audio file to stereo (2 channels) if it is mono.

    The file is loaded with librosa (``mono=False``) only to inspect the
    array's dimensionality; a 1-D array is treated as mono and re-encoded by
    ffmpeg into ``<name>_stereo.wav`` next to the input.

    Returns:
        The path of the new stereo file, or ``audio_path`` unchanged when the
        loaded array is not 1-D.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    wave, _ = librosa.load(audio_path, mono=False, sr=44100)
    if wave.ndim != 1:
        return audio_path

    stereo_path = f'{os.path.splitext(audio_path)[0]}_stereo.wav'
    # Pass an argv list directly: building a quoted string and re-splitting it
    # breaks on paths that contain double quotes or backslashes.
    command = [
        'ffmpeg', '-y', '-loglevel', 'error',
        '-i', audio_path,
        '-ac', '2', '-f', 'wav', stereo_path,
    ]
    subprocess.run(command, check=True)
    return stereo_path
|
|
|
|
def pitch_shift(audio_path, pitch_change):
    """
    Produce a pitch-shifted copy of an audio file.

    The result is written to ``<name>_p<pitch_change>.wav`` beside the input.
    If that file already exists it is reused and nothing is recomputed.

    Returns:
        Path of the pitch-shifted file.
    """
    stem = os.path.splitext(audio_path)[0]
    output_path = f'{stem}_p{pitch_change}.wav'
    if os.path.exists(output_path):
        return output_path

    samples, sample_rate = sf.read(audio_path)
    transformer = sox.Transformer()
    transformer.pitch(pitch_change)
    shifted = transformer.build_array(input_array=samples, sample_rate_in=sample_rate)
    sf.write(output_path, shifted, sample_rate)
    return output_path
|
|
|
|
def get_hash(filepath):
    """
    Return the first 11 hex characters of the file's BLAKE2b digest.

    The file is read in 8192-byte blocks so it never has to fit in memory.
    """
    digest = hashlib.blake2b()
    with open(filepath, 'rb') as stream:
        for block in iter(lambda: stream.read(8192), b''):
            digest.update(block)
    return digest.hexdigest()[:11]
|
|
|
|
def preprocess_song(song_input, song_id, is_webui, input_type, progress):
    """
    Prepare a song for voice conversion.

    Steps:
        - For ``input_type == 'yt'``, download the song (everything after the
          first ``&`` in the link is dropped first); for ``'local'``, use the
          given path as is.
        - Make sure the per-song output directory exists.
        - Convert the song to stereo when needed.
        - Separate vocals and instrumentals.

    Returns:
        A 6-tuple: (orig_song_path, vocals_no_reverb, instrumental_path,
        main_vocals_dereverb, backup_vocals, main_vocals_dereverb). The lead
        vocals path intentionally appears twice to match the unpacking done
        by the pipeline.
    """
    orig_song_path = None
    if input_type == 'yt':
        display_progress('[~] Downloading song...', 0, is_webui, progress)
        orig_song_path = yt_download(song_input.split('&')[0])
    elif input_type == 'local':
        orig_song_path = song_input

    song_output_dir = os.path.join(output_dir, song_id)
    if not os.path.exists(song_output_dir):
        os.makedirs(song_output_dir)

    orig_song_path = convert_to_stereo(orig_song_path)

    display_progress('[~] Separating Vocals from Instrumental...', 0.1, is_webui, progress)
    separated = separation_uvr(orig_song_path, song_output_dir)
    vocals_no_reverb, instrumental_path, main_vocals_dereverb, backup_vocals = separated

    return (
        orig_song_path,
        vocals_no_reverb,
        instrumental_path,
        main_vocals_dereverb,
        backup_vocals,
        main_vocals_dereverb,
    )
|
|
|
|
def voice_change(voice_model, vocals_path, output_path, pitch_change, f0_method,
                 index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui):
    """
    Run RVC inference on ``vocals_path`` with the given voice model.

    The model and index files are resolved via get_rvc_model, the embedder
    and voice-conversion objects are loaded on the hard-coded ``cuda:0``
    device, and the remaining arguments are forwarded to rvc_infer together
    with ``output_path``. The embedder and checkpoint references are dropped
    afterwards and a garbage collection pass is forced.
    """
    model_file, index_file = get_rvc_model(voice_model, is_webui)

    device = 'cuda:0'
    config = Config(device, True)
    embedder = load_hubert(embedder_model="contentvec", embedder_model_custom=None)
    checkpoint, version, net_g, tgt_sr, vc = get_vc(device, config.is_half, config, model_file)

    rvc_infer(
        index_file,
        index_rate,
        vocals_path,
        output_path,
        pitch_change,
        f0_method,
        checkpoint,
        version,
        net_g,
        filter_radius,
        tgt_sr,
        rms_mix_rate,
        protect,
        crepe_hop_length,
        vc,
        embedder,
    )

    # Drop the large model references before asking the collector to run.
    del embedder, checkpoint
    gc.collect()
|
|
|
|
def add_audio_effects(audio_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping):
    """
    Run the audio through a highpass -> compressor -> reverb chain.

    The input is streamed in chunks of ``samplerate`` frames and the
    processed audio is written to ``<name>_mixed.wav`` with the same sample
    rate and channel count as the input.

    Returns:
        Path of the processed file.
    """
    stem = os.path.splitext(audio_path)[0]
    output_path = f'{stem}_mixed.wav'

    effects = [
        HighpassFilter(),
        Compressor(ratio=4, threshold_db=-15),
        Reverb(
            room_size=reverb_rm_size,
            dry_level=reverb_dry,
            wet_level=reverb_wet,
            damping=reverb_damping,
        ),
    ]
    board = Pedalboard(effects)

    with AudioFile(audio_path) as source, \
            AudioFile(output_path, 'w', source.samplerate, source.num_channels) as sink:
        while source.tell() < source.frames:
            frames = source.read(int(source.samplerate))
            # reset=False keeps effect state across consecutive chunks.
            processed = board(frames, source.samplerate, reset=False)
            sink.write(processed)

    return output_path
|
|
|
|
def combine_audio(audio_paths, output_path, main_gain, backup_gain, inst_gain, output_format):
    """
    Mix three WAV files into a single exported track.

    ``audio_paths`` is indexed as [main vocals, backup vocals, instrumental].
    Each track first gets a fixed attenuation (4, 6 and 7 dB respectively)
    and then its caller-supplied gain; the backup vocals and the instrumental
    are overlaid onto the main vocals and the result is exported to
    ``output_path`` in ``output_format``.
    """
    lead = AudioSegment.from_wav(audio_paths[0])
    lead = (lead - 4) + main_gain

    backing = AudioSegment.from_wav(audio_paths[1])
    backing = (backing - 6) + backup_gain

    instrumental = AudioSegment.from_wav(audio_paths[2])
    instrumental = (instrumental - 7) + inst_gain

    mix = lead.overlay(backing)
    mix = mix.overlay(instrumental)
    mix.export(output_path, format=output_format)
|
|
|
|
def song_cover_pipeline(song_input, voice_model, pitch_change, keep_files,
                        is_webui=0, main_gain=0, backup_gain=0, inst_gain=0, index_rate=0.5, filter_radius=3,
                        rms_mix_rate=0.25, f0_method='rmvpe', crepe_hop_length=128, protect=0.33, pitch_change_all=0,
                        reverb_rm_size=0.15, reverb_wet=0.2, reverb_dry=0.8, reverb_damping=0.7, output_format='mp3',
                        progress=gr.Progress()):
    """
    Main pipeline that orchestrates the AI cover song generation.

    Steps: validate inputs, resolve the song (YouTube link or local file) to
    a per-song output folder, separate it (or reuse previously separated
    files), convert lead and backing vocals with RVC, apply audio effects,
    optionally pitch-shift the instrumental and original backing vocals, mix
    two final covers, and finally remove intermediate files unless
    ``keep_files`` is set.

    Args:
        song_input: An http(s) link (treated as a YouTube link) or a local
            file path (surrounding double quotes are stripped).
        voice_model: Folder name of the RVC model inside the models directory.
        pitch_change: Pitch change applied to the AI vocals.
        keep_files: Keep intermediate files; also forces re-running the
            separation even when separated files already exist.
        is_webui: Truthy when called from the web UI (affects progress
            reporting and the exception type raised).
        main_gain, backup_gain, inst_gain: Per-track gain for the final mix.
        index_rate, filter_radius, rms_mix_rate, f0_method, crepe_hop_length,
            protect: Forwarded to the RVC conversion.
        pitch_change_all: Extra pitch change added to the AI vocals and also
            applied to the instrumental and original backing vocals.
        reverb_rm_size, reverb_wet, reverb_dry, reverb_damping: Reverb settings.
        output_format: Format/extension of the two exported covers.
        progress: Progress callback used in web UI mode.

    Returns:
        (ai_cover_path, ai_cover_backing_path): the cover mixed with the
        original backing vocals, and the cover mixed with the AI-converted
        backing vocals.

    Raises:
        Any failure is re-raised through raise_exception with the original
        message (gr.Error in web UI mode, Exception otherwise).
    """
    try:
        if not song_input or not voice_model:
            raise_exception('Ensure that the song input field and voice model field is filled.', is_webui)

        display_progress('[~] Starting AI Cover Generation Pipeline...', 0, is_webui, progress)

        # Plain http links are accepted as well as https ones.
        if urlparse(song_input).scheme in ('http', 'https'):
            input_type = 'yt'
            song_id = get_youtube_video_id(song_input)
            # An empty ID would make song_dir collapse onto output_dir itself.
            if not song_id:
                raise_exception('Invalid YouTube url.', is_webui)
        else:
            input_type = 'local'
            song_input = song_input.strip('\"')
            if not os.path.exists(song_input):
                raise_exception(f'{song_input} does not exist.', is_webui)
            song_id = get_hash(song_input)

        song_dir = os.path.join(output_dir, song_id)

        # Reuse previously separated stems only when the folder already holds
        # all of them and the caller did not ask to keep intermediate files.
        if os.path.exists(song_dir):
            cached_paths = get_audio_paths(song_dir)
        else:
            os.makedirs(song_dir)
            cached_paths = None

        if cached_paths is None or keep_files or any(path is None for path in cached_paths):
            (orig_song_path, vocals_path, instrumentals_path,
             main_vocals_path, backup_vocals_path, main_vocals_dereverb_path) = preprocess_song(
                song_input, song_id, is_webui, input_type, progress
            )
        else:
            orig_song_path, instrumentals_path, main_vocals_dereverb_path, backup_vocals_path = cached_paths
            vocals_path = None
            main_vocals_path = main_vocals_dereverb_path

        # NOTE(review): the CLI help describes pitch_change in octaves, yet it
        # is summed directly with pitch_change_all here — confirm the unit
        # rvc_infer expects.
        pitch_change += pitch_change_all

        base_song_name = os.path.splitext(os.path.basename(orig_song_path))[0]
        algo_suffix = f"_{crepe_hop_length}" if f0_method == "mangio-crepe" else ""
        ai_vocals_path = os.path.join(
            song_dir,
            f'{base_song_name}_lead_{voice_model}_p{pitch_change}_i{index_rate}_fr{filter_radius}_'
            f'rms{rms_mix_rate}_pro{protect}_{f0_method}{algo_suffix}.wav'
        )
        ai_backing_path = os.path.join(
            song_dir,
            f'{base_song_name}_backing_{voice_model}_p{pitch_change}_i{index_rate}_fr{filter_radius}_'
            f'rms{rms_mix_rate}_pro{protect}_{f0_method}{algo_suffix}.wav'
        )
        ai_cover_path = os.path.join(song_dir, f'{base_song_name} ({voice_model} Ver).{output_format}')
        ai_cover_backing_path = os.path.join(
            song_dir, f'{base_song_name} ({voice_model} Ver With Backing).{output_format}'
        )

        # The converted files encode every conversion setting in their name,
        # so an existing file can be reused. Lead and backing are checked
        # independently: a cached lead without a backing file must not skip
        # the backing conversion, or the effects step below would fail.
        if not os.path.exists(ai_vocals_path):
            display_progress('[~] Converting lead voice using RVC...', 0.5, is_webui, progress)
            voice_change(voice_model, main_vocals_dereverb_path, ai_vocals_path, pitch_change,
                         f0_method, index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui)

        if not os.path.exists(ai_backing_path):
            display_progress('[~] Converting backing voice using RVC...', 0.65, is_webui, progress)
            voice_change(voice_model, backup_vocals_path, ai_backing_path, pitch_change,
                         f0_method, index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui)

        display_progress('[~] Applying audio effects to Vocals...', 0.8, is_webui, progress)
        ai_vocals_mixed_path = add_audio_effects(
            ai_vocals_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping
        )
        ai_backing_mixed_path = add_audio_effects(
            ai_backing_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping
        )

        if pitch_change_all != 0:
            display_progress('[~] Applying overall pitch change', 0.85, is_webui, progress)
            instrumentals_path = pitch_shift(instrumentals_path, pitch_change_all)
            backup_vocals_path = pitch_shift(backup_vocals_path, pitch_change_all)

        display_progress('[~] Combining AI Vocals and Instrumentals...', 0.9, is_webui, progress)
        combine_audio([ai_vocals_mixed_path, backup_vocals_path, instrumentals_path],
                      ai_cover_path, main_gain, backup_gain, inst_gain, output_format)
        combine_audio([ai_vocals_mixed_path, ai_backing_mixed_path, instrumentals_path],
                      ai_cover_backing_path, main_gain, backup_gain, inst_gain, output_format)

        if not keep_files:
            display_progress('[~] Removing intermediate audio files...', 0.95, is_webui, progress)
            intermediate_files = [vocals_path, main_vocals_path, ai_vocals_mixed_path, ai_backing_mixed_path]
            if pitch_change_all != 0:
                # At this point these two names refer to the pitch-shifted copies.
                intermediate_files += [instrumentals_path, backup_vocals_path]
            for file in intermediate_files:
                if file and os.path.exists(file):
                    os.remove(file)

        return ai_cover_path, ai_cover_backing_path

    except Exception as e:
        raise_exception(str(e), is_webui)
|
|
|
|
# Command-line entry point: parse the options, check that the chosen RVC model
# folder exists, run the pipeline and report where both covers were written.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='AICoverGen: Mod.',
        add_help=True
    )
    parser.add_argument('-i', '--song-input', type=str, required=True,
                        help='Link to a YouTube video or the filepath to a local mp3/wav file to create an AI cover of')
    parser.add_argument('-dir', '--rvc-dirname', type=str, required=True,
                        help='Name of the folder in the rvc_models directory containing the RVC model file and optional index file to use')
    parser.add_argument('-p', '--pitch-change', type=int, required=True,
                        help='Change the pitch of AI Vocals only. Generally, use 1 for male to female and -1 for vice-versa. (Octaves)')
    parser.add_argument('-k', '--keep-files', action=argparse.BooleanOptionalAction,
                        help='Whether to keep all intermediate audio files generated in the song_output/id directory, e.g. Isolated Vocals/Instrumentals')
    parser.add_argument('-ir', '--index-rate', type=float, default=0.5,
                        help='A decimal number e.g. 0.5, used to reduce/resolve the timbre leakage problem. If set to 1, more biased towards the timbre quality of the training dataset')
    parser.add_argument('-fr', '--filter-radius', type=int, default=3,
                        help='A number between 0 and 7. If >=3: apply median filtering to the harvested pitch results. The value represents the filter radius and can reduce breathiness.')
    parser.add_argument('-rms', '--rms-mix-rate', type=float, default=0.25,
                        help="A decimal number e.g. 0.25. Control how much to use the original vocal's loudness (0) or a fixed loudness (1).")
    parser.add_argument('-palgo', '--pitch-detection-algo', type=str, default='rmvpe',
                        help='Best option is rmvpe (clarity in vocals), then mangio-crepe (smoother vocals).')
    parser.add_argument('-hop', '--crepe-hop-length', type=int, default=128,
                        help='If pitch detection algo is mangio-crepe, controls how often it checks for pitch changes in milliseconds. Recommended: 128.')
    parser.add_argument('-pro', '--protect', type=float, default=0.33,
                        help='A decimal number e.g. 0.33. Protect voiceless consonants and breath sounds to prevent artifacts such as tearing in electronic music.')
    parser.add_argument('-mv', '--main-vol', type=int, default=0,
                        help='Volume change for AI main vocals in decibels. Use -3 to decrease by 3 dB and 3 to increase by 3 dB')
    parser.add_argument('-bv', '--backup-vol', type=int, default=0,
                        help='Volume change for backup vocals in decibels')
    parser.add_argument('-iv', '--inst-vol', type=int, default=0,
                        help='Volume change for instrumentals in decibels')
    parser.add_argument('-pall', '--pitch-change-all', type=int, default=0,
                        help='Change the pitch/key of vocals and instrumentals. Changing this slightly reduces sound quality')
    parser.add_argument('-rsize', '--reverb-size', type=float, default=0.15,
                        help='Reverb room size between 0 and 1')
    parser.add_argument('-rwet', '--reverb-wetness', type=float, default=0.2,
                        help='Reverb wet level between 0 and 1')
    parser.add_argument('-rdry', '--reverb-dryness', type=float, default=0.8,
                        help='Reverb dry level between 0 and 1')
    parser.add_argument('-rdamp', '--reverb-damping', type=float, default=0.7,
                        help='Reverb damping between 0 and 1')
    parser.add_argument('-oformat', '--output-format', type=str, default='mp3',
                        help='Output format of audio file. mp3 for smaller file size, wav for best quality')
    args = parser.parse_args()

    # Fail early, before any download or separation work is started.
    rvc_dir = os.path.join(rvc_models_dir, args.rvc_dirname)
    if not os.path.exists(rvc_dir):
        raise Exception(f'The folder {rvc_dir} does not exist.')

    cover_path, cover_with_backing = song_cover_pipeline(
        args.song_input, args.rvc_dirname, args.pitch_change, args.keep_files,
        main_gain=args.main_vol, backup_gain=args.backup_vol, inst_gain=args.inst_vol,
        index_rate=args.index_rate, filter_radius=args.filter_radius,
        rms_mix_rate=args.rms_mix_rate, f0_method=args.pitch_detection_algo,
        crepe_hop_length=args.crepe_hop_length, protect=args.protect,
        pitch_change_all=args.pitch_change_all,
        reverb_rm_size=args.reverb_size, reverb_wet=args.reverb_wetness,
        reverb_dry=args.reverb_dryness, reverb_damping=args.reverb_damping,
        output_format=args.output_format
    )
    print(f'[+] Cover generated at {cover_path}')
    # The pipeline also writes a second mix that uses the AI-converted backing vocals.
    print(f'[+] Cover with AI backing vocals generated at {cover_with_backing}')