| | |
"""
cv_processing.py · maximum-quality version with enhanced SAM2Handler integration.

Works with the enhanced SAM2Handler, which provides full-body detection
strategies, and adds maximum-quality mask cleaning with aggressive
post-processing.

All public functions in this module expect RGB images (H, W, 3) unless stated
otherwise. CoreVideoProcessor already converts BGR→RGB before calling into
this module.
"""
| |
|
| | from __future__ import annotations |
| |
|
| | import os |
| | import logging |
| | from pathlib import Path |
| | from typing import Any, Dict, Optional, Tuple, Callable |
| |
|
| | import cv2 |
| | import numpy as np |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | |
| | |
| | |
def _use_sam2_enabled() -> bool:
    """Tell whether SAM2 segmentation is enabled via the ``USE_SAM2`` env var.

    The variable defaults to ``"1"`` when unset. The values ``1``, ``true``,
    ``yes`` and ``on`` (compared case-insensitively) count as enabled;
    anything else counts as disabled.
    """
    setting = os.getenv("USE_SAM2", "1").lower()
    return setting in {"1", "true", "yes", "on"}
| |
|
def _use_matanyone_enabled() -> bool:
    """Tell whether MatAnyone refinement is enabled via ``USE_MATANYONE``.

    The variable defaults to ``"1"`` when unset. The values ``1``, ``true``,
    ``yes`` and ``on`` (compared case-insensitively) count as enabled;
    anything else counts as disabled.
    """
    setting = os.getenv("USE_MATANYONE", "1").lower()
    return setting in {"1", "true", "yes", "on"}
| |
|
def _use_max_quality_enabled() -> bool:
    """Tell whether maximum-quality processing is selected.

    Reads the ``BFX_QUALITY`` environment variable (default ``"max"``) and
    returns True only when its lower-cased value is exactly ``"max"``.
    """
    quality = os.getenv("BFX_QUALITY", "max")
    return "max" == quality.lower()
| |
|
| | |
| | |
| | |
# Built-in background presets. Each entry holds a base color triple and a flag
# telling create_professional_background() whether to render a vertical
# gradient instead of a flat fill.
PROFESSIONAL_BACKGROUNDS_LOCAL: Dict[str, Dict[str, Any]] = {
    "office": dict(color=(240, 248, 255), gradient=True),
    "studio": dict(color=(32, 32, 32), gradient=False),
    "nature": dict(color=(34, 139, 34), gradient=True),
    "abstract": dict(color=(75, 0, 130), gradient=True),
    "white": dict(color=(255, 255, 255), gradient=False),
    "black": dict(color=(0, 0, 0), gradient=False),
}

# Exported alias; refers to the very same dict object as the name above.
PROFESSIONAL_BACKGROUNDS = PROFESSIONAL_BACKGROUNDS_LOCAL
| |
|
| | |
| | |
| | |
def _ensure_rgb(img: np.ndarray) -> np.ndarray:
    """
    Return the image as a channels-last array limited to its first 3 channels.

    - ``None`` is passed through unchanged.
    - 3-D input whose last axis has 3 or 4 entries is treated as HWC and
      cropped to 3 channels.
    - 3-D input whose first axis has 1, 3 or 4 entries (and whose last axis
      does not look like a channel axis) is treated as CHW and transposed.
    - Anything else is returned as-is (as an ndarray).

    No BGR↔RGB swap is ever performed; color order is the caller's business.
    """
    if img is None:
        return img

    arr = np.asarray(img)
    if arr.ndim != 3:
        return arr

    leading, trailing = arr.shape[0], arr.shape[-1]
    if trailing in (3, 4):
        # Already channels-last; drop alpha if present.
        return arr[..., :3]
    if leading in (1, 3, 4) and trailing != 1:
        # Channels-first layout: move the channel axis to the end.
        return arr.transpose(1, 2, 0)[..., :3]
    return arr
| |
|
def _ensure_rgb01(frame_rgb: np.ndarray) -> np.ndarray:
    """
    Produce a fresh float32 copy of an RGB frame scaled to [0, 1], HWC.

    uint8 input is divided by 255. Floating input is clipped to [0, 1]
    without rescaling. Any other dtype is first saturated to 0..255, cast to
    uint8, then divided by 255. Channel order is left untouched.

    Raises:
        ValueError: if ``frame_rgb`` is None.
    """
    if frame_rgb is None:
        raise ValueError("frame_rgb is None")

    hwc = _ensure_rgb(frame_rgb)

    if np.issubdtype(hwc.dtype, np.floating):
        scaled = np.clip(hwc.astype(np.float32), 0.0, 1.0)
    else:
        if hwc.dtype != np.uint8:
            # Other integer-like dtypes: saturate into the 8-bit range first.
            hwc = np.clip(hwc, 0, 255).astype(np.uint8)
        scaled = hwc.astype(np.float32) / 255.0

    return scaled.copy()
| |
|
def _to_mask01(m: np.ndarray) -> np.ndarray:
    """
    Normalize a mask-like input to a float32 array with values in [0, 1].

    Handling, in order:
      * ``None`` is returned unchanged.
      * The input is converted with ``np.asarray`` first, so lists/tuples work.
      * 3-D input: a channels-last mask (last axis of size 1, 3 or 4) keeps
        channel 0; otherwise a leading singleton axis (1, H, W) is dropped.
      * bool masks become 0.0 / 1.0.
      * integer masks are divided by 255 when any value exceeds 1 (0..255
        encoding); integer masks holding only 0/1 labels are kept as 0.0/1.0
        instead of being crushed to ~0.004.
      * floating masks are cast to float32.
      * the result is clipped to [0, 1].
    """
    if m is None:
        return None

    arr = np.asarray(m)

    if arr.ndim == 3:
        if arr.shape[2] in (1, 3, 4):
            arr = arr[..., 0]
        elif arr.shape[0] == 1:
            arr = arr[0]

    if arr.dtype == np.bool_:
        arr = arr.astype(np.float32)
    elif np.issubdtype(arr.dtype, np.integer):
        # Distinguish 0/1 label masks from 0..255 intensity masks by range.
        scale = 255.0 if (arr.size and arr.max() > 1) else 1.0
        arr = arr.astype(np.float32) / scale
    elif arr.dtype != np.float32:
        arr = arr.astype(np.float32)

    return np.clip(arr, 0.0, 1.0)
| |
|
def _mask_to_2d(mask: np.ndarray) -> np.ndarray:
    """
    Reduce any mask to 2-D float32 [H,W], contiguous, in [0,1].

    Reduction rules:
      * rank > 3 (e.g. B1HW, BHWC): the first element of each leading axis is
        taken until the array is at most 3-D, so batched input yields the
        first batch item rather than a neutral mask.
      * (1, H, W) with H, W > 1: the singleton axis is dropped.
      * (H, W, 1): the channel axis is dropped.
      * other 3-D input: channel 0 if the last axis has 1, 3 or 4 entries,
        otherwise the first slice along axis 0.
      * after ``np.squeeze``, anything that is still not 2-D is replaced by a
        neutral 0.5 mask (512x512 when no spatial size can be read) and a
        warning is logged.

    Value normalization: bool -> 0/1; integer masks are divided by 255 only
    when a value exceeds 1 (0/1 label masks are preserved); everything is
    cast to float32 and clipped to [0, 1].
    """
    m = np.asarray(mask)
    # Remember the shape of the converted input for the warning below; using
    # the array (not the raw argument) keeps list/tuple input from raising.
    source_shape = m.shape

    # Peel batch-like leading axes so the 3-D rules below can apply.
    while m.ndim > 3 and m.shape[0] > 0:
        m = m[0]

    if m.ndim == 3 and m.shape[0] == 1 and (m.shape[1] > 1 and m.shape[2] > 1):
        m = m[0]

    if m.ndim == 3 and m.shape[-1] == 1:
        m = m[..., 0]

    if m.ndim == 3:
        m = m[..., 0] if m.shape[-1] in (1, 3, 4) else m[0]

    m = np.squeeze(m)
    if m.ndim != 2:
        h = int(m.shape[-2]) if m.ndim >= 2 else 512
        w = int(m.shape[-1]) if m.ndim >= 2 else 512
        logger.warning(f"_mask_to_2d: unexpected shape {source_shape}, creating neutral mask.")
        m = np.full((h, w), 0.5, dtype=np.float32)

    if m.dtype == np.bool_:
        m = m.astype(np.float32)
    elif np.issubdtype(m.dtype, np.integer):
        # 0/1 label masks stay as-is; 0..255 intensity masks are rescaled.
        scale = 255.0 if (m.size and m.max() > 1) else 1.0
        m = m.astype(np.float32) / scale
    elif m.dtype != np.float32:
        m = m.astype(np.float32)

    return np.ascontiguousarray(np.clip(m, 0.0, 1.0))
| |
|
def _feather(mask01: np.ndarray, k: int = 2) -> np.ndarray:
    """
    Soften mask edges with a Gaussian blur.

    A 3-D mask is reduced to its first channel. The mask is quantized to
    uint8 (value * 255), blurred with a square kernel of side ``2*k + 1``
    (never smaller than 1) and sigma 0, then returned as float32 divided by
    255.
    """
    plane = mask01[..., 0] if mask01.ndim == 3 else mask01
    side = max(1, 2 * int(k) + 1)
    quantized = (plane * 255.0).astype(np.uint8)
    blurred = cv2.GaussianBlur(quantized, (side, side), 0)
    return blurred.astype(np.float32) / 255.0
| |
|
def _vertical_gradient(top: Tuple[int, int, int], bottom: Tuple[int, int, int], width: int, height: int) -> np.ndarray:
    """
    Build a (height, width, 3) uint8 image fading from ``top`` to ``bottom``.

    Row ``y`` uses the blend factor ``t = y / max(1, height - 1)`` and the
    color ``top * (1 - t) + bottom * t``, truncated toward zero per channel.
    Every pixel in a row has the same color.

    The rows are computed in one vectorized NumPy expression instead of a
    Python loop. Channel values outside 0..255 are saturated rather than
    overflowing the uint8 cast.
    """
    # Per-row blend factor; max(1, ...) avoids dividing by zero for height 1.
    t = np.arange(height, dtype=np.float64) / max(1, height - 1)

    top_rgb = np.array([top[0], top[1], top[2]], dtype=np.float64)
    bottom_rgb = np.array([bottom[0], bottom[1], bottom[2]], dtype=np.float64)

    # (height, 3) table of row colors, truncated like int() would.
    rows = top_rgb * (1.0 - t)[:, None] + bottom_rgb * t[:, None]
    rows = np.clip(np.trunc(rows), 0, 255).astype(np.uint8)

    bg = np.empty((height, width, 3), dtype=np.uint8)
    bg[:] = rows[:, None, :]  # broadcast each row color across the width
    return bg
| |
|
| | |
| | |
| | |
def _maximum_quality_mask_cleaning(mask: np.ndarray) -> np.ndarray:
    """
    Clean and refine a mask with the maximum-quality pipeline.

    Steps: convert to uint8 (scaling by 255 when the mask's max is <= 1),
    two elliptical morphological closings (9x9 then 7x7), Gaussian blur
    (7x7, sigma 2), binary threshold at 127, Gaussian blur (5x5, sigma 1) and
    one 3x3 elliptical dilation. The result is float32 in [0, 1].

    Any exception is logged as a warning and the input mask is returned
    untouched. (The original note says this mirrors TwoStageProcessor; that
    class is not visible from this module.)
    """
    try:
        already_unit_range = mask.max() <= 1.0
        work = (mask * 255).astype(np.uint8) if already_unit_range else mask.astype(np.uint8)

        # Fill holes, then bridge nearby regions, with shrinking kernels.
        for side in (9, 7):
            element = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (side, side))
            work = cv2.morphologyEx(work, cv2.MORPH_CLOSE, element)

        # Smooth, re-binarize, then soften the hard edge again.
        work = cv2.GaussianBlur(work, (7, 7), 2.0)
        _, work = cv2.threshold(work, 127, 255, cv2.THRESH_BINARY)
        work = cv2.GaussianBlur(work, (5, 5), 1.0)

        # Grow the mask slightly.
        grow = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
        work = cv2.dilate(work, grow, iterations=1)

        logger.info("Maximum quality mask cleaning applied successfully")
        return work.astype(np.float32) / 255.0

    except Exception as e:
        logger.warning(f"Maximum quality mask cleaning failed: {e}")
        return mask
| |
|
| | |
| | |
| | |
def create_professional_background(key_or_cfg: Any, width: int, height: int) -> np.ndarray:
    """
    Render a background image of shape (height, width, 3), dtype uint8.

    Args:
        key_or_cfg: a preset name from PROFESSIONAL_BACKGROUNDS_LOCAL, or a
            config dict with optional ``"color"`` and ``"gradient"`` keys.
            Unknown names and any other type fall back to the "office" preset.
        width: output width in pixels.
        height: output height in pixels.

    Returns:
        A flat fill of the configured color, or, when ``gradient`` is truthy,
        a vertical gradient from a 70%-darkened copy of the color (top) to the
        color itself (bottom).
    """
    if isinstance(key_or_cfg, dict):
        cfg = key_or_cfg
    else:
        cfg = PROFESSIONAL_BACKGROUNDS_LOCAL["office"]
        if isinstance(key_or_cfg, str):
            cfg = PROFESSIONAL_BACKGROUNDS_LOCAL.get(key_or_cfg, cfg)

    base = tuple(map(int, cfg.get("color", (255, 255, 255))))

    if bool(cfg.get("gradient", False)):
        shaded = (int(base[0] * 0.7), int(base[1] * 0.7), int(base[2] * 0.7))
        return _vertical_gradient(shaded, base, width, height)

    return np.full((height, width, 3), base, dtype=np.uint8)
| |
|
| | |
| | |
| | |
def _simple_person_segmentation(frame_rgb: np.ndarray) -> np.ndarray:
    """
    Basic fallback segmentation using color detection on RGB frames.

    The frame is first brought to uint8 (floating frames are treated as
    [0, 1] and scaled by 255, other dtypes are saturated to 0..255) because
    the HSV thresholds below are expressed for 8-bit images. Everything that
    is not "green screen" green is considered foreground, the result is
    cleaned morphologically and only the largest external contour is kept.

    Returns:
        float32 mask in [0, 1] with the frame's height and width; when maximum
        quality is enabled it is additionally passed through
        ``_maximum_quality_mask_cleaning``.
    """
    frame_u8 = np.asarray(frame_rgb)
    if np.issubdtype(frame_u8.dtype, np.floating):
        frame_u8 = np.clip(frame_u8 * 255.0, 0, 255).astype(np.uint8)
    elif frame_u8.dtype != np.uint8:
        frame_u8 = np.clip(frame_u8, 0, 255).astype(np.uint8)

    hsv = cv2.cvtColor(frame_u8, cv2.COLOR_RGB2HSV)

    lower_skin = np.array([0, 20, 70], dtype=np.uint8)
    upper_skin = np.array([20, 255, 255], dtype=np.uint8)
    skin_mask = cv2.inRange(hsv, lower_skin, upper_skin)

    lower_green = np.array([40, 40, 40], dtype=np.uint8)
    upper_green = np.array([80, 255, 255], dtype=np.uint8)
    green_mask = cv2.inRange(hsv, lower_green, upper_green)

    # Foreground = everything that is not green. The skin hue range (0..20)
    # does not overlap the green range (40..80), so OR-ing the skin mask only
    # matters if these thresholds are changed to overlap.
    person_mask = cv2.bitwise_not(green_mask)
    person_mask = cv2.bitwise_or(person_mask, skin_mask)

    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    person_mask = cv2.morphologyEx(person_mask, cv2.MORPH_CLOSE, kernel, iterations=2)
    person_mask = cv2.morphologyEx(person_mask, cv2.MORPH_OPEN, kernel, iterations=1)

    # findContours returns (contours, hierarchy) on OpenCV 4 and
    # (image, contours, hierarchy) on OpenCV 3; [-2] is the contour list in both.
    contours = cv2.findContours(person_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
    if contours:
        largest_contour = max(contours, key=cv2.contourArea)
        person_mask = np.zeros_like(person_mask)
        cv2.drawContours(person_mask, [largest_contour], -1, 255, -1)

    mask_result = person_mask.astype(np.float32) / 255.0

    if _use_max_quality_enabled():
        mask_result = _maximum_quality_mask_cleaning(mask_result)
        logger.info("Applied maximum quality cleaning to fallback segmentation")

    return mask_result
| |
|
def _frame_to_uint8(frame_rgb: np.ndarray) -> np.ndarray:
    """Return the frame as uint8; floating frames are treated as [0, 1]."""
    if frame_rgb.dtype == np.uint8:
        return frame_rgb
    if np.issubdtype(frame_rgb.dtype, np.floating):
        return np.clip(frame_rgb * 255.0, 0, 255).astype(np.uint8)
    # Other integer-like dtypes: saturate instead of wrapping on the cast.
    return np.clip(frame_rgb, 0, 255).astype(np.uint8)


def _pick_legacy_mask(masks: Any, scores: Any) -> Optional[np.ndarray]:
    """Select one 2-D mask from a legacy predictor's output, or None if the shape is unsupported."""
    masks = np.asarray(masks)
    if masks.ndim == 2:
        return masks
    if masks.ndim == 3 and masks.shape[0] > 0:
        candidates = masks
    elif masks.ndim == 4 and masks.shape[1] == 1:
        # (N, 1, H, W): drop the singleton channel axis.
        candidates = masks[:, 0]
    else:
        logger.warning(f"Unexpected mask shape from SAM2: {masks.shape}")
        return None
    # Highest-scoring candidate when scores are available, else the first one.
    best_idx = 0 if scores is None else int(np.argmax(np.asarray(scores)))
    return candidates[best_idx]


def _segment_with_handler(predictor: Any, rgb_u8: np.ndarray) -> Optional[np.ndarray]:
    """Run ``predictor.create_mask`` and return a usable [0,1] mask, or None to request fallback."""
    logger.info("Using ENHANCED SAM2Handler.create_mask() with full-body detection")

    mask = predictor.create_mask(rgb_u8)
    if mask is None:
        logger.warning("Enhanced SAM2Handler returned None mask")
        return None

    mask_float = _to_mask01(mask)
    logger.info(f"Enhanced SAM2Handler mask stats: shape={mask_float.shape}, min={mask_float.min():.3f}, max={mask_float.max():.3f}, mean={mask_float.mean():.3f}")

    if not (float(mask_float.max()) > 0.1):
        logger.warning("Enhanced SAM2Handler mask too weak, using fallback")
        return None

    if _use_max_quality_enabled():
        mask_float = _maximum_quality_mask_cleaning(mask_float)
        logger.info("Applied additional maximum quality cleaning to enhanced SAM2 result")
    return np.ascontiguousarray(mask_float)


def _segment_with_legacy_predictor(predictor: Any, rgb_u8: np.ndarray) -> Optional[np.ndarray]:
    """Run the ``set_image``/``predict`` interface and return a usable [0,1] mask, or None to request fallback."""
    logger.info("Using legacy SAM2 predictor interface")

    h, w = rgb_u8.shape[:2]
    predictor.set_image(rgb_u8)

    # Three positive prompts down the vertical center line of the frame.
    points = np.array([
        [w // 2, h // 2],
        [w // 2, h // 4],
        [w // 2, h // 2 + h // 8],
    ], dtype=np.float32)
    labels = np.array([1, 1, 1], dtype=np.int32)

    result = predictor.predict(
        point_coords=points,
        point_labels=labels,
        multimask_output=True,
    )

    # Accept dict, (masks, scores, ...) sequence, or a bare masks object.
    if isinstance(result, dict):
        masks = result.get("masks", None)
        scores = result.get("scores", None)
    elif isinstance(result, (tuple, list)) and len(result) >= 2:
        masks, scores = result[0], result[1]
    else:
        masks, scores = result, None

    if masks is None:
        logger.warning("Legacy SAM2 returned no masks")
        return None

    mask = _pick_legacy_mask(masks, scores)
    if mask is None:
        return None

    mask = _to_mask01(mask)
    logger.info(f"Legacy SAM2 mask stats: shape={mask.shape}, min={mask.min():.3f}, max={mask.max():.3f}, mean={mask.mean():.3f}")

    if not (float(mask.max()) > 0.1):
        logger.warning("Legacy SAM2 mask too weak, using fallback")
        return None

    if _use_max_quality_enabled():
        mask = _maximum_quality_mask_cleaning(mask)
        logger.info("Applied maximum quality cleaning to legacy SAM2 result")
    return np.ascontiguousarray(mask)


def segment_person_hq(
    frame: np.ndarray,
    predictor: Optional[Any] = None,
    fallback_enabled: bool = True,
    use_sam2: Optional[bool] = None,
    **_compat_kwargs,
) -> np.ndarray:
    """
    High-quality person segmentation with SAM2 and a color-based fallback.

    Expects an RGB frame (H,W,3), uint8 or float in [0,1]. The frame is
    converted to uint8 once and that same array is handed to the predictor
    and to the fallback segmentation.

    Args:
        frame: RGB image; channels-first input is transposed by ``_ensure_rgb``.
        predictor: object exposing either ``create_mask(rgb_u8)`` (preferred)
            or the legacy ``set_image`` / ``predict`` pair. May be None.
        fallback_enabled: when no usable SAM2 mask is produced, use
            ``_simple_person_segmentation`` if True, else return an all-ones
            mask.
        use_sam2: explicit on/off switch; None defers to ``USE_SAM2``.
        **_compat_kwargs: accepted and ignored for backward compatibility.

    Returns:
        float32 mask in [0, 1]. Predictor errors are logged and routed to the
        fallback path instead of being raised.
    """
    if use_sam2 is None:
        use_sam2 = _use_sam2_enabled()

    frame_rgb = _ensure_rgb(frame)
    h, w = frame_rgb.shape[:2]
    rgb_u8 = _frame_to_uint8(frame_rgb)

    if use_sam2 is False:
        logger.info("SAM2 disabled by environment variable, using fallback segmentation")
        return _simple_person_segmentation(rgb_u8)

    if predictor is not None:
        try:
            if hasattr(predictor, "create_mask"):
                mask = _segment_with_handler(predictor, rgb_u8)
            elif hasattr(predictor, "set_image") and hasattr(predictor, "predict"):
                mask = _segment_with_legacy_predictor(predictor, rgb_u8)
            else:
                logger.warning("Predictor doesn't have expected SAM2 interface")
                mask = None

            if mask is not None:
                return mask

        except Exception as e:
            logger.warning(f"SAM2 segmentation error: {e}")

    if fallback_enabled:
        logger.debug("Using fallback segmentation")
        return _simple_person_segmentation(rgb_u8)
    else:
        return np.ones((h, w), dtype=np.float32)


# Backward-compatible alias for callers that import the "original" name.
segment_person_hq_original = segment_person_hq
| |
|
| | |
| | |
| | |
def refine_mask_hq(
    frame: np.ndarray,
    mask: np.ndarray,
    matanyone: Optional[Callable] = None,
    *,
    frame_idx: Optional[int] = None,
    fallback_enabled: bool = True,
    use_matanyone: Optional[bool] = None,
    **_compat_kwargs,
) -> np.ndarray:
    """
    Refine a coarse mask with MatAnyone, followed by post-processing.

    Call patterns:
      • Stateful: pass ``frame_idx``. For ``frame_idx == 0`` the session is
        called with (frame, mask); for later frames with the frame only.
      • Stateless: with ``frame_idx=None`` the session is tried as a callable,
        then via ``.step`` and ``.process``, each with (frame, mask).
        These stateless attempts also run after a weak stateful result.

    When MatAnyone is switched off (argument or ``USE_MATANYONE``), the
    normalized input mask is returned, optionally max-quality cleaned. When
    no usable refinement is obtained, the result comes from
    ``_fallback_refine_max_quality`` if ``fallback_enabled`` is True,
    otherwise from the (optionally cleaned) input mask.

    Returns:
        Alpha mask as float32 in [0, 1]; results that went through MatAnyone
        are 2-D [H,W] and contiguous.
    """
    if use_matanyone is None:
        use_matanyone = _use_matanyone_enabled()

    mask01 = _to_mask01(mask)

    if use_matanyone is False:
        logger.info("MatAnyone disabled by environment variable, returning unrefined mask")
        if _use_max_quality_enabled():
            mask01 = _maximum_quality_mask_cleaning(mask01)
            logger.info("Applied maximum quality cleaning to unrefined mask")
        return mask01

    if matanyone is not None and callable(matanyone):
        try:
            rgb01 = _ensure_rgb01(frame)

            if frame_idx is not None:
                # Stateful session: only the first frame carries the mask.
                stateful = matanyone(rgb01, mask01) if frame_idx == 0 else matanyone(rgb01)
                stateful = _mask_to_2d(stateful)
                if float(stateful.max()) > 0.1:
                    return _postprocess_mask_max_quality(stateful)
                logger.warning("MatAnyone stateful refinement produced empty/weak mask; falling back.")

            # Stateless attempts, in order: the object itself, .step, .process.
            refined = None
            for label in ("callable", "step", "process"):
                if refined is not None:
                    break
                if label != "callable" and not hasattr(matanyone, label):
                    continue
                try:
                    entry = matanyone if label == "callable" else getattr(matanyone, label)
                    refined = entry(rgb01, mask01)
                    refined = _mask_to_2d(refined)
                except Exception as e:
                    logger.debug(f"MatAnyone {label} failed: {e}")

            if refined is not None and float(refined.max()) > 0.1:
                return _postprocess_mask_max_quality(refined)
            logger.warning("MatAnyone refinement failed or produced empty mask")

        except Exception as e:
            logger.warning(f"MatAnyone error: {e}")

    if not fallback_enabled:
        if _use_max_quality_enabled():
            mask01 = _maximum_quality_mask_cleaning(mask01)
            logger.info("Applied maximum quality cleaning to fallback mask")
        return mask01

    return _fallback_refine_max_quality(mask01)
| |
|
def _postprocess_mask_max_quality(mask01: np.ndarray) -> np.ndarray:
    """
    Post-process a MatAnyone result.

    Uses ``_maximum_quality_mask_cleaning`` when maximum quality is enabled,
    otherwise the standard ``_postprocess_mask``.
    """
    if not _use_max_quality_enabled():
        return _postprocess_mask(mask01)

    cleaned = _maximum_quality_mask_cleaning(mask01)
    logger.info("Applied maximum quality post-processing to MatAnyone result")
    return cleaned
| |
|
def _postprocess_mask(mask01: np.ndarray) -> np.ndarray:
    """
    Standard mask clean-up: close gaps, re-binarize, then soften the edge.

    Pipeline on the uint8 version of the clipped mask: 5x5 elliptical
    closing, 3x3 Gaussian blur, binary threshold at 127, 5x5 Gaussian blur
    (sigma 1). Returns a contiguous float32 array in [0, 1].
    """
    work = (np.clip(mask01, 0, 1) * 255).astype(np.uint8)

    closing_element = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    work = cv2.morphologyEx(work, cv2.MORPH_CLOSE, closing_element)

    work = cv2.GaussianBlur(work, (3, 3), 0)
    _, work = cv2.threshold(work, 127, 255, cv2.THRESH_BINARY)
    work = cv2.GaussianBlur(work, (5, 5), 1)

    return np.ascontiguousarray(work.astype(np.float32) / 255.0)
| |
|
def _fallback_refine_max_quality(mask01: np.ndarray) -> np.ndarray:
    """
    Refine a mask without MatAnyone.

    Uses ``_maximum_quality_mask_cleaning`` when maximum quality is enabled,
    otherwise the simple ``_fallback_refine``.
    """
    if not _use_max_quality_enabled():
        return _fallback_refine(mask01)

    cleaned = _maximum_quality_mask_cleaning(mask01)
    logger.info("Applied maximum quality cleaning to fallback refinement")
    return cleaned
| |
|
def _fallback_refine(mask01: np.ndarray) -> np.ndarray:
    """
    Simple refinement used when MatAnyone is unavailable.

    Pipeline on the uint8 version of the clipped mask: bilateral filter
    (d=9, sigmaColor=75, sigmaSpace=75), 3x3 elliptical closing then opening,
    5x5 Gaussian blur (sigma 1). Returns a contiguous float32 array in [0, 1].
    """
    work = (np.clip(mask01, 0, 1) * 255).astype(np.uint8)

    work = cv2.bilateralFilter(work, 9, 75, 75)

    element = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    for operation in (cv2.MORPH_CLOSE, cv2.MORPH_OPEN):
        work = cv2.morphologyEx(work, operation, element)

    work = cv2.GaussianBlur(work, (5, 5), 1)

    return np.ascontiguousarray(work.astype(np.float32) / 255.0)
| |
|
| | |
| | |
| | |
def replace_background_hq(
    frame: np.ndarray,
    mask01: np.ndarray,
    background: np.ndarray,
    fallback_enabled: bool = True,
    **_compat,
) -> np.ndarray:
    """
    Composite ``frame`` over ``background`` using ``mask01`` as alpha (RGB in/out).

    The background is resized (Lanczos) to the frame size when needed. The
    mask is reduced to 2-D [0, 1], max-quality cleaned when that mode is on,
    and feathered (strength 3 in max-quality mode, 1 otherwise). The blend
    ``frame * a + background * (1 - a)`` is clipped to 0..255 and returned as
    uint8.

    On any error the original frame is returned (with a warning) when
    ``fallback_enabled`` is True; otherwise the exception propagates.
    """
    try:
        frame_h, frame_w = frame.shape[:2]

        if background.shape[:2] != (frame_h, frame_w):
            background = cv2.resize(background, (frame_w, frame_h), interpolation=cv2.INTER_LANCZOS4)

        alpha = _mask_to_2d(_to_mask01(mask01))

        max_quality = _use_max_quality_enabled()
        if max_quality:
            alpha = _maximum_quality_mask_cleaning(alpha)
            logger.debug("Applied maximum quality cleaning to compositing mask")

        alpha = _feather(alpha, k=3 if max_quality else 1)

        # Explicit 3-channel alpha so mismatched channel counts still fail
        # loudly (and reach the handler below) instead of broadcasting.
        alpha3 = np.stack((alpha, alpha, alpha), axis=2)

        foreground_part = frame.astype(np.float32) * alpha3
        background_part = background.astype(np.float32) * (1.0 - alpha3)
        blended = foreground_part + background_part

        return np.clip(blended, 0, 255).astype(np.uint8)

    except Exception as e:
        if not fallback_enabled:
            raise
        logger.warning(f"Compositing failed ({e}) – returning original frame")
        return frame
| |
|
| | |
| | |
| | |
def validate_video_file(video_path: str) -> Tuple[bool, str]:
    """
    Check that a video file exists and is within the supported limits.

    Limits enforced: non-empty file of at most 2 GB, openable by OpenCV, a
    positive frame count, 0 < fps <= 120, resolution between 1 and 4096 per
    side, and a duration of at most 5 minutes (300 s).

    Args:
        video_path: path to the video file (a path-like object also works).

    Returns:
        ``(True, summary)`` when the file passes, else ``(False, reason)``.
        Unexpected errors are logged and reported as ``"Validation error: …"``;
        nothing is raised.
    """
    if not video_path or not Path(video_path).exists():
        return False, "Video file not found"

    try:
        size = Path(video_path).stat().st_size
        if size == 0:
            return False, "File is empty"
        if size > 2 * 1024 * 1024 * 1024:
            return False, "File > 2 GB"

        cap = cv2.VideoCapture(str(video_path))
        try:
            if not cap.isOpened():
                return False, "Cannot read file"

            n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        finally:
            # Always free the capture handle, including on early return/error.
            cap.release()

        # <= 0 also rejects backends that report a negative frame count.
        if n_frames <= 0:
            return False, "No frames detected"
        # Written as a chained comparison so NaN fps is rejected as well.
        if not (0 < fps <= 120):
            return False, f"Invalid FPS: {fps}"
        if w <= 0 or h <= 0:
            return False, "Invalid resolution"
        if w > 4096 or h > 4096:
            return False, f"Resolution {w}×{h} too high"

        duration = n_frames / fps
        if duration > 300:
            return False, "Video longer than 5 minutes"

        return True, f"OK → {w}×{h}, {fps:.1f} fps, {duration:.1f}s"

    except Exception as e:
        logger.error(f"validate_video_file: {e}")
        return False, f"Validation error: {e}"
| |
|
| | |
| | |
| | |
# Names exported by `from <this module> import *`.
__all__ = [
    # segmentation
    "segment_person_hq",
    "segment_person_hq_original",
    # mask refinement
    "refine_mask_hq",
    # compositing and backgrounds
    "replace_background_hq",
    "create_professional_background",
    # input validation
    "validate_video_file",
    # preset table
    "PROFESSIONAL_BACKGROUNDS",
]