"""
TimeLapseForge — Image Generator Module v2.0

Supports BOTH local (diffusers) and API-based generation.
Unified interface for all providers.
"""

import gc
from typing import List, Dict, Optional, Callable, Tuple

import torch
from PIL import Image

from api_providers import get_provider, BaseProvider, PROVIDERS


def flush_memory():
    """Run the garbage collector and, when CUDA is available, empty its cache."""
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()


# ═══════════════════════════════════════════
# LOCAL GENERATOR (Same as before — diffusers)
# ═══════════════════════════════════════════

# Loaded pipelines keyed by "<model_id>_<pipeline_type>".
_local_pipelines = {}

# Pipeline kinds understood by get_local_pipeline().
_PIPELINE_TYPES = ("t2i", "i2i")


def _load_pretrained(pipeline_cls, model_id, dtype):
    """Load ``pipeline_cls`` for ``model_id``, trying the fp16 variant first."""
    try:
        return pipeline_cls.from_pretrained(
            model_id, torch_dtype=dtype, variant="fp16"
        )
    except Exception:
        # Deliberate best-effort: if the fp16-variant load fails for any
        # reason, retry once without requesting a variant.
        return pipeline_cls.from_pretrained(model_id, torch_dtype=dtype)


def get_local_pipeline(model_id, pipeline_type="t2i"):
    """Return a cached diffusers pipeline for ``model_id``, loading it on first use.

    Args:
        model_id: Model identifier passed to ``from_pretrained``.
        pipeline_type: ``"t2i"`` (text-to-image) or ``"i2i"`` (image-to-image).

    Returns:
        The pipeline, moved to CUDA when available and to CPU otherwise.

    Raises:
        ValueError: If ``pipeline_type`` is not ``"t2i"`` or ``"i2i"``.
    """
    if pipeline_type not in _PIPELINE_TYPES:
        raise ValueError(
            f"Unknown pipeline_type {pipeline_type!r}; expected one of {_PIPELINE_TYPES}"
        )

    cache_key = f"{model_id}_{pipeline_type}"
    cached = _local_pipelines.get(cache_key)
    if cached is not None:
        return cached

    # Imported lazily so the module can be imported without diffusers installed.
    from diffusers import AutoPipelineForText2Image, AutoPipelineForImage2Image

    device = "cuda" if torch.cuda.is_available() else "cpu"
    dtype = torch.float16 if device == "cuda" else torch.float32

    if pipeline_type == "t2i":
        pipe = _load_pretrained(AutoPipelineForText2Image, model_id, dtype)
    else:
        pipe = None
        base_pipe = _local_pipelines.get(f"{model_id}_t2i")
        if base_pipe is not None:
            # Prefer deriving the i2i pipeline from the already-loaded t2i one.
            try:
                pipe = AutoPipelineForImage2Image.from_pipe(base_pipe)
            except Exception:
                pipe = None  # fall through to a fresh load below
        if pipe is None:
            pipe = _load_pretrained(AutoPipelineForImage2Image, model_id, dtype)

    pipe = pipe.to(device)
    _local_pipelines[cache_key] = pipe
    return pipe


# Per-model generation defaults for local mode.
LOCAL_MODELS = {
    "stabilityai/sdxl-turbo": {
        "num_inference_steps": 4,
        "guidance_scale": 0.0,
        "default_resolution": (512, 512),
    },
    "stabilityai/stable-diffusion-xl-base-1.0": {
        "num_inference_steps": 25,
        "guidance_scale": 7.5,
        "default_resolution": (1024, 1024),
    },
    "runwayml/stable-diffusion-v1-5": {
        "num_inference_steps": 25,
        "guidance_scale": 7.5,
        "default_resolution": (512, 512),
    },
}


# ═══════════════════════════════════════════
# UNIFIED IMAGE GENERATOR
# ═══════════════════════════════════════════

class ImageGenerator:
    """
    Unified image generator supporting both local models and API providers.
    """

    def __init__(
        self,
        mode: str = "local",  # "local" or "api"
        # Local params
        local_model_id: str = "stabilityai/sdxl-turbo",
        # API params
        provider_name: str = "openai",
        api_key: str = "",
        api_model: str = "",
        custom_base_url: str = "",
        custom_endpoint_url: str = "",
    ):
        """Configure the generator for local or API generation.

        Any ``mode`` other than ``"local"`` is treated as API mode.

        Args:
            mode: ``"local"`` for diffusers pipelines, anything else for API.
            local_model_id: Model used in local mode; models missing from
                ``LOCAL_MODELS`` get generic defaults.
            provider_name: Provider name handed to ``get_provider``.
            api_key: Credential handed to ``get_provider``.
            api_model: Model name for API calls; when empty the provider's
                ``default_model`` is used.
            custom_base_url: Forwarded as ``base_url`` when ``provider_name``
                is ``"custom_openai"``.
            custom_endpoint_url: Forwarded as ``endpoint_url`` when
                ``provider_name`` is ``"direct_url"``.
        """
        self.mode = mode
        # Set in both modes so the attributes always exist.
        self.local_model_id = local_model_id
        self.api_model = api_model

        if mode == "local":
            # Copy so per-instance tweaks never mutate the shared LOCAL_MODELS entry.
            self.config = dict(LOCAL_MODELS.get(local_model_id, {
                "num_inference_steps": 20,
                "guidance_scale": 7.0,
                "default_resolution": (512, 512),
            }))
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
            self.provider = None
        else:
            # API mode
            kwargs = {}
            if provider_name == "custom_openai":
                kwargs["base_url"] = custom_base_url
            elif provider_name == "direct_url":
                kwargs["endpoint_url"] = custom_endpoint_url
            self.provider = get_provider(provider_name, api_key, **kwargs)
            self.api_model = api_model or self.provider.default_model
            self.config = {"default_resolution": (1024, 1024)}
            self.device = "cpu"

    @staticmethod
    def _compose_prompt(prompt_data: Dict[str, str]) -> Tuple[str, str]:
        """Return ``(full_prompt, negative_prompt)`` built from one prompt dict."""
        main_prompt = prompt_data.get("main_prompt", "")
        style = prompt_data.get("style_suffix", "")
        full_prompt = f"{main_prompt}, {style}" if style else main_prompt
        return full_prompt, prompt_data.get("negative_prompt", "")

    def _generate_local_t2i(self, prompt, negative_prompt="", seed=42,
                            width=None, height=None, steps=None, guidance=None):
        """Generate one image from text with the local text-to-image pipeline.

        ``width``, ``height``, ``steps`` and ``guidance`` fall back to the
        model configuration when not given.
        """
        pipe = get_local_pipeline(self.local_model_id, "t2i")
        w = width or self.config["default_resolution"][0]
        h = height or self.config["default_resolution"][1]
        n_steps = steps or self.config.get("num_inference_steps", 20)
        # ``is not None`` so an explicit guidance of 0.0 is honoured.
        cfg = guidance if guidance is not None else self.config.get("guidance_scale", 7.0)
        gen = torch.Generator(device=self.device).manual_seed(seed)

        kwargs = dict(
            prompt=prompt,
            num_inference_steps=n_steps,
            guidance_scale=cfg,
            width=w,
            height=h,
            generator=gen,
        )
        if negative_prompt:
            kwargs["negative_prompt"] = negative_prompt
        return pipe(**kwargs).images[0]

    def _generate_local_i2i(self, prompt, prev_image, strength=0.4,
                            negative_prompt="", seed=42, steps=None, guidance=None,
                            width=None, height=None):
        """Generate one image from ``prev_image`` with the local image-to-image pipeline.

        ``prev_image`` is resized to ``width`` x ``height``; each dimension
        falls back to the model's default resolution when not given, so a
        sequence started at a custom size keeps that size.
        """
        pipe = get_local_pipeline(self.local_model_id, "i2i")
        n_steps = steps or self.config.get("num_inference_steps", 20)
        cfg = guidance if guidance is not None else self.config.get("guidance_scale", 7.0)
        gen = torch.Generator(device=self.device).manual_seed(seed)

        default_w, default_h = self.config["default_resolution"]
        target_size = (width or default_w, height or default_h)

        # NOTE(review): presumably the pipelines expect 3-channel input, so
        # RGBA / palette / grayscale references are normalised — confirm.
        if prev_image.mode != "RGB":
            prev_image = prev_image.convert("RGB")
        if prev_image.size != target_size:
            prev_image = prev_image.resize(target_size, Image.LANCZOS)

        kwargs = dict(
            prompt=prompt,
            image=prev_image,
            num_inference_steps=n_steps,
            guidance_scale=cfg,
            strength=strength,
            generator=gen,
        )
        if negative_prompt:
            kwargs["negative_prompt"] = negative_prompt
        return pipe(**kwargs).images[0]

    def _generate_api_t2i(self, prompt, negative_prompt="", seed=None,
                          width=1024, height=1024, **kwargs):
        """Generate one image from text through the configured API provider."""
        return self.provider.generate_image(
            prompt=prompt,
            negative_prompt=negative_prompt,
            width=width,
            height=height,
            seed=seed,
            model=self.api_model,
            **kwargs,
        )

    def _generate_api_i2i(self, prompt, prev_image, strength=0.4,
                          negative_prompt="", seed=None, **kwargs):
        """Generate one image based on ``prev_image`` through the API provider.

        When the provider does not support img2img, falls back to
        text-to-image at ``prev_image``'s size with a consistency hint
        appended to the prompt.
        """
        if self.provider.supports_img2img:
            return self.provider.img2img(
                prompt=prompt,
                image=prev_image,
                strength=strength,
                negative_prompt=negative_prompt,
                seed=seed,
                model=self.api_model,
                **kwargs,
            )

        # Fallback: include description of previous state in prompt
        enhanced_prompt = (
            f"{prompt}, maintaining visual consistency with the previous frame, "
            f"same camera angle, same lighting, same environment, subtle changes only"
        )
        return self._generate_api_t2i(
            prompt=enhanced_prompt,
            negative_prompt=negative_prompt,
            seed=seed,
            width=prev_image.width,
            height=prev_image.height,
            **kwargs,
        )

    def generate_all_panels(
        self,
        prompts: List[Dict[str, str]],
        strength: float = 0.4,
        base_seed: int = 42,
        steps: Optional[int] = None,
        guidance: Optional[float] = None,
        width: Optional[int] = None,
        height: Optional[int] = None,
        progress_callback: Optional[Callable] = None,
        reference_image: Optional[Image.Image] = None,
    ) -> List[Image.Image]:
        """Generate all panels in sequence — local or API.

        Panel 0 is generated from text, or from ``reference_image`` when one
        is given (with strength raised to at least 0.5). Every later panel is
        generated from the previous panel. Panel ``i`` uses seed
        ``base_seed + i``.

        A panel that fails is reported with ``print`` and replaced by a copy
        of the previous panel, or by a grey placeholder for the first panel,
        so the result always has ``len(prompts)`` images.

        Args:
            prompts: One dict per panel with optional keys ``main_prompt``,
                ``style_suffix`` and ``negative_prompt``.
            strength: img2img strength for panels after the first.
            base_seed: Seed of the first panel.
            steps: Inference steps (local mode only).
            guidance: Guidance scale (local mode only).
            width: Panel width; falls back to the model/provider default.
            height: Panel height; falls back to the model/provider default.
            progress_callback: Called as ``callback(done, total)`` after each panel.
            reference_image: Optional starting image for panel 0.

        Returns:
            The generated images, in panel order.
        """
        images: List[Image.Image] = []
        total = len(prompts)

        for i, prompt_data in enumerate(prompts):
            full_prompt, neg_prompt = self._compose_prompt(prompt_data)
            seed = base_seed + i

            try:
                if self.mode == "local":
                    if i == 0:
                        if reference_image is not None:
                            img = self._generate_local_i2i(
                                full_prompt, reference_image, max(strength, 0.5),
                                neg_prompt, seed, steps, guidance, width, height,
                            )
                        else:
                            img = self._generate_local_t2i(
                                full_prompt, neg_prompt, seed,
                                width, height, steps, guidance,
                            )
                    else:
                        # Pass width/height so later panels match panel 0.
                        img = self._generate_local_i2i(
                            full_prompt, images[-1], strength,
                            neg_prompt, seed, steps, guidance, width, height,
                        )
                else:
                    # API mode
                    if i == 0:
                        if reference_image is not None and self.provider.supports_img2img:
                            img = self._generate_api_i2i(
                                full_prompt, reference_image, max(strength, 0.5),
                                neg_prompt, seed,
                            )
                        else:
                            img = self._generate_api_t2i(
                                full_prompt, neg_prompt, seed,
                                width or 1024, height or 1024,
                            )
                    else:
                        img = self._generate_api_i2i(
                            full_prompt, images[-1], strength,
                            neg_prompt, seed,
                        )
                images.append(img)
            except Exception as e:
                # Best-effort: keep the sequence length intact on failure.
                print(f"Error generating panel {i + 1}: {e}")
                if images:
                    images.append(images[-1].copy())
                else:
                    res = self.config.get("default_resolution", (1024, 1024))
                    # Honour the requested size for the placeholder as well.
                    placeholder_size = (width or res[0], height or res[1])
                    images.append(Image.new("RGB", placeholder_size, (50, 50, 50)))

            if progress_callback:
                progress_callback(i + 1, total)
            # Free memory between panels to keep peak usage down.
            flush_memory()

        return images

    def regenerate_single_panel(
        self,
        panel_index,
        prompts,
        existing_images,
        strength=0.4,
        base_seed=42,
        steps=None,
        guidance=None,
        width=None,
        height=None,
    ) -> Tuple[Image.Image, List[Image.Image]]:
        """Regenerate a single panel.

        Panel 0 is regenerated from text; any other panel is regenerated from
        the panel before it. ``existing_images`` is not modified.

        Args:
            panel_index: Index of the panel to regenerate; negative values
                count from the end.
            prompts: Prompt dicts, one per panel.
            existing_images: Current panel images.
            strength: img2img strength for panels after the first.
            base_seed: Seed of the first panel; this panel uses
                ``base_seed + panel_index``.
            steps: Inference steps (local mode only).
            guidance: Guidance scale (local mode only).
            width: Output width; defaults to the width of the panel being replaced.
            height: Output height; defaults to the height of the panel being replaced.

        Returns:
            ``(new_image, updated_images)`` where ``updated_images`` is a copy
            of ``existing_images`` with the new panel substituted.

        Raises:
            IndexError: If ``panel_index`` is out of range for
                ``existing_images`` or ``prompts``.
        """
        count = len(existing_images)
        # Validate before generating so a bad index fails fast.
        if not -count <= panel_index < count:
            raise IndexError(
                f"panel_index {panel_index} out of range for {count} existing images"
            )
        if panel_index < 0:
            # Normalise so the seed and the "previous panel" lookup are right.
            panel_index += count
        if panel_index >= len(prompts):
            raise IndexError(
                f"panel_index {panel_index} out of range for {len(prompts)} prompts"
            )

        full_prompt, neg_prompt = self._compose_prompt(prompts[panel_index])
        seed = base_seed + panel_index

        # Default to the size of the panel being replaced so the sequence stays uniform.
        current_w, current_h = existing_images[panel_index].size
        target_w = width or current_w
        target_h = height or current_h

        if panel_index == 0:
            if self.mode == "local":
                new_img = self._generate_local_t2i(
                    full_prompt, neg_prompt, seed,
                    width=target_w, height=target_h,
                    steps=steps, guidance=guidance,
                )
            else:
                new_img = self._generate_api_t2i(
                    full_prompt, neg_prompt, seed, target_w, target_h,
                )
        else:
            prev = existing_images[panel_index - 1]
            if self.mode == "local":
                new_img = self._generate_local_i2i(
                    full_prompt, prev, strength,
                    neg_prompt, seed, steps, guidance, target_w, target_h,
                )
            else:
                new_img = self._generate_api_i2i(
                    full_prompt, prev, strength, neg_prompt, seed,
                )

        updated = list(existing_images)
        updated[panel_index] = new_img
        flush_memory()
        return new_img, updated