| |
| """ |
| Callbacks for BackgroundFX Pro UI |
| --------------------------------- |
| All functions here are *thin* wrappers wired to the Gradio interface. |
| NO IMPORTS FROM core.app AT MODULE LEVEL to avoid circular imports |
| NO HEAVY IMPORTS (cv2, numpy) AT MODULE LEVEL to avoid CSP issues |
| """ |
|
|
| from __future__ import annotations |
| import os |
| import time |
| from typing import Any, Dict, Tuple |
|
|
| |
| |
|
|
| |
# Optional dependency: prefer the project's own background generator when it
# can be imported. Any import-time failure leaves `_try_bg_gen` as None, which
# callers treat as "generator unavailable".
try:
    from utils.bg_generator import generate_ai_background as _try_bg_gen
except Exception:
    _try_bg_gen = None
|
|
|
|
| |
| |
| |
# Keyword -> three RGB colours (one per noise band) for the procedural
# fallback. The first keyword found in the prompt wins (insertion order).
_FALLBACK_PALETTES = {
    "office": [(240, 245, 250), (210, 220, 230), (180, 190, 200)],
    "studio": [(18, 18, 20), (32, 32, 36), (58, 60, 64)],
    "sunset": [(255, 183, 77), (255, 138, 101), (244, 143, 177)],
    "forest": [(46, 125, 50), (102, 187, 106), (165, 214, 167)],
    "ocean": [(33, 150, 243), (3, 169, 244), (0, 188, 212)],
    "minimal": [(245, 246, 248), (230, 232, 236), (214, 218, 224)],
    "warm": [(255, 224, 178), (255, 204, 128), (255, 171, 145)],
    "cool": [(197, 202, 233), (179, 229, 252), (178, 235, 242)],
    "royal": [(63, 81, 181), (121, 134, 203), (159, 168, 218)],
}


def _generate_ai_background(
    prompt_text: str,
    width: int,
    height: int,
    bokeh: float,
    vignette: float,
    contrast: float,
):
    """
    Produce a background image for *prompt_text*.

    If utils.bg_generator.generate_ai_background was importable, delegate to
    it and return whatever it returns. Otherwise fall back to a small
    procedural background built with NumPy, OpenCV and PIL: layered random
    noise quantised into a three-colour palette, followed by optional blur,
    vignette and contrast passes. The fallback saves the result as a PNG
    under /tmp/bgfx.

    Args:
        prompt_text: Free text. A palette keyword contained in it (e.g.
            "sunset") selects the colours; otherwise colours are derived
            deterministically from the text itself.
        width: Output width in pixels (fallback requires > 0).
        height: Output height in pixels (fallback requires > 0).
        bokeh: Gaussian blur radius, clamped to [0, 50]; <= 0 disables.
        vignette: Edge-darkening strength, clamped to [0, 1]; 0 leaves the
            image untouched and 1 applies the full radial falloff.
        contrast: Multiplier applied around the per-channel mean after an
            auto-contrast pass; exactly 1.0 skips the whole step.

    Returns:
        Fallback path: ``(image, path)`` where *image* is a PIL image and
        *path* is the absolute path of the saved PNG.

    Raises:
        ValueError: If the fallback is used with a non-positive size.
    """
    if _try_bg_gen is not None:
        return _try_bg_gen(
            prompt_text,
            width=width,
            height=height,
            bokeh=bokeh,
            vignette=vignette,
            contrast=contrast,
        )

    # Heavy imports stay function-local; the module deliberately avoids
    # importing cv2/numpy at import time.
    import random
    import zlib
    from pathlib import Path

    import cv2
    import numpy as np
    from PIL import Image, ImageFilter, ImageOps

    width, height = int(width), int(height)
    if width <= 0 or height <= 0:
        # Fail early with a readable message instead of an opaque cv2 error.
        raise ValueError(f"width and height must be positive, got {width}x{height}")

    tmp_dir = Path("/tmp/bgfx")
    tmp_dir.mkdir(parents=True, exist_ok=True)

    prompt = (prompt_text or "").lower()
    palette = next(
        (colours for key, colours in _FALLBACK_PALETTES.items() if key in prompt),
        None,
    )
    if palette is None:
        # Built-in hash() of a str is salted per process, so it cannot give a
        # repeatable palette. Use a stable checksum and a private generator so
        # the shared `random` module state is left alone.
        seed = zlib.crc32(prompt.encode("utf-8", errors="replace"))
        rng = random.Random(seed)
        palette = [tuple(rng.randint(90, 200) for _ in range(3)) for _ in range(3)]

    def _layered_noise(h, w, octaves=4):
        """Sum progressively coarser random grids, normalised so max == 1."""
        total = np.zeros((h, w), np.float32)
        for octave in range(octaves):
            scale = 2 ** octave
            coarse = np.random.rand(h // scale + 1, w // scale + 1).astype(np.float32)
            total += cv2.resize(coarse, (w, h), interpolation=cv2.INTER_LINEAR) / (octave + 1)
        total /= max(1e-6, total.max())
        return total

    def _quantise(noise, colours):
        """Map noise to the three colours using thresholds 0.33 and 0.66."""
        bands = np.digitize(noise, [0.33, 0.66])  # 0, 1 or 2 per pixel
        return Image.fromarray(np.asarray(colours, np.uint8)[bands])

    out = _quantise(_layered_noise(height, width, 4), palette)

    if bokeh > 0:
        out = out.filter(ImageFilter.GaussianBlur(radius=min(50, max(0, bokeh))))

    if vignette > 0:
        strength = min(1.0, float(vignette))
        y, x = np.ogrid[:height, :width]
        cx, cy = width / 2, height / 2
        dist = np.sqrt((x - cx) ** 2 + (y - cy) ** 2)
        falloff = (1 - np.clip(dist / (max(width, height) / 1.2), 0, 1)) ** 2
        # Blend between "no darkening" (gain 1) and the full falloff so the
        # effect grows with `vignette` and vanishes smoothly as it nears 0.
        gain = (1.0 - strength * (1.0 - falloff)).astype(np.float32)
        base = np.array(out).astype(np.float32) / 255.0
        out = Image.fromarray(
            np.clip(base * gain[..., None] * 255, 0, 255).astype(np.uint8)
        )

    if contrast != 1.0:
        out = ImageOps.autocontrast(out, cutoff=1)
        arr = np.array(out).astype(np.float32)
        mean = arr.mean(axis=(0, 1), keepdims=True)
        arr = (arr - mean) * float(contrast) + mean
        out = Image.fromarray(np.clip(arr, 0, 255).astype(np.uint8))

    stamp = int(time.time() * 1000)
    path = str((tmp_dir / f"ai_bg_{stamp}.png").resolve())
    out.save(path)
    return out, path
|
|
|
|
| |
| |
| |
def cb_load_models() -> str:
    """
    Trigger model loading (SAM2 + MatAnyOne) and report the outcome.

    Returns whatever core.app.load_models_with_validation() returns, or an
    error message string if importing or running it raises.
    """
    try:
        # Imported here, not at module level, to avoid a circular import.
        from core.app import load_models_with_validation

        status = load_models_with_validation()
    except Exception as e:
        return f"❌ Error loading models: {str(e)}"
    return status
|
|
|
|
| |
| |
| |
def cb_process_video(
    vid: str,
    style: str,
    custom_bg_path: str | None,
    use_two: bool,
    chroma: str,
    key_color_mode: str,
    prev_mask: bool,
    prev_green: bool,
):
    """
    Forward the UI selections to core.app.process_video_fixed.

    Per the pipeline contract the result is:
        (processed_video_path | None, status_message:str)

    The UI argument names are translated to the pipeline's keyword names;
    no progress callback is supplied.
    """
    # Imported here, not at module level, to avoid a circular import.
    from core.app import PROCESS_CANCELLED, process_video_fixed

    # Reset the cancel flag if it is currently set, before starting this run.
    if PROCESS_CANCELLED.is_set():
        PROCESS_CANCELLED.clear()

    pipeline_args = dict(
        video_path=vid,
        background_choice=style,
        custom_background_path=custom_bg_path,
        progress_callback=None,
        use_two_stage=use_two,
        chroma_preset=chroma,
        key_color_mode=key_color_mode,
        preview_mask=prev_mask,
        preview_greenscreen=prev_green,
    )
    return process_video_fixed(**pipeline_args)
|
|
|
|
| |
| |
| |
def cb_cancel() -> str:
    """
    Set the shared cancel flag from core.app.

    Returns a confirmation message, or a failure message if the flag could
    not be imported or set.
    """
    try:
        # Imported here, not at module level, to avoid a circular import.
        from core.app import PROCESS_CANCELLED

        PROCESS_CANCELLED.set()
    except Exception as e:
        return f"Cancel failed: {e}"
    return "Cancellation requested."
|
|
def cb_status() -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """
    Build the (model_status, cache_status) pair fresh on every call.

    Starts from "nothing loaded / cache disabled" defaults and replaces them
    with the reports from core.app when those are available. The live model
    report is used only if it is a dict whose "timestamp" is less than five
    seconds old; the live cache report is used whenever it is a dict.
    """
    try:
        # Defaults returned whenever live data cannot be obtained.
        models: Dict[str, Any] = {
            "models_loaded": False,
            "sam2_loaded": False,
            "matanyone_loaded": False,
            "timestamp": time.time(),
        }
        cache: Dict[str, Any] = {
            "cache_disabled": True,
            "timestamp": time.time(),
        }

        try:
            # Imported here, not at module level, to avoid a circular import.
            from core.app import get_model_status, get_cache_status

            live_models = get_model_status()
            live_cache = get_cache_status()

            is_recent = (
                isinstance(live_models, dict)
                and live_models.get("timestamp", 0) > time.time() - 5
            )
            if is_recent:
                models = live_models

            if isinstance(live_cache, dict):
                cache = live_cache
        except Exception:
            # Best effort: fall back to whatever has been set so far.
            pass
    except Exception as e:
        return {"error": str(e), "timestamp": time.time()}, {"error": str(e), "timestamp": time.time()}

    return models, cache
|
|
def cb_clear():
    """
    Return the reset values for the five outputs this callback feeds.

    The tuple alternates an empty media slot (None) with an empty text
    slot (""), in the order: None, "", None, "", None.
    """
    empty_media = None
    empty_text = ""
    return (empty_media, empty_text, empty_media, empty_text, empty_media)
|
|
|
|
| |
| |
| |
def cb_generate_bg(prompt_text: str, w: int, h: int, b: float, v: float, c: float):
    """
    Generate a background from the UI controls.

    The width and height are coerced to int before being passed on; bokeh
    (b), vignette (v) and contrast (c) are forwarded unchanged.
    Returns the (image, path) pair produced by _generate_ai_background.
    """
    width_px = int(w)
    height_px = int(h)
    image, saved_path = _generate_ai_background(prompt_text, width_px, height_px, b, v, c)
    return image, saved_path
|
|
def cb_use_gen_bg(gen_path: str):
    """
    Use the generated background as the custom background.

    Args:
        gen_path: Path of a previously generated background image; may be
            empty or None when nothing has been generated yet.

    Returns:
        *gen_path* when it points at an existing regular file (so that
        gr.Image can display it), otherwise None.
    """
    # isfile, not exists: a directory path must not be handed to the image
    # component as if it were a picture.
    if gen_path and os.path.isfile(gen_path):
        return gen_path
    return None
|
|
|
|
| |
| |
| |
def cb_video_changed(vid_path: str):
    """
    Extract the first frame of the uploaded video for a quick preview.

    Args:
        vid_path: Path of the uploaded video; may be empty or None.

    Returns:
        The first frame converted from BGR to RGB (as returned by
        cv2.cvtColor), or None when there is no path, no frame could be
        read, or any error occurs.
    """
    try:
        if not vid_path:
            return None

        # Imported lazily: heavy imports are kept out of module scope.
        import cv2

        cap = cv2.VideoCapture(vid_path)
        try:
            ok, frame = cap.read()
        finally:
            # Always free the capture handle, even if read() raises.
            cap.release()

        if not ok or frame is None:
            return None

        return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    except Exception:
        # Preview is best-effort; never let it break the upload flow.
        return None
|
|
def cb_preset_bg_preview(style: str):
    """
    Build a 640x360 preview of the preset background named by *style*.

    Returns the value produced by
    utils.cv_processing.create_professional_background for display in a
    gr.Image component, or None if importing or running it raises.
    """
    preview_width, preview_height = 640, 360
    try:
        from utils.cv_processing import create_professional_background

        return create_professional_background(style, preview_width, preview_height)
    except Exception:
        return None