| import torch
|
| import numpy as np
|
| import yaml
|
| import os
|
|
|
|
|
| def load_yaml_with_includes(yaml_file):
|
| def loader_with_include(loader, node):
|
|
|
| include_path = os.path.join(os.path.dirname(yaml_file), loader.construct_scalar(node))
|
| with open(include_path, 'r') as f:
|
| return yaml.load(f, Loader=yaml.FullLoader)
|
|
|
| yaml.add_constructor('!include', loader_with_include, Loader=yaml.FullLoader)
|
|
|
| with open(yaml_file, 'r') as f:
|
| return yaml.load(f, Loader=yaml.FullLoader)
|
|
|
|
|
| def scale_shift(x, scale, shift):
|
| return (x+shift) * scale
|
|
|
|
|
| def scale_shift_re(x, scale, shift):
|
| return (x/scale) - shift
|
|
|
|
|
| def align_seq(source, target_length, mapping_method='hard'):
|
| source_len = source.shape[1]
|
| if mapping_method == 'hard':
|
| mapping_idx = np.round(np.arange(target_length) * source_len / target_length)
|
| output = source[:, mapping_idx]
|
| else:
|
|
|
| raise NotImplementedError
|
|
|
| return output
|
|
|
|
|
| def customized_lr_scheduler(optimizer, warmup_steps=-1):
|
| from torch.optim.lr_scheduler import LambdaLR
|
|
|
| def fn(step):
|
| if warmup_steps > 0:
|
| return min(step / warmup_steps, 1)
|
| else:
|
| return 1
|
| return LambdaLR(optimizer, fn)
|
|
|
|
|
| def get_lr_scheduler(optimizer, name, **kwargs):
|
| if name == 'customized':
|
| return customized_lr_scheduler(optimizer, **kwargs)
|
| elif name == 'cosine':
|
| from torch.optim.lr_scheduler import CosineAnnealingLR
|
| return CosineAnnealingLR(optimizer, **kwargs)
|
| else:
|
| raise NotImplementedError(name)
|
|
|
|
|
| def compute_snr(noise_scheduler, timesteps):
|
| """
|
| Computes SNR as per
|
| https://github.com/TiankaiHang/Min-SNR-Diffusion
|
| Training/blob/521b624bd70c67cee4bdf49225915f5945a872e3/guided_diffusion/gaussian_diffusion.py#L847-L849
|
| """
|
| alphas_cumprod = noise_scheduler.alphas_cumprod
|
| sqrt_alphas_cumprod = alphas_cumprod**0.5
|
| sqrt_one_minus_alphas_cumprod = (1.0 - alphas_cumprod) ** 0.5
|
|
|
|
|
|
|
|
|
| sqrt_alphas_cumprod = sqrt_alphas_cumprod.to(device=timesteps.device)[timesteps].float()
|
| while len(sqrt_alphas_cumprod.shape) < len(timesteps.shape):
|
| sqrt_alphas_cumprod = sqrt_alphas_cumprod[..., None]
|
| alpha = sqrt_alphas_cumprod.expand(timesteps.shape)
|
|
|
| sqrt_one_minus_alphas_cumprod = sqrt_one_minus_alphas_cumprod.to(device=timesteps.device)[timesteps].float()
|
| while len(sqrt_one_minus_alphas_cumprod.shape) < len(timesteps.shape):
|
| sqrt_one_minus_alphas_cumprod = sqrt_one_minus_alphas_cumprod[..., None]
|
| sigma = sqrt_one_minus_alphas_cumprod.expand(timesteps.shape)
|
|
|
|
|
| snr = (alpha / sigma) ** 2
|
| return snr
|
|
|
|
|
| if __name__ == "__main__":
|
|
|
| a = torch.rand(2, 10)
|
| target_len = 15
|
|
|
| b = align_seq(a, target_len) |