| import gradio as gr |
| import spaces |
| import numpy as np |
| import random |
| import torch |
| import functools |
| from pathlib import Path |
| from PIL import Image |
| from omegaconf import OmegaConf |
| from tim.schedulers.transition import TransitionSchedule |
| from tim.utils.misc_utils import instantiate_from_config, init_from_ckpt |
| from tim.models.vae import get_sd_vae, get_dc_ae, sd_vae_decode, dc_ae_decode |
| from tim.models.utils.text_encoders import load_text_encoder, encode_prompt |
| |
|
|
| |
| dtype = torch.bfloat16 |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| MAX_SEED = np.iinfo(np.int32).max |
| MAX_IMAGE_SIZE = 2048 |
|
|
| |
| model = None |
| scheduler = None |
| decode_func = None |
| config = None |
| text_encoder = None |
| tokenizer = None |
|
|
|
|
| def load_model_components(device: str = "cuda"): |
| """Load all model components once at startup""" |
| global model, scheduler, decode_func, config, text_encoder, tokenizer |
|
|
| try: |
| |
| config_path = "configs/t2i/tim_xl_p1_t2i.yaml" |
| from huggingface_hub import hf_hub_download |
|
|
| ckpt_path = hf_hub_download( |
| repo_id="blanchon/TiM-checkpoints", filename="t2i_model.bin" |
| ) |
|
|
| if not Path(config_path).exists(): |
| raise FileNotFoundError(f"Config file not found: {config_path}") |
| if not Path(ckpt_path).exists(): |
| raise FileNotFoundError(f"Checkpoint file not found: {ckpt_path}") |
|
|
| print("Loading configuration...") |
| config = OmegaConf.load(config_path) |
| model_config = config.model |
|
|
| print("Loading VAE...") |
| |
| if "dc-ae" in model_config.vae_dir: |
| dc_ae = get_dc_ae(model_config.vae_dir, dtype=torch.float32, device=device) |
| dc_ae.enable_tiling(2560, 2560, 2560, 2560) |
| decode_func = functools.partial(dc_ae_decode, dc_ae, slice_vae=True) |
| elif "sd-vae" in model_config.vae_dir: |
| sd_vae = get_sd_vae( |
| model_config.vae_dir, dtype=torch.float32, device=device |
| ) |
| decode_func = functools.partial(sd_vae_decode, sd_vae, slice_vae=True) |
| else: |
| raise ValueError("Unsupported VAE type") |
|
|
| |
| text_encoder, tokenizer = load_text_encoder( |
| text_encoder_dir=config.model.text_encoder_dir, |
| device=device, |
| weight_dtype=dtype, |
| ) |
|
|
| print("Loading main model...") |
| |
| model = instantiate_from_config(model_config.network).to( |
| device=device, dtype=dtype |
| ) |
| init_from_ckpt(model, checkpoint_dir=ckpt_path, ignore_keys=None, verbose=True) |
| model.eval() |
|
|
| print("Loading scheduler...") |
| |
| transport = instantiate_from_config(model_config.transport) |
| scheduler = TransitionSchedule( |
| transport=transport, **OmegaConf.to_container(model_config.transition_loss) |
| ) |
|
|
| print("All components loaded successfully!") |
|
|
| except Exception as e: |
| print(f"Error loading model components: {e}") |
| raise e |
|
|
|
|
| @spaces.GPU(duration=60) |
| def generate_image( |
| prompt, |
| seed=42, |
| randomize_seed=False, |
| width=1024, |
| height=1024, |
| guidance_scale=2.5, |
| num_inference_steps=16, |
| progress=gr.Progress(track_tqdm=True), |
| ): |
| """Generate image from text prompt""" |
| try: |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| print(f"Using device: {device}") |
| |
| if not prompt or len(prompt.strip()) == 0: |
| raise ValueError("Please enter a valid prompt") |
|
|
| if model is None or scheduler is None: |
| raise RuntimeError("Model components not loaded. Please check the setup.") |
|
|
| |
| if ( |
| width < 256 |
| or width > MAX_IMAGE_SIZE |
| or height < 256 |
| or height > MAX_IMAGE_SIZE |
| ): |
| raise ValueError( |
| f"Image dimensions must be between 256 and {MAX_IMAGE_SIZE}" |
| ) |
|
|
| if width % 32 != 0 or height % 32 != 0: |
| raise ValueError("Image dimensions must be divisible by 32") |
|
|
| if randomize_seed: |
| seed = random.randint(0, MAX_SEED) |
|
|
| generator = torch.Generator(device=device).manual_seed(seed) |
|
|
| |
| spatial_downsample = 32 if "dc-ae" in config.model.vae_dir else 8 |
| latent_h = int(height / spatial_downsample) |
| latent_w = int(width / spatial_downsample) |
|
|
| progress(0.1, desc="Generating random latent...") |
|
|
| |
| z = torch.randn( |
| (1, model.in_channels, latent_h, latent_w), |
| device=device, |
| dtype=dtype, |
| generator=generator, |
| ) |
|
|
| progress(0.1, desc="Loading text encoder...") |
|
|
| |
| text_encoder.set_attn_implementation("flash_attention_2") |
| text_encoder.to(device) |
|
|
| |
| cap_features, cap_mask = encode_prompt( |
| tokenizer, |
| text_encoder.model, |
| device, |
| dtype, |
| [prompt], |
| config.model.use_last_hidden_state, |
| max_seq_length=config.model.max_seq_length, |
| ) |
|
|
| |
| null_cap_feat, null_cap_mask = encode_prompt( |
| tokenizer, |
| text_encoder.model, |
| device, |
| dtype, |
| [""], |
| config.model.use_last_hidden_state, |
| max_seq_length=config.model.max_seq_length, |
| ) |
|
|
| cur_max_seq_len = cap_mask.sum(dim=-1).max() |
| y = cap_features[:, :cur_max_seq_len] |
|
|
| y_null = null_cap_feat[:, :cur_max_seq_len] |
| y_null = y_null.expand(y.shape[0], cur_max_seq_len, null_cap_feat.shape[-1]) |
|
|
| |
| with torch.no_grad(): |
| samples = scheduler.sample( |
| model, |
| y, |
| y_null, |
| z, |
| T_max=1.0, |
| T_min=0.0, |
| num_steps=num_inference_steps, |
| cfg_scale=guidance_scale, |
| cfg_low=0.0, |
| cfg_high=1.0, |
| stochasticity_ratio=0.0, |
| sample_type="transition", |
| step_callback=lambda step: progress( |
| 0.1 + 0.9 * (step / num_inference_steps), desc="Generating image..." |
| ), |
| )[-1] |
| samples = samples.to(torch.float32) |
|
|
| |
| images = decode_func(samples) |
| images = ( |
| torch.clamp(127.5 * images + 128.0, 0, 255) |
| .permute(0, 2, 3, 1) |
| .to(torch.uint8) |
| .contiguous() |
| ) |
| image = Image.fromarray(images[0].cpu().numpy()) |
|
|
| progress(1.0, desc="Complete!") |
|
|
| return image, seed |
|
|
| except Exception as e: |
| print(f"Error during image generation: {e}") |
| |
| error_img = Image.new("RGB", (512, 512), color="red") |
| return error_img, seed |
|
|
|
|
| |
| examples = [ |
| ["a tiny astronaut hatching from an egg on the moon"], |
| ["๐ถ Wearing ๐ถ flying on the ๐"], |
| ["an anime illustration of a wiener schnitzel"], |
| ["a photorealistic landscape of mountains at sunset"], |
| ["a majestic lion in a golden savanna at sunset"], |
| ["a futuristic city with flying cars and neon lights"], |
| ["a cozy cabin in a snowy forest with smoke coming from the chimney"], |
| ["a beautiful mermaid swimming in crystal clear water"], |
| ] |
|
|
| |
| css = """ |
| #col-container { |
| margin: 0 auto; |
| max-width: 520px; |
| } |
| """ |
|
|
| |
| try: |
| |
| load_model_components(device) |
| print("Model components loaded successfully!") |
| except Exception as e: |
| print(f"Error loading model components: {e}") |
| print("Please ensure config and checkpoint files are available") |
|
|
| |
| with gr.Blocks(css=css) as demo: |
| with gr.Column(elem_id="col-container"): |
| gr.Markdown("# TiM Text-to-Image Generator") |
| gr.Markdown( |
| "Generate high-quality images from text prompts using the TiM (Transition in Matching) model" |
| ) |
|
|
| with gr.Row(): |
| prompt = gr.Text( |
| label="Prompt", |
| show_label=False, |
| max_lines=1, |
| placeholder="Enter your prompt", |
| container=False, |
| ) |
| run_button = gr.Button("Generate", scale=0) |
|
|
| result = gr.Image(label="Result", show_label=False) |
|
|
| with gr.Accordion("Advanced Settings", open=False): |
| seed = gr.Slider( |
| label="Seed", |
| minimum=0, |
| maximum=MAX_SEED, |
| step=1, |
| value=0, |
| ) |
| randomize_seed = gr.Checkbox(label="Randomize seed", value=True) |
|
|
| with gr.Row(): |
| width = gr.Slider( |
| label="Width", |
| minimum=256, |
| maximum=MAX_IMAGE_SIZE, |
| step=32, |
| value=1024, |
| ) |
| height = gr.Slider( |
| label="Height", |
| minimum=256, |
| maximum=MAX_IMAGE_SIZE, |
| step=32, |
| value=1024, |
| ) |
|
|
| with gr.Row(): |
| guidance_scale = gr.Slider( |
| label="Guidance Scale", |
| minimum=1, |
| maximum=15, |
| step=0.1, |
| value=2.5, |
| ) |
| num_inference_steps = gr.Slider( |
| label="Number of inference steps", |
| minimum=1, |
| maximum=50, |
| step=1, |
| value=16, |
| ) |
|
|
| gr.Examples( |
| examples=examples, |
| fn=generate_image, |
| inputs=[prompt], |
| outputs=[result, seed], |
| cache_examples=True, |
| cache_mode="lazy", |
| ) |
|
|
| gr.on( |
| triggers=[run_button.click, prompt.submit], |
| fn=generate_image, |
| inputs=[ |
| prompt, |
| seed, |
| randomize_seed, |
| width, |
| height, |
| guidance_scale, |
| num_inference_steps, |
| ], |
| outputs=[result, seed], |
| ) |
|
|
| if __name__ == "__main__": |
| demo.launch() |
|
|