| import inspect |
| import numpy as np |
| |
| from openvino.runtime import Core |
| |
| from transformers import CLIPTokenizer |
| |
| from tqdm import tqdm |
| from huggingface_hub import hf_hub_download |
| from diffusers import LMSDiscreteScheduler, PNDMScheduler |
| import cv2 |
|
|
|
|
| def result(var): |
| return next(iter(var.values())) |
|
|
|
|
| class StableDiffusionEngine: |
| def __init__( |
| self, |
| scheduler, |
| model="bes-dev/stable-diffusion-v1-4-openvino", |
| tokenizer="openai/clip-vit-large-patch14", |
| device="CPU" |
| ): |
| self.tokenizer = CLIPTokenizer.from_pretrained(tokenizer) |
| self.scheduler = scheduler |
| |
| self.core = Core() |
| |
| self._text_encoder = self.core.read_model( |
| hf_hub_download(repo_id=model, filename="text_encoder.xml"), |
| hf_hub_download(repo_id=model, filename="text_encoder.bin") |
| ) |
| self.text_encoder = self.core.compile_model(self._text_encoder, device) |
| |
| self._unet = self.core.read_model( |
| hf_hub_download(repo_id=model, filename="unet.xml"), |
| hf_hub_download(repo_id=model, filename="unet.bin") |
| ) |
| self.unet = self.core.compile_model(self._unet, device) |
| self.latent_shape = tuple(self._unet.inputs[0].shape)[1:] |
| |
| self._vae_decoder = self.core.read_model( |
| hf_hub_download(repo_id=model, filename="vae_decoder.xml"), |
| hf_hub_download(repo_id=model, filename="vae_decoder.bin") |
| ) |
| self.vae_decoder = self.core.compile_model(self._vae_decoder, device) |
| |
| self._vae_encoder = self.core.read_model( |
| hf_hub_download(repo_id=model, filename="vae_encoder.xml"), |
| hf_hub_download(repo_id=model, filename="vae_encoder.bin") |
| ) |
| self.vae_encoder = self.core.compile_model(self._vae_encoder, device) |
| self.init_image_shape = tuple(self._vae_encoder.inputs[0].shape)[2:] |
|
|
| def _preprocess_mask(self, mask): |
| h, w = mask.shape |
| if h != self.init_image_shape[0] and w != self.init_image_shape[1]: |
| mask = cv2.resize( |
| mask, |
| (self.init_image_shape[1], self.init_image_shape[0]), |
| interpolation = cv2.INTER_NEAREST |
| ) |
| mask = cv2.resize( |
| mask, |
| (self.init_image_shape[1] // 8, self.init_image_shape[0] // 8), |
| interpolation = cv2.INTER_NEAREST |
| ) |
| mask = mask.astype(np.float32) / 255.0 |
| mask = np.tile(mask, (4, 1, 1)) |
| mask = mask[None].transpose(0, 1, 2, 3) |
| mask = 1 - mask |
| return mask |
|
|
| def _preprocess_image(self, image): |
| image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) |
| h, w = image.shape[1:] |
| if h != self.init_image_shape[0] and w != self.init_image_shape[1]: |
| image = cv2.resize( |
| image, |
| (self.init_image_shape[1], self.init_image_shape[0]), |
| interpolation=cv2.INTER_LANCZOS4 |
| ) |
| |
| image = image.astype(np.float32) / 255.0 |
| image = 2.0 * image - 1.0 |
| |
| image = image[None].transpose(0, 3, 1, 2) |
| return image |
|
|
| def _encode_image(self, init_image): |
| moments = result(self.vae_encoder.infer_new_request({ |
| "init_image": self._preprocess_image(init_image) |
| })) |
| mean, logvar = np.split(moments, 2, axis=1) |
| std = np.exp(logvar * 0.5) |
| latent = (mean + std * np.random.randn(*mean.shape)) * 0.18215 |
| return latent |
|
|
| def __call__( |
| self, |
| prompt, |
| init_image = None, |
| mask = None, |
| strength = 0.5, |
| num_inference_steps = 32, |
| guidance_scale = 7.5, |
| eta = 0.0 |
| ): |
| |
| tokens = self.tokenizer( |
| prompt, |
| padding="max_length", |
| max_length=self.tokenizer.model_max_length, |
| truncation=True |
| ).input_ids |
| text_embeddings = result( |
| self.text_encoder.infer_new_request({"tokens": np.array([tokens])}) |
| ) |
|
|
| |
| if guidance_scale > 1.0: |
| tokens_uncond = self.tokenizer( |
| "", |
| padding="max_length", |
| max_length=self.tokenizer.model_max_length, |
| truncation=True |
| ).input_ids |
| uncond_embeddings = result( |
| self.text_encoder.infer_new_request({"tokens": np.array([tokens_uncond])}) |
| ) |
| text_embeddings = np.concatenate((uncond_embeddings, text_embeddings), axis=0) |
|
|
| |
| accepts_offset = "offset" in set(inspect.signature(self.scheduler.set_timesteps).parameters.keys()) |
| extra_set_kwargs = {} |
| offset = 0 |
| if accepts_offset: |
| offset = 1 |
| extra_set_kwargs["offset"] = 1 |
|
|
| self.scheduler.set_timesteps(num_inference_steps, **extra_set_kwargs) |
|
|
| |
| if init_image is None: |
| latents = np.random.randn(*self.latent_shape) |
| init_timestep = num_inference_steps |
| else: |
| init_latents = self._encode_image(init_image) |
| init_timestep = int(num_inference_steps * strength) + offset |
| init_timestep = min(init_timestep, num_inference_steps) |
| timesteps = np.array([[self.scheduler.timesteps[-init_timestep]]]).astype(np.long) |
| noise = np.random.randn(*self.latent_shape) |
| latents = self.scheduler.add_noise(init_latents, noise, timesteps)[0] |
|
|
| if init_image is not None and mask is not None: |
| mask = self._preprocess_mask(mask) |
| else: |
| mask = None |
|
|
| |
| if isinstance(self.scheduler, LMSDiscreteScheduler): |
| latents = latents * self.scheduler.sigmas[0] |
|
|
| |
| |
| |
| |
| accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys()) |
| extra_step_kwargs = {} |
| if accepts_eta: |
| extra_step_kwargs["eta"] = eta |
|
|
| t_start = max(num_inference_steps - init_timestep + offset, 0) |
| for i, t in tqdm(enumerate(self.scheduler.timesteps[t_start:])): |
| |
| latent_model_input = np.stack([latents, latents], 0) if guidance_scale > 1.0 else latents[None] |
| if isinstance(self.scheduler, LMSDiscreteScheduler): |
| sigma = self.scheduler.sigmas[i] |
| latent_model_input = latent_model_input / ((sigma**2 + 1) ** 0.5) |
|
|
| |
| noise_pred = result(self.unet.infer_new_request({ |
| "latent_model_input": latent_model_input, |
| "t": t, |
| "encoder_hidden_states": text_embeddings |
| })) |
|
|
| |
| if guidance_scale > 1.0: |
| noise_pred = noise_pred[0] + guidance_scale * (noise_pred[1] - noise_pred[0]) |
|
|
| |
| if isinstance(self.scheduler, LMSDiscreteScheduler): |
| latents = self.scheduler.step(noise_pred, i, latents, **extra_step_kwargs)["prev_sample"] |
| else: |
| latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs)["prev_sample"] |
|
|
| |
| if mask is not None: |
| init_latents_proper = self.scheduler.add_noise(init_latents, noise, t) |
| latents = ((init_latents_proper * mask) + (latents * (1 - mask)))[0] |
|
|
| image = result(self.vae_decoder.infer_new_request({ |
| "latents": np.expand_dims(latents, 0) |
| })) |
|
|
| |
| image = (image / 2 + 0.5).clip(0, 1) |
| image = (image[0].transpose(1, 2, 0)[:, :, ::-1] * 255).astype(np.uint8) |
| return image |
|
|