| |
| |
| |
| |
|
|
| import math |
| import random |
| import os |
| import json |
|
|
| import numpy as np |
| import parselmouth |
| import torch |
| import torchaudio |
| from tqdm import tqdm |
|
|
| from audiomentations import TimeStretch |
|
|
| from pedalboard import ( |
| Pedalboard, |
| HighShelfFilter, |
| LowShelfFilter, |
| PeakFilter, |
| PitchShift, |
| ) |
|
|
| from utils.util import has_existed |
|
|
# Neutral defaults for the Praat "Change gender" call: a ratio of 1.0 means
# "no change" for that dimension (see the change_gender docstring below).
# The pitch median default of 0.0 is passed when no new median is requested.
PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT = 0.0
PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT = 1.0
PRAAT_CHANGEGENDER_PITCHSHIFTRATIO_DEFAULT = 1.0
PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT = 1.0
PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT = 1.0
|
|
|
|
def wav_to_Sound(wav, sr: int) -> parselmouth.Sound:
    """Wrap a single-channel waveform in a parselmouth.Sound object.

    Args:
        wav (np.ndarray/torch.Tensor): waveform of shape (1, n_samples)
        sr (int): sampling rate in Hz.

    Returns:
        parselmouth.Sound: the wrapped waveform.
    """
    assert wav.shape == (1, len(wav[0])), "wav must be of shape (1, n_samples)"
    sound = None
    if isinstance(wav, torch.Tensor):
        # parselmouth works on numpy buffers, so detach to numpy first.
        sound = parselmouth.Sound(wav[0].numpy(), sampling_frequency=sr)
    elif isinstance(wav, np.ndarray):
        sound = parselmouth.Sound(wav[0], sampling_frequency=sr)
    assert sound is not None, "wav must be either np.ndarray or torch.Tensor"
    return sound
|
|
|
|
def get_pitch_median(wav, sr: int):
    """Estimate the pitch track and the median F0 of a waveform via Praat.

    Args:
        wav (np.ndarray/torch.Tensor/parselmouth.Sound): waveform of shape
            (n_channels, n_samples), or an already-built Sound object.
        sr (int): sampling rate in Hz (unused when wav is already a Sound).

    Returns:
        parselmouth.Pitch, float: the pitch track and its median in Hertz.
    """
    sound = wav if isinstance(wav, parselmouth.Sound) else wav_to_Sound(wav, sr)

    # Praat "To Pitch" arguments: time step, pitch floor (Hz), pitch ceiling (Hz).
    pitch = parselmouth.praat.call(sound, "To Pitch", 0.8 / 75, 75, 600)
    # The 0.5 quantile over the whole file (range 0.0..0.0) is the median F0.
    pitch_median = parselmouth.praat.call(
        pitch, "Get quantile", 0.0, 0.0, 0.5, "Hertz"
    )

    return pitch, pitch_median
|
|
|
|
def change_gender(
    sound,
    pitch=None,
    formant_shift_ratio: float = PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT,
    new_pitch_median: float = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT,
    pitch_range_ratio: float = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT,
    duration_factor: float = PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT,
) -> parselmouth.Sound:
    """Invoke the "Change gender" command in Praat.

    Args:
        sound (parselmouth.Sound): a parselmouth.Sound object
        pitch (parselmouth.Pitch, optional): a precomputed pitch track.
            Defaults to None.
        formant_shift_ratio (float, optional): formant shift ratio; 1.0 means
            no change, >1.0 raises, <1.0 lowers the formants.
        new_pitch_median (float, optional): new pitch median in Hertz.
        pitch_range_ratio (float, optional): pitch range ratio; 1.0 means no
            change, >1.0 widens, <1.0 narrows the pitch range.
        duration_factor (float, optional): duration factor; 1.0 means no
            change, >1.0 lengthens, <1.0 shortens the sound.

    Returns:
        parselmouth.Sound: the manipulated sound.
    """
    manipulation_args = (
        formant_shift_ratio,
        new_pitch_median,
        pitch_range_ratio,
        duration_factor,
    )
    if pitch is None:
        # Without a precomputed pitch track, Praat also needs the F0 search
        # range (floor 75 Hz, ceiling 600 Hz) to analyze the sound itself.
        return parselmouth.praat.call(
            sound, "Change gender", 75, 600, *manipulation_args
        )
    # With a pitch track, the (sound, pitch) pair is manipulated directly.
    return parselmouth.praat.call((sound, pitch), "Change gender", *manipulation_args)
|
|
|
|
def apply_formant_and_pitch_shift(
    sound: parselmouth.Sound,
    formant_shift_ratio: float = PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT,
    pitch_shift_ratio: float = PRAAT_CHANGEGENDER_PITCHSHIFTRATIO_DEFAULT,
    pitch_range_ratio: float = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT,
    duration_factor: float = PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT,
) -> parselmouth.Sound:
    """Use the Praat "Change gender" command to manipulate pitch and formants.

    "Change gender": Praat -> Sound Object -> Convert -> Change gender
    Refer to the Praat help for more details.
    # https://github.com/YannickJadoul/Parselmouth/issues/25#issuecomment-608632887 might help

    Args:
        sound (parselmouth.Sound): input sound.
        formant_shift_ratio (float, optional): formant shift ratio; 1.0 = no change.
        pitch_shift_ratio (float, optional): factor applied to the median F0; 1.0 = no change.
        pitch_range_ratio (float, optional): pitch range ratio; 1.0 = no change.
        duration_factor (float, optional): duration factor; 1.0 = no change.

    Returns:
        parselmouth.Sound: the manipulated sound.
    """
    pitch = None
    new_pitch_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT
    if pitch_shift_ratio != 1.0:
        pitch, pitch_median = get_pitch_median(sound, sound.sampling_frequency)
        new_pitch_median = pitch_median * pitch_shift_ratio

        # Sanity check: scaling the pitch range around the new median must not
        # push the minimum pitch below 0 Hz; otherwise fall back to the
        # Praat defaults (no median change, no range change).
        # (The original code recomputed pitch_median * pitch_shift_ratio into a
        # second variable here; the duplicate has been removed.)
        pitch_minimum = parselmouth.praat.call(
            pitch, "Get minimum", 0.0, 0.0, "Hertz", "Parabolic"
        )
        scaled_minimum = pitch_minimum * pitch_shift_ratio
        result_minimum = (
            new_pitch_median + (scaled_minimum - new_pitch_median) * pitch_range_ratio
        )
        if result_minimum < 0:
            new_pitch_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT
            pitch_range_ratio = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT

    # Praat quantile queries return NaN for fully unvoiced audio; reset to the
    # defaults so "Change gender" does not receive NaN.
    if math.isnan(new_pitch_median):
        new_pitch_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT
        pitch_range_ratio = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT

    new_sound = change_gender(
        sound,
        pitch,
        formant_shift_ratio,
        new_pitch_median,
        pitch_range_ratio,
        duration_factor,
    )
    return new_sound
|
|
|
|
| |
def pedalboard_equalizer(wav: np.ndarray, sr: int) -> np.ndarray:
    """Use pedalboard to apply a randomized multi-band equalizer.

    Builds a chain of one low-shelf filter, (num_filters - 2) peak filters and
    one high-shelf filter with random center frequencies, Q values and gains,
    then runs the waveform through it.

    Args:
        wav (np.ndarray): waveform, shape (n_channels, n_samples).
        sr (int): sampling rate in Hz.

    Returns:
        np.ndarray: the equalized waveform.
    """
    board = Pedalboard()

    cutoff_low_freq = 60
    cutoff_high_freq = 10000

    q_min = 2
    q_max = 5

    random_all_freq = True
    num_filters = 10
    if random_all_freq:
        # Fully random center frequencies across the audible band.
        key_freqs = [random.uniform(1, 12000) for _ in range(num_filters)]
    else:
        # Log-spaced center frequencies between the two cutoffs.
        key_freqs = [
            power_ratio(float(z) / (num_filters - 1), cutoff_low_freq, cutoff_high_freq)
            for z in range(num_filters)
        ]
    # Log-spaced random Q per band, and a random gain in [-12, 12] dB.
    q_values = [
        power_ratio(random.uniform(0, 1), q_min, q_max) for _ in range(num_filters)
    ]
    gains = [random.uniform(-12, 12) for _ in range(num_filters)]

    # First band: low shelf. The bounds below are derived from num_filters
    # (previously hard-coded to 9), so changing num_filters stays consistent.
    board.append(
        LowShelfFilter(
            cutoff_frequency_hz=key_freqs[0], gain_db=gains[0], q=q_values[0]
        )
    )
    # Middle bands: peak filters.
    for i in range(1, num_filters - 1):
        board.append(
            PeakFilter(
                cutoff_frequency_hz=key_freqs[i], gain_db=gains[i], q=q_values[i]
            )
        )
    # Last band: high shelf.
    board.append(
        HighShelfFilter(
            cutoff_frequency_hz=key_freqs[-1], gain_db=gains[-1], q=q_values[-1]
        )
    )

    processed_audio = board(wav, sr)
    return processed_audio
|
|
|
|
def power_ratio(r: float, a: float, b: float):
    """Geometrically interpolate between a and b: returns a at r=0, b at r=1."""
    return a * (b / a) ** r
|
|
|
|
def audiomentations_time_stretch(wav: np.ndarray, sr: int) -> np.ndarray:
    """Use audiomentations to time-stretch wav by a random rate in [0.8, 1.25].

    The transform is always applied (p=1.0) and the output length is allowed
    to change with the stretch rate.
    """
    stretcher = TimeStretch(
        min_rate=0.8, max_rate=1.25, leave_length_unchanged=False, p=1.0
    )
    return stretcher(wav, sample_rate=sr)
|
|
|
|
def formant_and_pitch_shift(
    sound: parselmouth.Sound, fs: bool, ps: bool
) -> parselmouth.Sound:
    """Apply exactly one of a random formant shift (fs) or pitch shift (ps)."""
    assert fs != ps, "fs, ps are mutually exclusive"

    if fs:
        # Random formant shift ratio in [1.0, 1.4], flipped to its reciprocal
        # with probability 1/2 so formants go down as often as up.
        formant_shift_ratio = random.uniform(1.0, 1.4)
        if random.uniform(-1, 1) > 0:
            formant_shift_ratio = 1.0 / formant_shift_ratio

        return apply_formant_and_pitch_shift(
            sound,
            formant_shift_ratio=formant_shift_ratio,
        )

    # ps branch: shift pitch by a random amount in [-12, 12] via pedalboard,
    # then wrap the processed samples back into a parselmouth.Sound.
    board = Pedalboard()
    board.append(PitchShift(random.uniform(-12, 12)))
    shifted = board(sound.values, sound.sampling_frequency)

    return parselmouth.Sound(shifted, sampling_frequency=sound.sampling_frequency)
|
|
|
|
def wav_manipulation(
    wav: torch.Tensor,
    sr: int,
    aug_type: str = "None",
    formant_shift: bool = False,
    pitch_shift: bool = False,
    time_stretch: bool = False,
    equalizer: bool = False,
) -> torch.Tensor:
    """Apply the selected waveform augmentation(s) and return a float tensor.

    Either pass aug_type (one of the four augmentation names) or set exactly
    the desired boolean flags with aug_type left at "None" — not both.
    """
    assert aug_type == "None" or aug_type in [
        "formant_shift",
        "pitch_shift",
        "time_stretch",
        "equalizer",
    ], "aug_type must be one of formant_shift, pitch_shift, time_stretch, equalizer"

    assert aug_type == "None" or not (
        formant_shift or pitch_shift or time_stretch or equalizer
    ), "if aug_type is specified, other argument must be False"

    # Translate a named aug_type into the corresponding boolean flag.
    if aug_type != "None":
        formant_shift = aug_type == "formant_shift"
        pitch_shift = aug_type == "pitch_shift"
        equalizer = aug_type == "equalizer"
        time_stretch = aug_type == "time_stretch"

    wav_numpy = wav.numpy()

    # Numpy-domain effects first ...
    if equalizer:
        wav_numpy = pedalboard_equalizer(wav_numpy, sr)
    if time_stretch:
        wav_numpy = audiomentations_time_stretch(wav_numpy, sr)

    # ... then Praat-domain effects on a Sound object.
    sound = wav_to_Sound(wav_numpy, sr)
    if formant_shift or pitch_shift:
        sound = formant_and_pitch_shift(sound, formant_shift, pitch_shift)

    return torch.from_numpy(sound.values).float()
|
|
|
|
def augment_dataset(cfg, dataset) -> list:
    """Augment dataset with formant_shift, pitch_shift, time_stretch, equalizer.

    For every augmentation enabled in cfg.preprocess, creates a sibling
    dataset named "<dataset>_<aug_type>" under cfg.preprocess.processed_dir,
    writing the manipulated wavs and an updated metadata json per split.

    Args:
        cfg (dict): configuration; reads cfg.preprocess.processed_dir and the
            cfg.preprocess.use_* augmentation flags.
        dataset (str): dataset name

    Returns:
        list: augmented dataset names
    """
    dataset_path = os.path.join(cfg.preprocess.processed_dir, dataset)
    # Eval-only datasets have no train split to augment.
    split = ["train", "test"] if "eval" not in dataset else ["test"]
    augment_datasets = []
    flag_to_aug = [
        (cfg.preprocess.use_formant_shift, "formant_shift"),
        (cfg.preprocess.use_pitch_shift, "pitch_shift"),
        (cfg.preprocess.use_time_stretch, "time_stretch"),
        (cfg.preprocess.use_equalizer, "equalizer"),
    ]
    aug_types = [aug_type for enabled, aug_type in flag_to_aug if enabled]
    for aug_type in aug_types:
        print("Augmenting {} with {}...".format(dataset, aug_type))
        new_dataset = dataset + "_" + aug_type
        augment_datasets.append(new_dataset)
        new_dataset_path = os.path.join(cfg.preprocess.processed_dir, new_dataset)

        for dataset_type in split:
            metadata_path = os.path.join(dataset_path, "{}.json".format(dataset_type))
            # Computed once per split (was previously rebuilt a second time
            # just before dumping the json).
            new_metadata_path = os.path.join(
                new_dataset_path, "{}.json".format(dataset_type)
            )
            os.makedirs(new_dataset_path, exist_ok=True)
            new_dataset_wav_dir = os.path.join(new_dataset_path, "wav")
            os.makedirs(new_dataset_wav_dir, exist_ok=True)

            # Skip splits already augmented by a previous run.
            if has_existed(new_metadata_path):
                continue

            with open(metadata_path, "r") as f:
                metadata = json.load(f)

            augmented_metadata = []
            for utt in tqdm(metadata):
                original_wav, sr = torchaudio.load(utt["Path"])
                new_wav = wav_manipulation(original_wav, sr, aug_type=aug_type)
                new_wav_path = os.path.join(new_dataset_wav_dir, utt["Uid"] + ".wav")
                torchaudio.save(new_wav_path, new_wav, sr)
                augmented_metadata.append(
                    {
                        "Dataset": utt["Dataset"] + "_" + aug_type,
                        "index": utt["index"],
                        "Singer": utt["Singer"],
                        "Uid": utt["Uid"],
                        "Path": new_wav_path,
                        "Duration": utt["Duration"],
                    }
                )
            with open(new_metadata_path, "w") as f:
                json.dump(augmented_metadata, f, indent=4, ensure_ascii=False)
    return augment_datasets
|
|