| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import gc |
| import json |
| import os |
| import random |
| import shutil |
| import sys |
| import tempfile |
| import unittest |
| import unittest.mock as mock |
|
|
| import numpy as np |
| import PIL |
| import requests_mock |
| import safetensors.torch |
| import torch |
| from parameterized import parameterized |
| from PIL import Image |
| from requests.exceptions import HTTPError |
| from transformers import CLIPImageProcessor, CLIPModel, CLIPTextConfig, CLIPTextModel, CLIPTokenizer |
|
|
| from diffusers import ( |
| AutoencoderKL, |
| DDIMPipeline, |
| DDIMScheduler, |
| DDPMPipeline, |
| DDPMScheduler, |
| DiffusionPipeline, |
| DPMSolverMultistepScheduler, |
| EulerAncestralDiscreteScheduler, |
| EulerDiscreteScheduler, |
| LMSDiscreteScheduler, |
| PNDMScheduler, |
| StableDiffusionImg2ImgPipeline, |
| StableDiffusionInpaintPipelineLegacy, |
| StableDiffusionPipeline, |
| UNet2DConditionModel, |
| UNet2DModel, |
| UniPCMultistepScheduler, |
| logging, |
| ) |
| from diffusers.schedulers.scheduling_utils import SCHEDULER_CONFIG_NAME |
| from diffusers.utils import ( |
| CONFIG_NAME, |
| WEIGHTS_NAME, |
| floats_tensor, |
| is_flax_available, |
| nightly, |
| require_torch_2, |
| slow, |
| torch_device, |
| ) |
| from diffusers.utils.testing_utils import CaptureLogger, get_tests_dir, load_numpy, require_compel, require_torch_gpu |
|
|
|
|
| torch.backends.cuda.matmul.allow_tf32 = False |
|
|
|
|
| class DownloadTests(unittest.TestCase): |
| def test_one_request_upon_cached(self): |
| |
| if torch_device == "mps": |
| return |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| with requests_mock.mock(real_http=True) as m: |
| DiffusionPipeline.download( |
| "hf-internal-testing/tiny-stable-diffusion-pipe", safety_checker=None, cache_dir=tmpdirname |
| ) |
|
|
| download_requests = [r.method for r in m.request_history] |
| assert download_requests.count("HEAD") == 15, "15 calls to files" |
| assert download_requests.count("GET") == 17, "15 calls to files + model_info + model_index.json" |
| assert ( |
| len(download_requests) == 32 |
| ), "2 calls per file (15 files) + send_telemetry, model_info and model_index.json" |
|
|
| with requests_mock.mock(real_http=True) as m: |
| DiffusionPipeline.download( |
| "hf-internal-testing/tiny-stable-diffusion-pipe", safety_checker=None, cache_dir=tmpdirname |
| ) |
|
|
| cache_requests = [r.method for r in m.request_history] |
| assert cache_requests.count("HEAD") == 1, "model_index.json is only HEAD" |
| assert cache_requests.count("GET") == 1, "model info is only GET" |
| assert ( |
| len(cache_requests) == 2 |
| ), "We should call only `model_info` to check for _commit hash and `send_telemetry`" |
|
|
| def test_download_only_pytorch(self): |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| |
| tmpdirname = DiffusionPipeline.download( |
| "hf-internal-testing/tiny-stable-diffusion-pipe", safety_checker=None, cache_dir=tmpdirname |
| ) |
|
|
| all_root_files = [t[-1] for t in os.walk(os.path.join(tmpdirname))] |
| files = [item for sublist in all_root_files for item in sublist] |
|
|
| |
| |
| assert not any(f.endswith(".msgpack") for f in files) |
| |
| assert not any(f.endswith(".safetensors") for f in files) |
|
|
| def test_force_safetensors_error(self): |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| |
| with self.assertRaises(EnvironmentError): |
| tmpdirname = DiffusionPipeline.download( |
| "hf-internal-testing/tiny-stable-diffusion-pipe-no-safetensors", |
| safety_checker=None, |
| cache_dir=tmpdirname, |
| use_safetensors=True, |
| ) |
|
|
| def test_returned_cached_folder(self): |
| prompt = "hello" |
| pipe = StableDiffusionPipeline.from_pretrained( |
| "hf-internal-testing/tiny-stable-diffusion-torch", safety_checker=None |
| ) |
| _, local_path = StableDiffusionPipeline.from_pretrained( |
| "hf-internal-testing/tiny-stable-diffusion-torch", safety_checker=None, return_cached_folder=True |
| ) |
| pipe_2 = StableDiffusionPipeline.from_pretrained(local_path) |
|
|
| pipe = pipe.to(torch_device) |
| pipe_2 = pipe_2.to(torch_device) |
|
|
| generator = torch.manual_seed(0) |
| out = pipe(prompt, num_inference_steps=2, generator=generator, output_type="numpy").images |
|
|
| generator = torch.manual_seed(0) |
| out_2 = pipe_2(prompt, num_inference_steps=2, generator=generator, output_type="numpy").images |
|
|
| assert np.max(np.abs(out - out_2)) < 1e-3 |
|
|
| def test_download_safetensors(self): |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| |
| tmpdirname = DiffusionPipeline.download( |
| "hf-internal-testing/tiny-stable-diffusion-pipe-safetensors", |
| safety_checker=None, |
| cache_dir=tmpdirname, |
| ) |
|
|
| all_root_files = [t[-1] for t in os.walk(os.path.join(tmpdirname))] |
| files = [item for sublist in all_root_files for item in sublist] |
|
|
| |
| |
| assert not any(f.endswith(".bin") for f in files) |
|
|
| def test_download_no_safety_checker(self): |
| prompt = "hello" |
| pipe = StableDiffusionPipeline.from_pretrained( |
| "hf-internal-testing/tiny-stable-diffusion-torch", safety_checker=None |
| ) |
| pipe = pipe.to(torch_device) |
| generator = torch.manual_seed(0) |
| out = pipe(prompt, num_inference_steps=2, generator=generator, output_type="numpy").images |
|
|
| pipe_2 = StableDiffusionPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-torch") |
| pipe_2 = pipe_2.to(torch_device) |
| generator = torch.manual_seed(0) |
| out_2 = pipe_2(prompt, num_inference_steps=2, generator=generator, output_type="numpy").images |
|
|
| assert np.max(np.abs(out - out_2)) < 1e-3 |
|
|
| def test_load_no_safety_checker_explicit_locally(self): |
| prompt = "hello" |
| pipe = StableDiffusionPipeline.from_pretrained( |
| "hf-internal-testing/tiny-stable-diffusion-torch", safety_checker=None |
| ) |
| pipe = pipe.to(torch_device) |
| generator = torch.manual_seed(0) |
| out = pipe(prompt, num_inference_steps=2, generator=generator, output_type="numpy").images |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| pipe.save_pretrained(tmpdirname) |
| pipe_2 = StableDiffusionPipeline.from_pretrained(tmpdirname, safety_checker=None) |
| pipe_2 = pipe_2.to(torch_device) |
|
|
| generator = torch.manual_seed(0) |
|
|
| out_2 = pipe_2(prompt, num_inference_steps=2, generator=generator, output_type="numpy").images |
|
|
| assert np.max(np.abs(out - out_2)) < 1e-3 |
|
|
| def test_load_no_safety_checker_default_locally(self): |
| prompt = "hello" |
| pipe = StableDiffusionPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-torch") |
| pipe = pipe.to(torch_device) |
|
|
| generator = torch.manual_seed(0) |
| out = pipe(prompt, num_inference_steps=2, generator=generator, output_type="numpy").images |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| pipe.save_pretrained(tmpdirname) |
| pipe_2 = StableDiffusionPipeline.from_pretrained(tmpdirname) |
| pipe_2 = pipe_2.to(torch_device) |
|
|
| generator = torch.manual_seed(0) |
|
|
| out_2 = pipe_2(prompt, num_inference_steps=2, generator=generator, output_type="numpy").images |
|
|
| assert np.max(np.abs(out - out_2)) < 1e-3 |
|
|
| def test_cached_files_are_used_when_no_internet(self): |
| |
| response_mock = mock.Mock() |
| response_mock.status_code = 500 |
| response_mock.headers = {} |
| response_mock.raise_for_status.side_effect = HTTPError |
| response_mock.json.return_value = {} |
|
|
| |
| orig_pipe = StableDiffusionPipeline.from_pretrained( |
| "hf-internal-testing/tiny-stable-diffusion-torch", safety_checker=None |
| ) |
| orig_comps = {k: v for k, v in orig_pipe.components.items() if hasattr(v, "parameters")} |
|
|
| |
| with mock.patch("requests.request", return_value=response_mock): |
| |
| pipe = StableDiffusionPipeline.from_pretrained( |
| "hf-internal-testing/tiny-stable-diffusion-torch", safety_checker=None, local_files_only=True |
| ) |
| comps = {k: v for k, v in pipe.components.items() if hasattr(v, "parameters")} |
|
|
| for m1, m2 in zip(orig_comps.values(), comps.values()): |
| for p1, p2 in zip(m1.parameters(), m2.parameters()): |
| if p1.data.ne(p2.data).sum() > 0: |
| assert False, "Parameters not the same!" |
|
|
| def test_download_from_variant_folder(self): |
| for safe_avail in [False, True]: |
| import diffusers |
|
|
| diffusers.utils.import_utils._safetensors_available = safe_avail |
|
|
| other_format = ".bin" if safe_avail else ".safetensors" |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| tmpdirname = StableDiffusionPipeline.download( |
| "hf-internal-testing/stable-diffusion-all-variants", cache_dir=tmpdirname |
| ) |
| all_root_files = [t[-1] for t in os.walk(tmpdirname)] |
| files = [item for sublist in all_root_files for item in sublist] |
|
|
| |
| |
| assert len(files) == 15, f"We should only download 15 files, not {len(files)}" |
| assert not any(f.endswith(other_format) for f in files) |
| |
| assert not any(len(f.split(".")) == 3 for f in files) |
|
|
| diffusers.utils.import_utils._safetensors_available = True |
|
|
| def test_download_variant_all(self): |
| for safe_avail in [False, True]: |
| import diffusers |
|
|
| diffusers.utils.import_utils._safetensors_available = safe_avail |
|
|
| other_format = ".bin" if safe_avail else ".safetensors" |
| this_format = ".safetensors" if safe_avail else ".bin" |
| variant = "fp16" |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| tmpdirname = StableDiffusionPipeline.download( |
| "hf-internal-testing/stable-diffusion-all-variants", cache_dir=tmpdirname, variant=variant |
| ) |
| all_root_files = [t[-1] for t in os.walk(tmpdirname)] |
| files = [item for sublist in all_root_files for item in sublist] |
|
|
| |
| |
| assert len(files) == 15, f"We should only download 15 files, not {len(files)}" |
| |
| assert len([f for f in files if f.endswith(f"{variant}{this_format}")]) == 4 |
| |
| assert not any(f.endswith(this_format) and not f.endswith(f"{variant}{this_format}") for f in files) |
| assert not any(f.endswith(other_format) for f in files) |
|
|
| diffusers.utils.import_utils._safetensors_available = True |
|
|
| def test_download_variant_partly(self): |
| for safe_avail in [False, True]: |
| import diffusers |
|
|
| diffusers.utils.import_utils._safetensors_available = safe_avail |
|
|
| other_format = ".bin" if safe_avail else ".safetensors" |
| this_format = ".safetensors" if safe_avail else ".bin" |
| variant = "no_ema" |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| tmpdirname = StableDiffusionPipeline.download( |
| "hf-internal-testing/stable-diffusion-all-variants", cache_dir=tmpdirname, variant=variant |
| ) |
| all_root_files = [t[-1] for t in os.walk(tmpdirname)] |
| files = [item for sublist in all_root_files for item in sublist] |
|
|
| unet_files = os.listdir(os.path.join(tmpdirname, "unet")) |
|
|
| |
| |
| assert len(files) == 15, f"We should only download 15 files, not {len(files)}" |
| |
| assert f"diffusion_pytorch_model.{variant}{this_format}" in unet_files |
| assert len([f for f in files if f.endswith(f"{variant}{this_format}")]) == 1 |
| |
| assert sum(f.endswith(this_format) and not f.endswith(f"{variant}{this_format}") for f in files) == 3 |
| assert not any(f.endswith(other_format) for f in files) |
|
|
| diffusers.utils.import_utils._safetensors_available = True |
|
|
| def test_download_broken_variant(self): |
| for safe_avail in [False, True]: |
| import diffusers |
|
|
| diffusers.utils.import_utils._safetensors_available = safe_avail |
| |
| for variant in [None, "no_ema"]: |
| with self.assertRaises(OSError) as error_context: |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| tmpdirname = StableDiffusionPipeline.from_pretrained( |
| "hf-internal-testing/stable-diffusion-broken-variants", |
| cache_dir=tmpdirname, |
| variant=variant, |
| ) |
|
|
| assert "Error no file name" in str(error_context.exception) |
|
|
| |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| tmpdirname = StableDiffusionPipeline.download( |
| "hf-internal-testing/stable-diffusion-broken-variants", cache_dir=tmpdirname, variant="fp16" |
| ) |
|
|
| all_root_files = [t[-1] for t in os.walk(tmpdirname)] |
| files = [item for sublist in all_root_files for item in sublist] |
|
|
| |
| |
| assert len(files) == 15, f"We should only download 15 files, not {len(files)}" |
| |
|
|
| diffusers.utils.import_utils._safetensors_available = True |
|
|
| def test_text_inversion_download(self): |
| pipe = StableDiffusionPipeline.from_pretrained( |
| "hf-internal-testing/tiny-stable-diffusion-torch", safety_checker=None |
| ) |
| pipe = pipe.to(torch_device) |
|
|
| num_tokens = len(pipe.tokenizer) |
|
|
| |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| ten = {"<*>": torch.ones((32,))} |
| torch.save(ten, os.path.join(tmpdirname, "learned_embeds.bin")) |
|
|
| pipe.load_textual_inversion(tmpdirname) |
|
|
| token = pipe.tokenizer.convert_tokens_to_ids("<*>") |
| assert token == num_tokens, "Added token must be at spot `num_tokens`" |
| assert pipe.text_encoder.get_input_embeddings().weight[-1].sum().item() == 32 |
| assert pipe._maybe_convert_prompt("<*>", pipe.tokenizer) == "<*>" |
|
|
| prompt = "hey <*>" |
| out = pipe(prompt, num_inference_steps=1, output_type="numpy").images |
| assert out.shape == (1, 128, 128, 3) |
|
|
| |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| ten = {"<**>": 2 * torch.ones((1, 32))} |
| torch.save(ten, os.path.join(tmpdirname, "learned_embeds.bin")) |
|
|
| pipe.load_textual_inversion(tmpdirname, weight_name="learned_embeds.bin") |
|
|
| token = pipe.tokenizer.convert_tokens_to_ids("<**>") |
| assert token == num_tokens + 1, "Added token must be at spot `num_tokens`" |
| assert pipe.text_encoder.get_input_embeddings().weight[-1].sum().item() == 64 |
| assert pipe._maybe_convert_prompt("<**>", pipe.tokenizer) == "<**>" |
|
|
| prompt = "hey <**>" |
| out = pipe(prompt, num_inference_steps=1, output_type="numpy").images |
| assert out.shape == (1, 128, 128, 3) |
|
|
| |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| ten = {"<***>": torch.cat([3 * torch.ones((1, 32)), 4 * torch.ones((1, 32)), 5 * torch.ones((1, 32))])} |
| torch.save(ten, os.path.join(tmpdirname, "learned_embeds.bin")) |
|
|
| pipe.load_textual_inversion(tmpdirname) |
|
|
| token = pipe.tokenizer.convert_tokens_to_ids("<***>") |
| token_1 = pipe.tokenizer.convert_tokens_to_ids("<***>_1") |
| token_2 = pipe.tokenizer.convert_tokens_to_ids("<***>_2") |
|
|
| assert token == num_tokens + 2, "Added token must be at spot `num_tokens`" |
| assert token_1 == num_tokens + 3, "Added token must be at spot `num_tokens`" |
| assert token_2 == num_tokens + 4, "Added token must be at spot `num_tokens`" |
| assert pipe.text_encoder.get_input_embeddings().weight[-3].sum().item() == 96 |
| assert pipe.text_encoder.get_input_embeddings().weight[-2].sum().item() == 128 |
| assert pipe.text_encoder.get_input_embeddings().weight[-1].sum().item() == 160 |
| assert pipe._maybe_convert_prompt("<***>", pipe.tokenizer) == "<***><***>_1<***>_2" |
|
|
| prompt = "hey <***>" |
| out = pipe(prompt, num_inference_steps=1, output_type="numpy").images |
| assert out.shape == (1, 128, 128, 3) |
|
|
| |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| ten = { |
| "string_to_param": { |
| "*": torch.cat([3 * torch.ones((1, 32)), 4 * torch.ones((1, 32)), 5 * torch.ones((1, 32))]) |
| }, |
| "name": "<****>", |
| } |
| torch.save(ten, os.path.join(tmpdirname, "a1111.bin")) |
|
|
| pipe.load_textual_inversion(tmpdirname, weight_name="a1111.bin") |
|
|
| token = pipe.tokenizer.convert_tokens_to_ids("<****>") |
| token_1 = pipe.tokenizer.convert_tokens_to_ids("<****>_1") |
| token_2 = pipe.tokenizer.convert_tokens_to_ids("<****>_2") |
|
|
| assert token == num_tokens + 5, "Added token must be at spot `num_tokens`" |
| assert token_1 == num_tokens + 6, "Added token must be at spot `num_tokens`" |
| assert token_2 == num_tokens + 7, "Added token must be at spot `num_tokens`" |
| assert pipe.text_encoder.get_input_embeddings().weight[-3].sum().item() == 96 |
| assert pipe.text_encoder.get_input_embeddings().weight[-2].sum().item() == 128 |
| assert pipe.text_encoder.get_input_embeddings().weight[-1].sum().item() == 160 |
| assert pipe._maybe_convert_prompt("<****>", pipe.tokenizer) == "<****><****>_1<****>_2" |
|
|
| prompt = "hey <****>" |
| out = pipe(prompt, num_inference_steps=1, output_type="numpy").images |
| assert out.shape == (1, 128, 128, 3) |
|
|
|
|
| class CustomPipelineTests(unittest.TestCase): |
| def test_load_custom_pipeline(self): |
| pipeline = DiffusionPipeline.from_pretrained( |
| "google/ddpm-cifar10-32", custom_pipeline="hf-internal-testing/diffusers-dummy-pipeline" |
| ) |
| pipeline = pipeline.to(torch_device) |
| |
| |
| assert pipeline.__class__.__name__ == "CustomPipeline" |
|
|
| def test_load_custom_github(self): |
| pipeline = DiffusionPipeline.from_pretrained( |
| "google/ddpm-cifar10-32", custom_pipeline="one_step_unet", custom_revision="main" |
| ) |
|
|
| |
| with torch.no_grad(): |
| output = pipeline() |
|
|
| assert output.numel() == output.sum() |
|
|
| |
| |
| del sys.modules["diffusers_modules.git.one_step_unet"] |
|
|
| pipeline = DiffusionPipeline.from_pretrained( |
| "google/ddpm-cifar10-32", custom_pipeline="one_step_unet", custom_revision="0.10.2" |
| ) |
| with torch.no_grad(): |
| output = pipeline() |
|
|
| assert output.numel() != output.sum() |
|
|
| assert pipeline.__class__.__name__ == "UnetSchedulerOneForwardPipeline" |
|
|
| def test_run_custom_pipeline(self): |
| pipeline = DiffusionPipeline.from_pretrained( |
| "google/ddpm-cifar10-32", custom_pipeline="hf-internal-testing/diffusers-dummy-pipeline" |
| ) |
| pipeline = pipeline.to(torch_device) |
| images, output_str = pipeline(num_inference_steps=2, output_type="np") |
|
|
| assert images[0].shape == (1, 32, 32, 3) |
|
|
| |
| assert output_str == "This is a test" |
|
|
| def test_local_custom_pipeline_repo(self): |
| local_custom_pipeline_path = get_tests_dir("fixtures/custom_pipeline") |
| pipeline = DiffusionPipeline.from_pretrained( |
| "google/ddpm-cifar10-32", custom_pipeline=local_custom_pipeline_path |
| ) |
| pipeline = pipeline.to(torch_device) |
| images, output_str = pipeline(num_inference_steps=2, output_type="np") |
|
|
| assert pipeline.__class__.__name__ == "CustomLocalPipeline" |
| assert images[0].shape == (1, 32, 32, 3) |
| |
| assert output_str == "This is a local test" |
|
|
| def test_local_custom_pipeline_file(self): |
| local_custom_pipeline_path = get_tests_dir("fixtures/custom_pipeline") |
| local_custom_pipeline_path = os.path.join(local_custom_pipeline_path, "what_ever.py") |
| pipeline = DiffusionPipeline.from_pretrained( |
| "google/ddpm-cifar10-32", custom_pipeline=local_custom_pipeline_path |
| ) |
| pipeline = pipeline.to(torch_device) |
| images, output_str = pipeline(num_inference_steps=2, output_type="np") |
|
|
| assert pipeline.__class__.__name__ == "CustomLocalPipeline" |
| assert images[0].shape == (1, 32, 32, 3) |
| |
| assert output_str == "This is a local test" |
|
|
| @slow |
| @require_torch_gpu |
| def test_download_from_git(self): |
| clip_model_id = "laion/CLIP-ViT-B-32-laion2B-s34B-b79K" |
|
|
| feature_extractor = CLIPImageProcessor.from_pretrained(clip_model_id) |
| clip_model = CLIPModel.from_pretrained(clip_model_id, torch_dtype=torch.float16) |
|
|
| pipeline = DiffusionPipeline.from_pretrained( |
| "CompVis/stable-diffusion-v1-4", |
| custom_pipeline="clip_guided_stable_diffusion", |
| clip_model=clip_model, |
| feature_extractor=feature_extractor, |
| torch_dtype=torch.float16, |
| ) |
| pipeline.enable_attention_slicing() |
| pipeline = pipeline.to(torch_device) |
|
|
| |
| |
| assert pipeline.__class__.__name__ == "CLIPGuidedStableDiffusion" |
|
|
| image = pipeline("a prompt", num_inference_steps=2, output_type="np").images[0] |
| assert image.shape == (512, 512, 3) |
|
|
|
|
| class PipelineFastTests(unittest.TestCase): |
| def tearDown(self): |
| |
| super().tearDown() |
| gc.collect() |
| torch.cuda.empty_cache() |
|
|
| import diffusers |
|
|
| diffusers.utils.import_utils._safetensors_available = True |
|
|
| def dummy_image(self): |
| batch_size = 1 |
| num_channels = 3 |
| sizes = (32, 32) |
|
|
| image = floats_tensor((batch_size, num_channels) + sizes, rng=random.Random(0)).to(torch_device) |
| return image |
|
|
| def dummy_uncond_unet(self, sample_size=32): |
| torch.manual_seed(0) |
| model = UNet2DModel( |
| block_out_channels=(32, 64), |
| layers_per_block=2, |
| sample_size=sample_size, |
| in_channels=3, |
| out_channels=3, |
| down_block_types=("DownBlock2D", "AttnDownBlock2D"), |
| up_block_types=("AttnUpBlock2D", "UpBlock2D"), |
| ) |
| return model |
|
|
| def dummy_cond_unet(self, sample_size=32): |
| torch.manual_seed(0) |
| model = UNet2DConditionModel( |
| block_out_channels=(32, 64), |
| layers_per_block=2, |
| sample_size=sample_size, |
| in_channels=4, |
| out_channels=4, |
| down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"), |
| up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"), |
| cross_attention_dim=32, |
| ) |
| return model |
|
|
| @property |
| def dummy_vae(self): |
| torch.manual_seed(0) |
| model = AutoencoderKL( |
| block_out_channels=[32, 64], |
| in_channels=3, |
| out_channels=3, |
| down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"], |
| up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"], |
| latent_channels=4, |
| ) |
| return model |
|
|
| @property |
| def dummy_text_encoder(self): |
| torch.manual_seed(0) |
| config = CLIPTextConfig( |
| bos_token_id=0, |
| eos_token_id=2, |
| hidden_size=32, |
| intermediate_size=37, |
| layer_norm_eps=1e-05, |
| num_attention_heads=4, |
| num_hidden_layers=5, |
| pad_token_id=1, |
| vocab_size=1000, |
| ) |
| return CLIPTextModel(config) |
|
|
| @property |
| def dummy_extractor(self): |
| def extract(*args, **kwargs): |
| class Out: |
| def __init__(self): |
| self.pixel_values = torch.ones([0]) |
|
|
| def to(self, device): |
| self.pixel_values.to(device) |
| return self |
|
|
| return Out() |
|
|
| return extract |
|
|
| @parameterized.expand( |
| [ |
| [DDIMScheduler, DDIMPipeline, 32], |
| [DDPMScheduler, DDPMPipeline, 32], |
| [DDIMScheduler, DDIMPipeline, (32, 64)], |
| [DDPMScheduler, DDPMPipeline, (64, 32)], |
| ] |
| ) |
| def test_uncond_unet_components(self, scheduler_fn=DDPMScheduler, pipeline_fn=DDPMPipeline, sample_size=32): |
| unet = self.dummy_uncond_unet(sample_size) |
| scheduler = scheduler_fn() |
| pipeline = pipeline_fn(unet, scheduler).to(torch_device) |
|
|
| generator = torch.manual_seed(0) |
| out_image = pipeline( |
| generator=generator, |
| num_inference_steps=2, |
| output_type="np", |
| ).images |
| sample_size = (sample_size, sample_size) if isinstance(sample_size, int) else sample_size |
| assert out_image.shape == (1, *sample_size, 3) |
|
|
| def test_stable_diffusion_components(self): |
| """Test that components property works correctly""" |
| unet = self.dummy_cond_unet() |
| scheduler = PNDMScheduler(skip_prk_steps=True) |
| vae = self.dummy_vae |
| bert = self.dummy_text_encoder |
| tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip") |
|
|
| image = self.dummy_image().cpu().permute(0, 2, 3, 1)[0] |
| init_image = Image.fromarray(np.uint8(image)).convert("RGB") |
| mask_image = Image.fromarray(np.uint8(image + 4)).convert("RGB").resize((32, 32)) |
|
|
| |
| inpaint = StableDiffusionInpaintPipelineLegacy( |
| unet=unet, |
| scheduler=scheduler, |
| vae=vae, |
| text_encoder=bert, |
| tokenizer=tokenizer, |
| safety_checker=None, |
| feature_extractor=self.dummy_extractor, |
| ).to(torch_device) |
| img2img = StableDiffusionImg2ImgPipeline(**inpaint.components).to(torch_device) |
| text2img = StableDiffusionPipeline(**inpaint.components).to(torch_device) |
|
|
| prompt = "A painting of a squirrel eating a burger" |
|
|
| generator = torch.manual_seed(0) |
| image_inpaint = inpaint( |
| [prompt], |
| generator=generator, |
| num_inference_steps=2, |
| output_type="np", |
| image=init_image, |
| mask_image=mask_image, |
| ).images |
| image_img2img = img2img( |
| [prompt], |
| generator=generator, |
| num_inference_steps=2, |
| output_type="np", |
| image=init_image, |
| ).images |
| image_text2img = text2img( |
| [prompt], |
| generator=generator, |
| num_inference_steps=2, |
| output_type="np", |
| ).images |
|
|
| assert image_inpaint.shape == (1, 32, 32, 3) |
| assert image_img2img.shape == (1, 32, 32, 3) |
| assert image_text2img.shape == (1, 64, 64, 3) |
|
|
| @require_torch_gpu |
| def test_pipe_false_offload_warn(self): |
| unet = self.dummy_cond_unet() |
| scheduler = PNDMScheduler(skip_prk_steps=True) |
| vae = self.dummy_vae |
| bert = self.dummy_text_encoder |
| tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip") |
|
|
| sd = StableDiffusionPipeline( |
| unet=unet, |
| scheduler=scheduler, |
| vae=vae, |
| text_encoder=bert, |
| tokenizer=tokenizer, |
| safety_checker=None, |
| feature_extractor=self.dummy_extractor, |
| ) |
|
|
| sd.enable_model_cpu_offload() |
|
|
| logger = logging.get_logger("diffusers.pipelines.pipeline_utils") |
| with CaptureLogger(logger) as cap_logger: |
| sd.to("cuda") |
|
|
| assert "It is strongly recommended against doing so" in str(cap_logger) |
|
|
| sd = StableDiffusionPipeline( |
| unet=unet, |
| scheduler=scheduler, |
| vae=vae, |
| text_encoder=bert, |
| tokenizer=tokenizer, |
| safety_checker=None, |
| feature_extractor=self.dummy_extractor, |
| ) |
|
|
| def test_set_scheduler(self): |
| unet = self.dummy_cond_unet() |
| scheduler = PNDMScheduler(skip_prk_steps=True) |
| vae = self.dummy_vae |
| bert = self.dummy_text_encoder |
| tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip") |
|
|
| sd = StableDiffusionPipeline( |
| unet=unet, |
| scheduler=scheduler, |
| vae=vae, |
| text_encoder=bert, |
| tokenizer=tokenizer, |
| safety_checker=None, |
| feature_extractor=self.dummy_extractor, |
| ) |
|
|
| sd.scheduler = DDIMScheduler.from_config(sd.scheduler.config) |
| assert isinstance(sd.scheduler, DDIMScheduler) |
| sd.scheduler = DDPMScheduler.from_config(sd.scheduler.config) |
| assert isinstance(sd.scheduler, DDPMScheduler) |
| sd.scheduler = PNDMScheduler.from_config(sd.scheduler.config) |
| assert isinstance(sd.scheduler, PNDMScheduler) |
| sd.scheduler = LMSDiscreteScheduler.from_config(sd.scheduler.config) |
| assert isinstance(sd.scheduler, LMSDiscreteScheduler) |
| sd.scheduler = EulerDiscreteScheduler.from_config(sd.scheduler.config) |
| assert isinstance(sd.scheduler, EulerDiscreteScheduler) |
| sd.scheduler = EulerAncestralDiscreteScheduler.from_config(sd.scheduler.config) |
| assert isinstance(sd.scheduler, EulerAncestralDiscreteScheduler) |
| sd.scheduler = DPMSolverMultistepScheduler.from_config(sd.scheduler.config) |
| assert isinstance(sd.scheduler, DPMSolverMultistepScheduler) |
|
|
| def test_set_scheduler_consistency(self): |
| unet = self.dummy_cond_unet() |
| pndm = PNDMScheduler.from_config("hf-internal-testing/tiny-stable-diffusion-torch", subfolder="scheduler") |
| ddim = DDIMScheduler.from_config("hf-internal-testing/tiny-stable-diffusion-torch", subfolder="scheduler") |
| vae = self.dummy_vae |
| bert = self.dummy_text_encoder |
| tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip") |
|
|
| sd = StableDiffusionPipeline( |
| unet=unet, |
| scheduler=pndm, |
| vae=vae, |
| text_encoder=bert, |
| tokenizer=tokenizer, |
| safety_checker=None, |
| feature_extractor=self.dummy_extractor, |
| ) |
|
|
| pndm_config = sd.scheduler.config |
| sd.scheduler = DDPMScheduler.from_config(pndm_config) |
| sd.scheduler = PNDMScheduler.from_config(sd.scheduler.config) |
| pndm_config_2 = sd.scheduler.config |
| pndm_config_2 = {k: v for k, v in pndm_config_2.items() if k in pndm_config} |
|
|
| assert dict(pndm_config) == dict(pndm_config_2) |
|
|
| sd = StableDiffusionPipeline( |
| unet=unet, |
| scheduler=ddim, |
| vae=vae, |
| text_encoder=bert, |
| tokenizer=tokenizer, |
| safety_checker=None, |
| feature_extractor=self.dummy_extractor, |
| ) |
|
|
| ddim_config = sd.scheduler.config |
| sd.scheduler = LMSDiscreteScheduler.from_config(ddim_config) |
| sd.scheduler = DDIMScheduler.from_config(sd.scheduler.config) |
| ddim_config_2 = sd.scheduler.config |
| ddim_config_2 = {k: v for k, v in ddim_config_2.items() if k in ddim_config} |
|
|
| assert dict(ddim_config) == dict(ddim_config_2) |
|
|
| def test_save_safe_serialization(self): |
| pipeline = StableDiffusionPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-torch") |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| pipeline.save_pretrained(tmpdirname, safe_serialization=True) |
|
|
| |
| vae_path = os.path.join(tmpdirname, "vae", "diffusion_pytorch_model.safetensors") |
| assert os.path.exists(vae_path), f"Could not find {vae_path}" |
| _ = safetensors.torch.load_file(vae_path) |
|
|
| |
| unet_path = os.path.join(tmpdirname, "unet", "diffusion_pytorch_model.safetensors") |
| assert os.path.exists(unet_path), f"Could not find {unet_path}" |
| _ = safetensors.torch.load_file(unet_path) |
|
|
| |
| text_encoder_path = os.path.join(tmpdirname, "text_encoder", "model.safetensors") |
| assert os.path.exists(text_encoder_path), f"Could not find {text_encoder_path}" |
| _ = safetensors.torch.load_file(text_encoder_path) |
|
|
| pipeline = StableDiffusionPipeline.from_pretrained(tmpdirname) |
| assert pipeline.unet is not None |
| assert pipeline.vae is not None |
| assert pipeline.text_encoder is not None |
| assert pipeline.scheduler is not None |
| assert pipeline.feature_extractor is not None |
|
|
| def test_no_pytorch_download_when_doing_safetensors(self): |
| |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| _ = StableDiffusionPipeline.from_pretrained( |
| "hf-internal-testing/diffusers-stable-diffusion-tiny-all", cache_dir=tmpdirname |
| ) |
|
|
| path = os.path.join( |
| tmpdirname, |
| "models--hf-internal-testing--diffusers-stable-diffusion-tiny-all", |
| "snapshots", |
| "07838d72e12f9bcec1375b0482b80c1d399be843", |
| "unet", |
| ) |
| |
| assert os.path.exists(os.path.join(path, "diffusion_pytorch_model.safetensors")) |
| |
| assert not os.path.exists(os.path.join(path, "diffusion_pytorch_model.bin")) |
|
|
| def test_no_safetensors_download_when_doing_pytorch(self): |
| |
| import diffusers |
|
|
| diffusers.utils.import_utils._safetensors_available = False |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| _ = StableDiffusionPipeline.from_pretrained( |
| "hf-internal-testing/diffusers-stable-diffusion-tiny-all", cache_dir=tmpdirname |
| ) |
|
|
| path = os.path.join( |
| tmpdirname, |
| "models--hf-internal-testing--diffusers-stable-diffusion-tiny-all", |
| "snapshots", |
| "07838d72e12f9bcec1375b0482b80c1d399be843", |
| "unet", |
| ) |
| |
| assert not os.path.exists(os.path.join(path, "diffusion_pytorch_model.safetensors")) |
| |
| assert os.path.exists(os.path.join(path, "diffusion_pytorch_model.bin")) |
|
|
| diffusers.utils.import_utils._safetensors_available = True |
|
|
| def test_optional_components(self): |
| unet = self.dummy_cond_unet() |
| pndm = PNDMScheduler.from_config("hf-internal-testing/tiny-stable-diffusion-torch", subfolder="scheduler") |
| vae = self.dummy_vae |
| bert = self.dummy_text_encoder |
| tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip") |
|
|
| orig_sd = StableDiffusionPipeline( |
| unet=unet, |
| scheduler=pndm, |
| vae=vae, |
| text_encoder=bert, |
| tokenizer=tokenizer, |
| safety_checker=unet, |
| feature_extractor=self.dummy_extractor, |
| ) |
| sd = orig_sd |
|
|
| assert sd.config.requires_safety_checker is True |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| sd.save_pretrained(tmpdirname) |
|
|
| |
| sd = StableDiffusionPipeline.from_pretrained( |
| tmpdirname, feature_extractor=None, safety_checker=None, requires_safety_checker=False |
| ) |
|
|
| assert sd.config.requires_safety_checker is False |
| assert sd.config.safety_checker == (None, None) |
| assert sd.config.feature_extractor == (None, None) |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| sd.save_pretrained(tmpdirname) |
|
|
| |
| sd = StableDiffusionPipeline.from_pretrained(tmpdirname) |
|
|
| assert sd.config.requires_safety_checker is False |
| assert sd.config.safety_checker == (None, None) |
| assert sd.config.feature_extractor == (None, None) |
|
|
| orig_sd.save_pretrained(tmpdirname) |
|
|
| |
| shutil.rmtree(os.path.join(tmpdirname, "safety_checker")) |
| with open(os.path.join(tmpdirname, sd.config_name)) as f: |
| config = json.load(f) |
| config["safety_checker"] = [None, None] |
| with open(os.path.join(tmpdirname, sd.config_name), "w") as f: |
| json.dump(config, f) |
|
|
| sd = StableDiffusionPipeline.from_pretrained(tmpdirname, requires_safety_checker=False) |
| sd.save_pretrained(tmpdirname) |
| sd = StableDiffusionPipeline.from_pretrained(tmpdirname) |
|
|
| assert sd.config.requires_safety_checker is False |
| assert sd.config.safety_checker == (None, None) |
| assert sd.config.feature_extractor == (None, None) |
|
|
| |
| with open(os.path.join(tmpdirname, sd.config_name)) as f: |
| config = json.load(f) |
| del config["safety_checker"] |
| del config["feature_extractor"] |
| with open(os.path.join(tmpdirname, sd.config_name), "w") as f: |
| json.dump(config, f) |
|
|
| sd = StableDiffusionPipeline.from_pretrained(tmpdirname) |
|
|
| assert sd.config.requires_safety_checker is False |
| assert sd.config.safety_checker == (None, None) |
| assert sd.config.feature_extractor == (None, None) |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| sd.save_pretrained(tmpdirname) |
|
|
| |
| sd = StableDiffusionPipeline.from_pretrained(tmpdirname, feature_extractor=self.dummy_extractor) |
|
|
| assert sd.config.requires_safety_checker is False |
| assert sd.config.safety_checker == (None, None) |
| assert sd.config.feature_extractor != (None, None) |
|
|
| |
| sd = StableDiffusionPipeline.from_pretrained( |
| tmpdirname, |
| feature_extractor=self.dummy_extractor, |
| safety_checker=unet, |
| requires_safety_checker=[True, True], |
| ) |
|
|
| assert sd.config.requires_safety_checker == [True, True] |
| assert sd.config.safety_checker != (None, None) |
| assert sd.config.feature_extractor != (None, None) |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| sd.save_pretrained(tmpdirname) |
| sd = StableDiffusionPipeline.from_pretrained(tmpdirname, feature_extractor=self.dummy_extractor) |
|
|
| assert sd.config.requires_safety_checker == [True, True] |
| assert sd.config.safety_checker != (None, None) |
| assert sd.config.feature_extractor != (None, None) |
|
|
|
|
| @slow |
| @require_torch_gpu |
| class PipelineSlowTests(unittest.TestCase): |
| def tearDown(self): |
| |
| super().tearDown() |
| gc.collect() |
| torch.cuda.empty_cache() |
|
|
| def test_smart_download(self): |
| model_id = "hf-internal-testing/unet-pipeline-dummy" |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| _ = DiffusionPipeline.from_pretrained(model_id, cache_dir=tmpdirname, force_download=True) |
| local_repo_name = "--".join(["models"] + model_id.split("/")) |
| snapshot_dir = os.path.join(tmpdirname, local_repo_name, "snapshots") |
| snapshot_dir = os.path.join(snapshot_dir, os.listdir(snapshot_dir)[0]) |
|
|
| |
| assert os.path.isfile(os.path.join(snapshot_dir, DiffusionPipeline.config_name)) |
| assert os.path.isfile(os.path.join(snapshot_dir, CONFIG_NAME)) |
| assert os.path.isfile(os.path.join(snapshot_dir, SCHEDULER_CONFIG_NAME)) |
| assert os.path.isfile(os.path.join(snapshot_dir, WEIGHTS_NAME)) |
| assert os.path.isfile(os.path.join(snapshot_dir, "scheduler", SCHEDULER_CONFIG_NAME)) |
| assert os.path.isfile(os.path.join(snapshot_dir, "unet", WEIGHTS_NAME)) |
| assert os.path.isfile(os.path.join(snapshot_dir, "unet", WEIGHTS_NAME)) |
| |
| |
| |
| assert not os.path.isfile(os.path.join(snapshot_dir, "big_array.npy")) |
|
|
| def test_warning_unused_kwargs(self): |
| model_id = "hf-internal-testing/unet-pipeline-dummy" |
| logger = logging.get_logger("diffusers.pipelines") |
| with tempfile.TemporaryDirectory() as tmpdirname: |
| with CaptureLogger(logger) as cap_logger: |
| DiffusionPipeline.from_pretrained( |
| model_id, |
| not_used=True, |
| cache_dir=tmpdirname, |
| force_download=True, |
| ) |
|
|
| assert ( |
| cap_logger.out.strip().split("\n")[-1] |
| == "Keyword arguments {'not_used': True} are not expected by DDPMPipeline and will be ignored." |
| ) |
|
|
| def test_from_save_pretrained(self): |
| |
| model = UNet2DModel( |
| block_out_channels=(32, 64), |
| layers_per_block=2, |
| sample_size=32, |
| in_channels=3, |
| out_channels=3, |
| down_block_types=("DownBlock2D", "AttnDownBlock2D"), |
| up_block_types=("AttnUpBlock2D", "UpBlock2D"), |
| ) |
| scheduler = DDPMScheduler(num_train_timesteps=10) |
|
|
| ddpm = DDPMPipeline(model, scheduler) |
| ddpm.to(torch_device) |
| ddpm.set_progress_bar_config(disable=None) |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| ddpm.save_pretrained(tmpdirname) |
| new_ddpm = DDPMPipeline.from_pretrained(tmpdirname) |
| new_ddpm.to(torch_device) |
|
|
| generator = torch.Generator(device=torch_device).manual_seed(0) |
| image = ddpm(generator=generator, num_inference_steps=5, output_type="numpy").images |
|
|
| generator = torch.Generator(device=torch_device).manual_seed(0) |
| new_image = new_ddpm(generator=generator, num_inference_steps=5, output_type="numpy").images |
|
|
| assert np.abs(image - new_image).sum() < 1e-5, "Models don't give the same forward pass" |
|
|
| @require_torch_2 |
| def test_from_save_pretrained_dynamo(self): |
| |
| model = UNet2DModel( |
| block_out_channels=(32, 64), |
| layers_per_block=2, |
| sample_size=32, |
| in_channels=3, |
| out_channels=3, |
| down_block_types=("DownBlock2D", "AttnDownBlock2D"), |
| up_block_types=("AttnUpBlock2D", "UpBlock2D"), |
| ) |
| model = torch.compile(model) |
| scheduler = DDPMScheduler(num_train_timesteps=10) |
|
|
| ddpm = DDPMPipeline(model, scheduler) |
| ddpm.to(torch_device) |
| ddpm.set_progress_bar_config(disable=None) |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| ddpm.save_pretrained(tmpdirname) |
| new_ddpm = DDPMPipeline.from_pretrained(tmpdirname) |
| new_ddpm.to(torch_device) |
|
|
| generator = torch.Generator(device=torch_device).manual_seed(0) |
| image = ddpm(generator=generator, num_inference_steps=5, output_type="numpy").images |
|
|
| generator = torch.Generator(device=torch_device).manual_seed(0) |
| new_image = new_ddpm(generator=generator, num_inference_steps=5, output_type="numpy").images |
|
|
| assert np.abs(image - new_image).sum() < 1e-5, "Models don't give the same forward pass" |
|
|
| def test_from_pretrained_hub(self): |
| model_path = "google/ddpm-cifar10-32" |
|
|
| scheduler = DDPMScheduler(num_train_timesteps=10) |
|
|
| ddpm = DDPMPipeline.from_pretrained(model_path, scheduler=scheduler) |
| ddpm = ddpm.to(torch_device) |
| ddpm.set_progress_bar_config(disable=None) |
|
|
| ddpm_from_hub = DiffusionPipeline.from_pretrained(model_path, scheduler=scheduler) |
| ddpm_from_hub = ddpm_from_hub.to(torch_device) |
| ddpm_from_hub.set_progress_bar_config(disable=None) |
|
|
| generator = torch.Generator(device=torch_device).manual_seed(0) |
| image = ddpm(generator=generator, num_inference_steps=5, output_type="numpy").images |
|
|
| generator = torch.Generator(device=torch_device).manual_seed(0) |
| new_image = ddpm_from_hub(generator=generator, num_inference_steps=5, output_type="numpy").images |
|
|
| assert np.abs(image - new_image).sum() < 1e-5, "Models don't give the same forward pass" |
|
|
| def test_from_pretrained_hub_pass_model(self): |
| model_path = "google/ddpm-cifar10-32" |
|
|
| scheduler = DDPMScheduler(num_train_timesteps=10) |
|
|
| |
| unet = UNet2DModel.from_pretrained(model_path) |
| ddpm_from_hub_custom_model = DiffusionPipeline.from_pretrained(model_path, unet=unet, scheduler=scheduler) |
| ddpm_from_hub_custom_model = ddpm_from_hub_custom_model.to(torch_device) |
| ddpm_from_hub_custom_model.set_progress_bar_config(disable=None) |
|
|
| ddpm_from_hub = DiffusionPipeline.from_pretrained(model_path, scheduler=scheduler) |
| ddpm_from_hub = ddpm_from_hub.to(torch_device) |
| ddpm_from_hub_custom_model.set_progress_bar_config(disable=None) |
|
|
| generator = torch.Generator(device=torch_device).manual_seed(0) |
| image = ddpm_from_hub_custom_model(generator=generator, num_inference_steps=5, output_type="numpy").images |
|
|
| generator = torch.Generator(device=torch_device).manual_seed(0) |
| new_image = ddpm_from_hub(generator=generator, num_inference_steps=5, output_type="numpy").images |
|
|
| assert np.abs(image - new_image).sum() < 1e-5, "Models don't give the same forward pass" |
|
|
| def test_output_format(self): |
| model_path = "google/ddpm-cifar10-32" |
|
|
| scheduler = DDIMScheduler.from_pretrained(model_path) |
| pipe = DDIMPipeline.from_pretrained(model_path, scheduler=scheduler) |
| pipe.to(torch_device) |
| pipe.set_progress_bar_config(disable=None) |
|
|
| images = pipe(output_type="numpy").images |
| assert images.shape == (1, 32, 32, 3) |
| assert isinstance(images, np.ndarray) |
|
|
| images = pipe(output_type="pil", num_inference_steps=4).images |
| assert isinstance(images, list) |
| assert len(images) == 1 |
| assert isinstance(images[0], PIL.Image.Image) |
|
|
| |
| images = pipe(num_inference_steps=4).images |
| assert isinstance(images, list) |
| assert isinstance(images[0], PIL.Image.Image) |
|
|
| def test_from_flax_from_pt(self): |
| pipe_pt = StableDiffusionPipeline.from_pretrained( |
| "hf-internal-testing/tiny-stable-diffusion-torch", safety_checker=None |
| ) |
| pipe_pt.to(torch_device) |
|
|
| if not is_flax_available(): |
| raise ImportError("Make sure flax is installed.") |
|
|
| from diffusers import FlaxStableDiffusionPipeline |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| pipe_pt.save_pretrained(tmpdirname) |
|
|
| pipe_flax, params = FlaxStableDiffusionPipeline.from_pretrained( |
| tmpdirname, safety_checker=None, from_pt=True |
| ) |
|
|
| with tempfile.TemporaryDirectory() as tmpdirname: |
| pipe_flax.save_pretrained(tmpdirname, params=params) |
| pipe_pt_2 = StableDiffusionPipeline.from_pretrained(tmpdirname, safety_checker=None, from_flax=True) |
| pipe_pt_2.to(torch_device) |
|
|
| prompt = "Hello" |
|
|
| generator = torch.manual_seed(0) |
| image_0 = pipe_pt( |
| [prompt], |
| generator=generator, |
| num_inference_steps=2, |
| output_type="np", |
| ).images[0] |
|
|
| generator = torch.manual_seed(0) |
| image_1 = pipe_pt_2( |
| [prompt], |
| generator=generator, |
| num_inference_steps=2, |
| output_type="np", |
| ).images[0] |
|
|
| assert np.abs(image_0 - image_1).sum() < 1e-5, "Models don't give the same forward pass" |
|
|
| @require_compel |
| def test_weighted_prompts_compel(self): |
| from compel import Compel |
|
|
| pipe = StableDiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4") |
| pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config) |
| pipe.enable_model_cpu_offload() |
| pipe.enable_attention_slicing() |
|
|
| compel = Compel(tokenizer=pipe.tokenizer, text_encoder=pipe.text_encoder) |
|
|
| prompt = "a red cat playing with a ball{}" |
|
|
| prompts = [prompt.format(s) for s in ["", "++", "--"]] |
|
|
| prompt_embeds = compel(prompts) |
|
|
| generator = [torch.Generator(device="cpu").manual_seed(33) for _ in range(prompt_embeds.shape[0])] |
|
|
| images = pipe( |
| prompt_embeds=prompt_embeds, generator=generator, num_inference_steps=20, output_type="numpy" |
| ).images |
|
|
| for i, image in enumerate(images): |
| expected_image = load_numpy( |
| "https://huggingface.co/datasets/hf-internal-testing/diffusers-images/resolve/main" |
| f"/compel/forest_{i}.npy" |
| ) |
|
|
| assert np.abs(image - expected_image).max() < 1e-2 |
|
|
|
|
| @nightly |
| @require_torch_gpu |
| class PipelineNightlyTests(unittest.TestCase): |
| def tearDown(self): |
| |
| super().tearDown() |
| gc.collect() |
| torch.cuda.empty_cache() |
|
|
| def test_ddpm_ddim_equality_batched(self): |
| seed = 0 |
| model_id = "google/ddpm-cifar10-32" |
|
|
| unet = UNet2DModel.from_pretrained(model_id) |
| ddpm_scheduler = DDPMScheduler() |
| ddim_scheduler = DDIMScheduler() |
|
|
| ddpm = DDPMPipeline(unet=unet, scheduler=ddpm_scheduler) |
| ddpm.to(torch_device) |
| ddpm.set_progress_bar_config(disable=None) |
|
|
| ddim = DDIMPipeline(unet=unet, scheduler=ddim_scheduler) |
| ddim.to(torch_device) |
| ddim.set_progress_bar_config(disable=None) |
|
|
| generator = torch.Generator(device=torch_device).manual_seed(seed) |
| ddpm_images = ddpm(batch_size=2, generator=generator, output_type="numpy").images |
|
|
| generator = torch.Generator(device=torch_device).manual_seed(seed) |
| ddim_images = ddim( |
| batch_size=2, |
| generator=generator, |
| num_inference_steps=1000, |
| eta=1.0, |
| output_type="numpy", |
| use_clipped_model_output=True, |
| ).images |
|
|
| |
| assert np.abs(ddpm_images - ddim_images).max() < 1e-1 |
|
|