| import torch
|
| import argparse
|
| import random
|
| import re
|
| from typing import List, Optional, Union
|
| from .utils import setup_logging
|
|
|
| setup_logging()
|
| import logging
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
| def prepare_scheduler_for_custom_training(noise_scheduler, device):
|
| if hasattr(noise_scheduler, "all_snr"):
|
| return
|
|
|
| alphas_cumprod = noise_scheduler.alphas_cumprod
|
| sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
|
| sqrt_one_minus_alphas_cumprod = torch.sqrt(1.0 - alphas_cumprod)
|
| alpha = sqrt_alphas_cumprod
|
| sigma = sqrt_one_minus_alphas_cumprod
|
| all_snr = (alpha / sigma) ** 2
|
|
|
| noise_scheduler.all_snr = all_snr.to(device)
|
|
|
|
|
| def fix_noise_scheduler_betas_for_zero_terminal_snr(noise_scheduler):
|
|
|
| logger.info(f"fix noise scheduler betas: https://arxiv.org/abs/2305.08891")
|
|
|
| def enforce_zero_terminal_snr(betas):
|
|
|
| alphas = 1 - betas
|
| alphas_bar = alphas.cumprod(0)
|
| alphas_bar_sqrt = alphas_bar.sqrt()
|
|
|
|
|
| alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone()
|
| alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone()
|
|
|
| alphas_bar_sqrt -= alphas_bar_sqrt_T
|
|
|
| alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T)
|
|
|
|
|
| alphas_bar = alphas_bar_sqrt**2
|
| alphas = alphas_bar[1:] / alphas_bar[:-1]
|
| alphas = torch.cat([alphas_bar[0:1], alphas])
|
| betas = 1 - alphas
|
| return betas
|
|
|
| betas = noise_scheduler.betas
|
| betas = enforce_zero_terminal_snr(betas)
|
| alphas = 1.0 - betas
|
| alphas_cumprod = torch.cumprod(alphas, dim=0)
|
|
|
|
|
|
|
|
|
| noise_scheduler.betas = betas
|
| noise_scheduler.alphas = alphas
|
| noise_scheduler.alphas_cumprod = alphas_cumprod
|
|
|
|
|
| def apply_snr_weight(loss, timesteps, noise_scheduler, gamma, v_prediction=False):
|
| snr = torch.stack([noise_scheduler.all_snr[t] for t in timesteps])
|
| min_snr_gamma = torch.minimum(snr, torch.full_like(snr, gamma))
|
| if v_prediction:
|
| snr_weight = torch.div(min_snr_gamma, snr + 1).float().to(loss.device)
|
| else:
|
| snr_weight = torch.div(min_snr_gamma, snr).float().to(loss.device)
|
| loss = loss * snr_weight
|
| return loss
|
|
|
|
|
| def scale_v_prediction_loss_like_noise_prediction(loss, timesteps, noise_scheduler):
|
| scale = get_snr_scale(timesteps, noise_scheduler)
|
| loss = loss * scale
|
| return loss
|
|
|
|
|
| def get_snr_scale(timesteps, noise_scheduler):
|
| snr_t = torch.stack([noise_scheduler.all_snr[t] for t in timesteps])
|
| snr_t = torch.minimum(snr_t, torch.ones_like(snr_t) * 1000)
|
| scale = snr_t / (snr_t + 1)
|
|
|
|
|
| return scale
|
|
|
|
|
| def add_v_prediction_like_loss(loss, timesteps, noise_scheduler, v_pred_like_loss):
|
| scale = get_snr_scale(timesteps, noise_scheduler)
|
|
|
| loss = loss + loss / scale * v_pred_like_loss
|
| return loss
|
|
|
|
|
| def apply_debiased_estimation(loss, timesteps, noise_scheduler, v_prediction=False):
|
| snr_t = torch.stack([noise_scheduler.all_snr[t] for t in timesteps])
|
| snr_t = torch.minimum(snr_t, torch.ones_like(snr_t) * 1000)
|
| if v_prediction:
|
| weight = 1 / (snr_t + 1)
|
| else:
|
| weight = 1 / torch.sqrt(snr_t)
|
| loss = weight * loss
|
| return loss
|
|
|
|
|
|
|
|
|
|
|
| def add_custom_train_arguments(parser: argparse.ArgumentParser, support_weighted_captions: bool = True):
|
| parser.add_argument(
|
| "--min_snr_gamma",
|
| type=float,
|
| default=None,
|
| help="gamma for reducing the weight of high loss timesteps. Lower numbers have stronger effect. 5 is recommended by paper. / 低いタイムステップでの高いlossに対して重みを減らすためのgamma値、低いほど効果が強く、論文では5が推奨",
|
| )
|
| parser.add_argument(
|
| "--scale_v_pred_loss_like_noise_pred",
|
| action="store_true",
|
| help="scale v-prediction loss like noise prediction loss / v-prediction lossをnoise prediction lossと同じようにスケーリングする",
|
| )
|
| parser.add_argument(
|
| "--v_pred_like_loss",
|
| type=float,
|
| default=None,
|
| help="add v-prediction like loss multiplied by this value / v-prediction lossをこの値をかけたものをlossに加算する",
|
| )
|
| parser.add_argument(
|
| "--debiased_estimation_loss",
|
| action="store_true",
|
| help="debiased estimation loss / debiased estimation loss",
|
| )
|
| if support_weighted_captions:
|
| parser.add_argument(
|
| "--weighted_captions",
|
| action="store_true",
|
| default=False,
|
| help="Enable weighted captions in the standard style (token:1.3). No commas inside parens, or shuffle/dropout may break the decoder. / 「[token]」、「(token)」「(token:1.3)」のような重み付きキャプションを有効にする。カンマを括弧内に入れるとシャッフルやdropoutで重みづけがおかしくなるので注意",
|
| )
|
|
|
|
|
| re_attention = re.compile(
|
| r"""
|
| \\\(|
|
| \\\)|
|
| \\\[|
|
| \\]|
|
| \\\\|
|
| \\|
|
| \(|
|
| \[|
|
| :([+-]?[.\d]+)\)|
|
| \)|
|
| ]|
|
| [^\\()\[\]:]+|
|
| :
|
| """,
|
| re.X,
|
| )
|
|
|
|
|
| def parse_prompt_attention(text):
|
| """
|
| Parses a string with attention tokens and returns a list of pairs: text and its associated weight.
|
| Accepted tokens are:
|
| (abc) - increases attention to abc by a multiplier of 1.1
|
| (abc:3.12) - increases attention to abc by a multiplier of 3.12
|
| [abc] - decreases attention to abc by a multiplier of 1.1
|
| \( - literal character '('
|
| \[ - literal character '['
|
| \) - literal character ')'
|
| \] - literal character ']'
|
| \\ - literal character '\'
|
| anything else - just text
|
| >>> parse_prompt_attention('normal text')
|
| [['normal text', 1.0]]
|
| >>> parse_prompt_attention('an (important) word')
|
| [['an ', 1.0], ['important', 1.1], [' word', 1.0]]
|
| >>> parse_prompt_attention('(unbalanced')
|
| [['unbalanced', 1.1]]
|
| >>> parse_prompt_attention('\(literal\]')
|
| [['(literal]', 1.0]]
|
| >>> parse_prompt_attention('(unnecessary)(parens)')
|
| [['unnecessaryparens', 1.1]]
|
| >>> parse_prompt_attention('a (((house:1.3)) [on] a (hill:0.5), sun, (((sky))).')
|
| [['a ', 1.0],
|
| ['house', 1.5730000000000004],
|
| [' ', 1.1],
|
| ['on', 1.0],
|
| [' a ', 1.1],
|
| ['hill', 0.55],
|
| [', sun, ', 1.1],
|
| ['sky', 1.4641000000000006],
|
| ['.', 1.1]]
|
| """
|
|
|
| res = []
|
| round_brackets = []
|
| square_brackets = []
|
|
|
| round_bracket_multiplier = 1.1
|
| square_bracket_multiplier = 1 / 1.1
|
|
|
| def multiply_range(start_position, multiplier):
|
| for p in range(start_position, len(res)):
|
| res[p][1] *= multiplier
|
|
|
| for m in re_attention.finditer(text):
|
| text = m.group(0)
|
| weight = m.group(1)
|
|
|
| if text.startswith("\\"):
|
| res.append([text[1:], 1.0])
|
| elif text == "(":
|
| round_brackets.append(len(res))
|
| elif text == "[":
|
| square_brackets.append(len(res))
|
| elif weight is not None and len(round_brackets) > 0:
|
| multiply_range(round_brackets.pop(), float(weight))
|
| elif text == ")" and len(round_brackets) > 0:
|
| multiply_range(round_brackets.pop(), round_bracket_multiplier)
|
| elif text == "]" and len(square_brackets) > 0:
|
| multiply_range(square_brackets.pop(), square_bracket_multiplier)
|
| else:
|
| res.append([text, 1.0])
|
|
|
| for pos in round_brackets:
|
| multiply_range(pos, round_bracket_multiplier)
|
|
|
| for pos in square_brackets:
|
| multiply_range(pos, square_bracket_multiplier)
|
|
|
| if len(res) == 0:
|
| res = [["", 1.0]]
|
|
|
|
|
| i = 0
|
| while i + 1 < len(res):
|
| if res[i][1] == res[i + 1][1]:
|
| res[i][0] += res[i + 1][0]
|
| res.pop(i + 1)
|
| else:
|
| i += 1
|
|
|
| return res
|
|
|
|
|
| def get_prompts_with_weights(tokenizer, prompt: List[str], max_length: int):
|
| r"""
|
| Tokenize a list of prompts and return its tokens with weights of each token.
|
|
|
| No padding, starting or ending token is included.
|
| """
|
| tokens = []
|
| weights = []
|
| truncated = False
|
| for text in prompt:
|
| texts_and_weights = parse_prompt_attention(text)
|
| text_token = []
|
| text_weight = []
|
| for word, weight in texts_and_weights:
|
|
|
| token = tokenizer(word).input_ids[1:-1]
|
| text_token += token
|
|
|
| text_weight += [weight] * len(token)
|
|
|
| if len(text_token) > max_length:
|
| truncated = True
|
| break
|
|
|
| if len(text_token) > max_length:
|
| truncated = True
|
| text_token = text_token[:max_length]
|
| text_weight = text_weight[:max_length]
|
| tokens.append(text_token)
|
| weights.append(text_weight)
|
| if truncated:
|
| logger.warning("Prompt was truncated. Try to shorten the prompt or increase max_embeddings_multiples")
|
| return tokens, weights
|
|
|
|
|
| def pad_tokens_and_weights(tokens, weights, max_length, bos, eos, no_boseos_middle=True, chunk_length=77):
|
| r"""
|
| Pad the tokens (with starting and ending tokens) and weights (with 1.0) to max_length.
|
| """
|
| max_embeddings_multiples = (max_length - 2) // (chunk_length - 2)
|
| weights_length = max_length if no_boseos_middle else max_embeddings_multiples * chunk_length
|
| for i in range(len(tokens)):
|
| tokens[i] = [bos] + tokens[i] + [eos] * (max_length - 1 - len(tokens[i]))
|
| if no_boseos_middle:
|
| weights[i] = [1.0] + weights[i] + [1.0] * (max_length - 1 - len(weights[i]))
|
| else:
|
| w = []
|
| if len(weights[i]) == 0:
|
| w = [1.0] * weights_length
|
| else:
|
| for j in range(max_embeddings_multiples):
|
| w.append(1.0)
|
| w += weights[i][j * (chunk_length - 2) : min(len(weights[i]), (j + 1) * (chunk_length - 2))]
|
| w.append(1.0)
|
| w += [1.0] * (weights_length - len(w))
|
| weights[i] = w[:]
|
|
|
| return tokens, weights
|
|
|
|
|
| def get_unweighted_text_embeddings(
|
| tokenizer,
|
| text_encoder,
|
| text_input: torch.Tensor,
|
| chunk_length: int,
|
| clip_skip: int,
|
| eos: int,
|
| pad: int,
|
| no_boseos_middle: Optional[bool] = True,
|
| ):
|
| """
|
| When the length of tokens is a multiple of the capacity of the text encoder,
|
| it should be split into chunks and sent to the text encoder individually.
|
| """
|
| max_embeddings_multiples = (text_input.shape[1] - 2) // (chunk_length - 2)
|
| if max_embeddings_multiples > 1:
|
| text_embeddings = []
|
| for i in range(max_embeddings_multiples):
|
|
|
| text_input_chunk = text_input[:, i * (chunk_length - 2) : (i + 1) * (chunk_length - 2) + 2].clone()
|
|
|
|
|
| text_input_chunk[:, 0] = text_input[0, 0]
|
| if pad == eos:
|
| text_input_chunk[:, -1] = text_input[0, -1]
|
| else:
|
| for j in range(len(text_input_chunk)):
|
| if text_input_chunk[j, -1] != eos and text_input_chunk[j, -1] != pad:
|
| text_input_chunk[j, -1] = eos
|
| if text_input_chunk[j, 1] == pad:
|
| text_input_chunk[j, 1] = eos
|
|
|
| if clip_skip is None or clip_skip == 1:
|
| text_embedding = text_encoder(text_input_chunk)[0]
|
| else:
|
| enc_out = text_encoder(text_input_chunk, output_hidden_states=True, return_dict=True)
|
| text_embedding = enc_out["hidden_states"][-clip_skip]
|
| text_embedding = text_encoder.text_model.final_layer_norm(text_embedding)
|
|
|
| if no_boseos_middle:
|
| if i == 0:
|
|
|
| text_embedding = text_embedding[:, :-1]
|
| elif i == max_embeddings_multiples - 1:
|
|
|
| text_embedding = text_embedding[:, 1:]
|
| else:
|
|
|
| text_embedding = text_embedding[:, 1:-1]
|
|
|
| text_embeddings.append(text_embedding)
|
| text_embeddings = torch.concat(text_embeddings, axis=1)
|
| else:
|
| if clip_skip is None or clip_skip == 1:
|
| text_embeddings = text_encoder(text_input)[0]
|
| else:
|
| enc_out = text_encoder(text_input, output_hidden_states=True, return_dict=True)
|
| text_embeddings = enc_out["hidden_states"][-clip_skip]
|
| text_embeddings = text_encoder.text_model.final_layer_norm(text_embeddings)
|
| return text_embeddings
|
|
|
|
|
| def get_weighted_text_embeddings(
|
| tokenizer,
|
| text_encoder,
|
| prompt: Union[str, List[str]],
|
| device,
|
| max_embeddings_multiples: Optional[int] = 3,
|
| no_boseos_middle: Optional[bool] = False,
|
| clip_skip=None,
|
| ):
|
| r"""
|
| Prompts can be assigned with local weights using brackets. For example,
|
| prompt 'A (very beautiful) masterpiece' highlights the words 'very beautiful',
|
| and the embedding tokens corresponding to the words get multiplied by a constant, 1.1.
|
|
|
| Also, to regularize of the embedding, the weighted embedding would be scaled to preserve the original mean.
|
|
|
| Args:
|
| prompt (`str` or `List[str]`):
|
| The prompt or prompts to guide the image generation.
|
| max_embeddings_multiples (`int`, *optional*, defaults to `3`):
|
| The max multiple length of prompt embeddings compared to the max output length of text encoder.
|
| no_boseos_middle (`bool`, *optional*, defaults to `False`):
|
| If the length of text token is multiples of the capacity of text encoder, whether reserve the starting and
|
| ending token in each of the chunk in the middle.
|
| skip_parsing (`bool`, *optional*, defaults to `False`):
|
| Skip the parsing of brackets.
|
| skip_weighting (`bool`, *optional*, defaults to `False`):
|
| Skip the weighting. When the parsing is skipped, it is forced True.
|
| """
|
| max_length = (tokenizer.model_max_length - 2) * max_embeddings_multiples + 2
|
| if isinstance(prompt, str):
|
| prompt = [prompt]
|
|
|
| prompt_tokens, prompt_weights = get_prompts_with_weights(tokenizer, prompt, max_length - 2)
|
|
|
|
|
| max_length = max([len(token) for token in prompt_tokens])
|
|
|
| max_embeddings_multiples = min(
|
| max_embeddings_multiples,
|
| (max_length - 1) // (tokenizer.model_max_length - 2) + 1,
|
| )
|
| max_embeddings_multiples = max(1, max_embeddings_multiples)
|
| max_length = (tokenizer.model_max_length - 2) * max_embeddings_multiples + 2
|
|
|
|
|
| bos = tokenizer.bos_token_id
|
| eos = tokenizer.eos_token_id
|
| pad = tokenizer.pad_token_id
|
| prompt_tokens, prompt_weights = pad_tokens_and_weights(
|
| prompt_tokens,
|
| prompt_weights,
|
| max_length,
|
| bos,
|
| eos,
|
| no_boseos_middle=no_boseos_middle,
|
| chunk_length=tokenizer.model_max_length,
|
| )
|
| prompt_tokens = torch.tensor(prompt_tokens, dtype=torch.long, device=device)
|
|
|
|
|
| text_embeddings = get_unweighted_text_embeddings(
|
| tokenizer,
|
| text_encoder,
|
| prompt_tokens,
|
| tokenizer.model_max_length,
|
| clip_skip,
|
| eos,
|
| pad,
|
| no_boseos_middle=no_boseos_middle,
|
| )
|
| prompt_weights = torch.tensor(prompt_weights, dtype=text_embeddings.dtype, device=device)
|
|
|
|
|
| previous_mean = text_embeddings.float().mean(axis=[-2, -1]).to(text_embeddings.dtype)
|
| text_embeddings = text_embeddings * prompt_weights.unsqueeze(-1)
|
| current_mean = text_embeddings.float().mean(axis=[-2, -1]).to(text_embeddings.dtype)
|
| text_embeddings = text_embeddings * (previous_mean / current_mean).unsqueeze(-1).unsqueeze(-1)
|
|
|
| return text_embeddings
|
|
|
|
|
|
|
| def pyramid_noise_like(noise, device, iterations=6, discount=0.4):
|
| b, c, w, h = noise.shape
|
| u = torch.nn.Upsample(size=(w, h), mode="bilinear").to(device)
|
| for i in range(iterations):
|
| r = random.random() * 2 + 2
|
| wn, hn = max(1, int(w / (r**i))), max(1, int(h / (r**i)))
|
| noise += u(torch.randn(b, c, wn, hn).to(device)) * discount**i
|
| if wn == 1 or hn == 1:
|
| break
|
| return noise / noise.std()
|
|
|
|
|
|
|
| def apply_noise_offset(latents, noise, noise_offset, adaptive_noise_scale):
|
| if noise_offset is None:
|
| return noise
|
| if adaptive_noise_scale is not None:
|
|
|
|
|
| latent_mean = torch.abs(latents.mean(dim=(2, 3), keepdim=True))
|
|
|
|
|
| noise_offset = noise_offset + adaptive_noise_scale * latent_mean
|
| noise_offset = torch.clamp(noise_offset, 0.0, None)
|
|
|
| noise = noise + noise_offset * torch.randn((latents.shape[0], latents.shape[1], 1, 1), device=latents.device)
|
| return noise
|
|
|
|
|
| def apply_masked_loss(loss, batch):
|
| if "conditioning_images" in batch:
|
|
|
| mask_image = batch["conditioning_images"].to(dtype=loss.dtype)[:, 0].unsqueeze(1)
|
| mask_image = mask_image / 2 + 0.5
|
|
|
| elif "alpha_masks" in batch and batch["alpha_masks"] is not None:
|
|
|
| mask_image = batch["alpha_masks"].to(dtype=loss.dtype).unsqueeze(1)
|
|
|
| else:
|
| return loss
|
|
|
|
|
| mask_image = torch.nn.functional.interpolate(mask_image, size=loss.shape[2:], mode="area")
|
| loss = loss * mask_image
|
| return loss
|
|
|
|
|
| """
|
| ##########################################
|
| # Perlin Noise
|
| def rand_perlin_2d(device, shape, res, fade=lambda t: 6 * t**5 - 15 * t**4 + 10 * t**3):
|
| delta = (res[0] / shape[0], res[1] / shape[1])
|
| d = (shape[0] // res[0], shape[1] // res[1])
|
|
|
| grid = (
|
| torch.stack(
|
| torch.meshgrid(torch.arange(0, res[0], delta[0], device=device), torch.arange(0, res[1], delta[1], device=device)),
|
| dim=-1,
|
| )
|
| % 1
|
| )
|
| angles = 2 * torch.pi * torch.rand(res[0] + 1, res[1] + 1, device=device)
|
| gradients = torch.stack((torch.cos(angles), torch.sin(angles)), dim=-1)
|
|
|
| tile_grads = (
|
| lambda slice1, slice2: gradients[slice1[0] : slice1[1], slice2[0] : slice2[1]]
|
| .repeat_interleave(d[0], 0)
|
| .repeat_interleave(d[1], 1)
|
| )
|
| dot = lambda grad, shift: (
|
| torch.stack((grid[: shape[0], : shape[1], 0] + shift[0], grid[: shape[0], : shape[1], 1] + shift[1]), dim=-1)
|
| * grad[: shape[0], : shape[1]]
|
| ).sum(dim=-1)
|
|
|
| n00 = dot(tile_grads([0, -1], [0, -1]), [0, 0])
|
| n10 = dot(tile_grads([1, None], [0, -1]), [-1, 0])
|
| n01 = dot(tile_grads([0, -1], [1, None]), [0, -1])
|
| n11 = dot(tile_grads([1, None], [1, None]), [-1, -1])
|
| t = fade(grid[: shape[0], : shape[1]])
|
| return 1.414 * torch.lerp(torch.lerp(n00, n10, t[..., 0]), torch.lerp(n01, n11, t[..., 0]), t[..., 1])
|
|
|
|
|
| def rand_perlin_2d_octaves(device, shape, res, octaves=1, persistence=0.5):
|
| noise = torch.zeros(shape, device=device)
|
| frequency = 1
|
| amplitude = 1
|
| for _ in range(octaves):
|
| noise += amplitude * rand_perlin_2d(device, shape, (frequency * res[0], frequency * res[1]))
|
| frequency *= 2
|
| amplitude *= persistence
|
| return noise
|
|
|
|
|
| def perlin_noise(noise, device, octaves):
|
| _, c, w, h = noise.shape
|
| perlin = lambda: rand_perlin_2d_octaves(device, (w, h), (4, 4), octaves)
|
| noise_perlin = []
|
| for _ in range(c):
|
| noise_perlin.append(perlin())
|
| noise_perlin = torch.stack(noise_perlin).unsqueeze(0) # (1, c, w, h)
|
| noise += noise_perlin # broadcast for each batch
|
| return noise / noise.std() # Scaled back to roughly unit variance
|
| """
|
|
|