"""
Core Utilities Module for BackgroundFX Pro
Contains FileManager, VideoUtils, ImageUtils, and ValidationUtils,
plus a module-level validate_video_file() convenience helper.
"""
| |
|
| | |
| | import os |
# Thread-count defaults are written to the environment before the numeric
# libraries further down (cv2 / numpy / torch) are imported -- presumably so
# those libraries see the limit when they load; confirm before reordering.
# A user-provided OMP_NUM_THREADS disables both defaults.
if 'OMP_NUM_THREADS' not in os.environ:
    os.environ['OMP_NUM_THREADS'] = '4'
    # setdefault: never clobber an MKL limit the user exported explicitly.
    os.environ.setdefault('MKL_NUM_THREADS', '4')
| |
|
| | import shutil |
| | import tempfile |
| | import logging |
| | from pathlib import Path |
| | from typing import Optional, List, Union, Tuple, Dict, Any |
| | from datetime import datetime |
| | import subprocess |
| | import re |
| |
|
| | import cv2 |
| | import numpy as np |
| | import torch |
| | from PIL import Image, ImageEnhance, ImageFilter, ImageDraw |
| |
|
# Module-level logger shared by every utility class in this file.
logger = logging.getLogger(__name__)
| |
|
| | |
| | |
| | |
| |
|
class ValidationUtils:
    """Validation utilities for BackgroundFX Pro application.

    Every validator is a static method. Unless documented otherwise, a
    validator reports problems through its return value -- an
    ``(is_valid, message)`` tuple -- rather than by raising.
    """

    # Accepted file extensions: lower-case, leading dot included.
    SUPPORTED_VIDEO_FORMATS = {'.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv', '.m4v'}
    SUPPORTED_IMAGE_FORMATS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}

    # File size limits, in bytes.
    MAX_VIDEO_SIZE = 500 * 1024 * 1024
    MAX_IMAGE_SIZE = 50 * 1024 * 1024
    MIN_VIDEO_SIZE = 1024

    # Content limits: durations in seconds, resolutions as (width, height).
    MAX_VIDEO_DURATION = 300
    MIN_VIDEO_DURATION = 1
    MAX_RESOLUTION = (3840, 2160)
    MIN_RESOLUTION = (320, 240)
    MAX_FPS = 120
    MIN_FPS = 10

    # Accepted values for the 'processing_method' parameter.
    VALID_PROCESSING_METHODS = frozenset({'sam2', 'matanyone', 'cv_fallback', 'auto'})

    # Compiled once instead of on every validate_url() call.
    # - TLD length is 2-63 letters so long TLDs (".photography") are accepted.
    # - IPv4 octets are limited to 0-255.
    # - \Z (not $) so a trailing newline does not slip through.
    _URL_PATTERN = re.compile(
        r'^https?://'
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,63}\.?|'
        r'localhost|'
        r'(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)\.){3}(?:25[0-5]|2[0-4]\d|1?\d?\d))'
        r'(?::\d+)?'
        r'(?:/?|[/?]\S+)\Z', re.IGNORECASE)

    @staticmethod
    def _check_file(file_path, kind, formats, max_size) -> Optional[str]:
        """Run the path checks shared by video and image validation.

        Args:
            file_path: Path to the file being validated
            kind: 'video' or 'image'; only used to word the messages
            formats: Set of accepted lower-case extensions
            max_size: Maximum accepted file size in bytes

        Returns:
            An error message, or None when every check passes.
        """
        if not file_path:
            return "No file path provided"

        path = Path(file_path)
        if not path.exists():
            return f"File not found: {file_path}"
        # A directory named e.g. "clip.mp4" must not pass as a media file.
        if not path.is_file():
            return f"Not a file: {file_path}"

        if path.suffix.lower() not in formats:
            # sorted() keeps the message stable; set order is arbitrary.
            return (f"Unsupported {kind} format: {path.suffix}. "
                    f"Supported formats: {', '.join(sorted(formats))}")

        file_size = path.stat().st_size
        if file_size > max_size:
            size_mb = file_size / (1024 * 1024)
            return (f"{kind.capitalize()} file too large: {size_mb:.1f}MB "
                    f"(max: {max_size / (1024 * 1024):.0f}MB)")
        return None

    @staticmethod
    def _check_resolution(width, height, label) -> Optional[str]:
        """Return an error message if width x height is outside the allowed range."""
        max_w, max_h = ValidationUtils.MAX_RESOLUTION
        min_w, min_h = ValidationUtils.MIN_RESOLUTION
        if width > max_w or height > max_h:
            return f"{label} resolution too high: {width}x{height} (max: {max_w}x{max_h})"
        if width < min_w or height < min_h:
            return f"{label} resolution too low: {width}x{height} (min: {min_w}x{min_h})"
        return None

    @staticmethod
    def _is_int(value) -> bool:
        """True for real integers; bool is excluded although it subclasses int."""
        return isinstance(value, int) and not isinstance(value, bool)

    @staticmethod
    def _check_int_param(params, key, label, low, high) -> Optional[str]:
        """Validate an optional integer parameter constrained to [low, high]."""
        if key not in params:
            return None
        value = params[key]
        if not ValidationUtils._is_int(value):
            return f"{label} must be an integer"
        if value < low or value > high:
            return f"{label} must be between {low} and {high}"
        return None

    @staticmethod
    def validate_video_file(file_path, check_content=False) -> Tuple[bool, str]:
        """
        Validate video file for processing.

        Args:
            file_path: Path to the video file
            check_content: Whether to open the file with OpenCV and check
                duration, resolution and frame rate against the class limits

        Returns:
            tuple: (is_valid, error_message)
        """
        error = ValidationUtils._check_file(
            file_path, 'video',
            ValidationUtils.SUPPORTED_VIDEO_FORMATS,
            ValidationUtils.MAX_VIDEO_SIZE)
        if error:
            return False, error

        if Path(file_path).stat().st_size < ValidationUtils.MIN_VIDEO_SIZE:
            return False, "Video file appears to be empty or corrupted"

        if check_content:
            cap = None
            try:
                cap = cv2.VideoCapture(str(file_path))
                if not cap.isOpened():
                    return False, "Unable to open video file - may be corrupted"

                fps = cap.get(cv2.CAP_PROP_FPS)
                frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
                width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            except Exception as e:
                return False, f"Error validating video content: {str(e)}"
            finally:
                # Release on every path, including "not opened" and errors.
                if cap is not None:
                    cap.release()

            # An unknown frame rate yields duration 0, which is then
            # reported as "too short" below.
            duration = frame_count / fps if fps > 0 else 0

            if duration > ValidationUtils.MAX_VIDEO_DURATION:
                return False, f"Video too long: {duration:.1f}s (max: {ValidationUtils.MAX_VIDEO_DURATION}s)"
            if duration < ValidationUtils.MIN_VIDEO_DURATION:
                return False, f"Video too short: {duration:.1f}s (min: {ValidationUtils.MIN_VIDEO_DURATION}s)"

            error = ValidationUtils._check_resolution(width, height, "Video")
            if error:
                return False, error

            if fps > ValidationUtils.MAX_FPS:
                return False, f"Frame rate too high: {fps:.1f} fps (max: {ValidationUtils.MAX_FPS} fps)"
            if fps < ValidationUtils.MIN_FPS:
                return False, f"Frame rate too low: {fps:.1f} fps (min: {ValidationUtils.MIN_FPS} fps)"

        return True, "Video file is valid"

    @staticmethod
    def validate_image_file(file_path, check_content=False) -> Tuple[bool, str]:
        """
        Validate image file for background replacement.

        Args:
            file_path: Path to the image file
            check_content: Whether to decode the image with OpenCV and check
                its resolution against the class limits

        Returns:
            tuple: (is_valid, error_message)
        """
        error = ValidationUtils._check_file(
            file_path, 'image',
            ValidationUtils.SUPPORTED_IMAGE_FORMATS,
            ValidationUtils.MAX_IMAGE_SIZE)
        if error:
            return False, error

        if check_content:
            try:
                img = cv2.imread(str(file_path))
                if img is None:
                    return False, "Unable to read image file - may be corrupted"
                height, width = img.shape[:2]
            except Exception as e:
                return False, f"Error validating image content: {str(e)}"

            error = ValidationUtils._check_resolution(width, height, "Image")
            if error:
                return False, error

        return True, "Image file is valid"

    @staticmethod
    def validate_processing_params(params) -> Tuple[bool, str]:
        """
        Validate processing parameters.

        Only keys that are present are checked; unknown keys are ignored.
        Booleans are rejected wherever a number is expected.

        Args:
            params: Dictionary of processing parameters

        Returns:
            tuple: (is_valid, error_message)
        """
        if not params:
            return False, "No parameters provided"

        if 'confidence_threshold' in params:
            conf = params['confidence_threshold']
            if isinstance(conf, bool) or not isinstance(conf, (int, float)):
                return False, "Confidence threshold must be a number"
            if conf < 0 or conf > 1:
                return False, "Confidence threshold must be between 0 and 1"

        for key, label, low, high in (
            ('mask_dilation', 'Mask dilation', 0, 50),
            ('edge_smoothing', 'Edge smoothing', 0, 100),
        ):
            error = ValidationUtils._check_int_param(params, key, label, low, high)
            if error:
                return False, error

        if 'color_adjustment' in params:
            if not isinstance(params['color_adjustment'], bool):
                return False, "Color adjustment must be a boolean"

        error = ValidationUtils._check_int_param(
            params, 'output_quality', 'Output quality', 1, 100)
        if error:
            return False, error

        if 'processing_method' in params:
            method = params['processing_method']
            valid_methods = ValidationUtils.VALID_PROCESSING_METHODS
            # The isinstance guard keeps unhashable values (lists, dicts)
            # from raising TypeError in the membership test.
            if not isinstance(method, str) or method not in valid_methods:
                return False, f"Invalid processing method. Must be one of: {', '.join(sorted(valid_methods))}"

        return True, "Parameters are valid"

    @staticmethod
    def validate_output_path(output_path, create_dirs=False) -> Tuple[bool, str]:
        """
        Validate output path for saving results.

        Args:
            output_path: Path where output will be saved
            create_dirs: Whether to create directories if they don't exist

        Returns:
            tuple: (is_valid, error_message)
        """
        if not output_path:
            return False, "No output path provided"

        path = Path(output_path)
        parent_dir = path.parent

        if not parent_dir.exists():
            if not create_dirs:
                return False, f"Output directory does not exist: {parent_dir}"
            try:
                parent_dir.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                return False, f"Failed to create output directory: {str(e)}"

        if not parent_dir.is_dir():
            return False, f"Output directory is not a directory: {parent_dir}"

        if not os.access(parent_dir, os.W_OK):
            return False, f"No write permission for directory: {parent_dir}"

        if path.exists():
            if path.is_dir():
                return False, f"Output path is a directory: {output_path}"
            if not os.access(path, os.W_OK):
                return False, f"Cannot overwrite existing file: {output_path}"

        return True, "Output path is valid"

    @staticmethod
    def sanitize_filename(filename) -> str:
        """
        Sanitize filename to be safe for filesystem.

        Directory components are dropped, characters other than word
        characters, '-', '_' and '.' become '_', runs of '_' collapse, and
        the stem is capped at 200 characters. A stem that ends up empty or
        made only of dots (e.g. "..") is replaced by 'output'.

        Args:
            filename: Original filename (None is treated as empty)

        Returns:
            str: Sanitized filename
        """
        path = Path('' if filename is None else str(filename))

        stem = re.sub(r'[^\w\-_.]', '_', path.stem)
        stem = re.sub(r'_+', '_', stem).strip('_')

        # "." and ".." are directory references, not usable file names.
        if not stem.strip('.'):
            stem = 'output'

        max_length = 200
        stem = stem[:max_length]

        # The extension gets the same character whitelist as the stem.
        suffix = re.sub(r'[^\w\-_.]', '_', path.suffix)

        return f"{stem}{suffix}"

    @staticmethod
    def validate_memory_available(required_mb=1000):
        """
        Check if sufficient memory is available.

        The check is best-effort: when psutil is missing or fails, the
        result is reported as sufficient with available_mb set to -1.

        Args:
            required_mb: Required memory in megabytes

        Returns:
            tuple: (is_sufficient, available_mb, error_message)
        """
        try:
            import psutil

            available_mb = psutil.virtual_memory().available / (1024 * 1024)

            if available_mb < required_mb:
                return False, available_mb, f"Insufficient memory: {available_mb:.0f}MB available, {required_mb:.0f}MB required"

            return True, available_mb, f"Sufficient memory available: {available_mb:.0f}MB"

        except ImportError:
            return True, -1, "Memory check skipped (psutil not available)"
        except Exception as e:
            return True, -1, f"Memory check failed: {str(e)}"

    @staticmethod
    def validate_gpu_available() -> Tuple[bool, str]:
        """
        Check if GPU is available for processing.

        Returns:
            tuple: (is_available, device_info)
        """
        try:
            if not torch.cuda.is_available():
                return False, "No GPU available - will use CPU"

            device_name = torch.cuda.get_device_name(0)
            memory_gb = torch.cuda.get_device_properties(0).total_memory / (1024**3)
            return True, f"GPU available: {device_name} ({memory_gb:.1f}GB)"

        except ImportError:
            return False, "PyTorch not available for GPU check"
        except Exception as e:
            return False, f"GPU check failed: {str(e)}"

    @staticmethod
    def validate_url(url) -> Tuple[bool, str]:
        """
        Validate URL format.

        Accepts http/https URLs whose host is a domain name, 'localhost'
        or a dotted IPv4 address, with an optional port and path/query.

        Args:
            url: URL string to validate

        Returns:
            tuple: (is_valid, error_message)
        """
        if not url:
            return False, "No URL provided"
        if not isinstance(url, str):
            return False, "Invalid URL format"

        if ValidationUtils._URL_PATTERN.match(url):
            return True, "Valid URL"
        return False, "Invalid URL format"
| |
|
| | |
| | |
| | |
| |
|
class FileManager:
    """Manages file operations for BackgroundFX Pro.

    All files live under a single base directory containing four
    sub-directories: ``uploads``, ``outputs``, ``temp`` and ``cache``.
    """

    def __init__(self, base_dir: Optional[Union[str, Path]] = None):
        """Create the directory layout.

        Args:
            base_dir: Root directory. When omitted, a ``backgroundfx_pro``
                folder inside the system temp directory is used.
        """
        if base_dir:
            self.base_dir = Path(base_dir)
        else:
            self.base_dir = Path(tempfile.gettempdir()) / "backgroundfx_pro"

        self.base_dir.mkdir(parents=True, exist_ok=True)

        self.uploads_dir = self.base_dir / "uploads"
        self.outputs_dir = self.base_dir / "outputs"
        self.temp_dir = self.base_dir / "temp"
        self.cache_dir = self.base_dir / "cache"

        for dir_path in (self.uploads_dir, self.outputs_dir, self.temp_dir, self.cache_dir):
            dir_path.mkdir(parents=True, exist_ok=True)

        logger.info(f"FileManager initialized with base directory: {self.base_dir}")

    @staticmethod
    def _safe_name(filename: Union[str, Path]) -> str:
        """Reduce a caller-supplied name to its final path component.

        Joining an absolute path or one containing ".." onto a managed
        directory would escape that directory, so only the base name is
        kept. Backslashes are treated as separators on every platform.

        Raises:
            ValueError: If no usable file name remains.
        """
        name = Path(str(filename).replace('\\', '/')).name
        if name in ('', '.', '..'):
            raise ValueError(f"Invalid file name: {filename!r}")
        return name

    def save_upload(self, file_path: Union[str, Path], filename: Optional[str] = None) -> Path:
        """Copy an uploaded file into the uploads directory.

        Args:
            file_path: Source file to copy
            filename: Destination name; directory components are ignored.
                Defaults to ``<timestamp>_<source name>``.

        Returns:
            Path of the copy inside the uploads directory.

        Raises:
            ValueError: If ``filename`` has no usable base name.
            OSError: If the copy fails.
        """
        file_path = Path(file_path)

        if filename:
            dest_path = self.uploads_dir / self._safe_name(filename)
        else:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            dest_path = self.uploads_dir / f"{timestamp}_{file_path.name}"

        shutil.copy2(file_path, dest_path)
        logger.info(f"Saved upload: {dest_path}")
        return dest_path

    def create_output_path(self, filename: str, subfolder: Optional[str] = None) -> Path:
        """Build a timestamped path for an output file.

        The timestamp is inserted before the extension
        (``clip.mp4`` -> ``clip_<timestamp>.mp4``). The file itself is not
        created; the sub-folder, when given, is.

        Args:
            filename: Desired name; directory components are ignored
            subfolder: Optional folder below the outputs directory

        Returns:
            The output path.
        """
        if subfolder:
            output_dir = self.outputs_dir / subfolder
            output_dir.mkdir(parents=True, exist_ok=True)
        else:
            output_dir = self.outputs_dir

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # stem/suffix instead of rsplit('.') so dotfiles such as ".hidden"
        # are not split into an empty name plus extension.
        safe = Path(self._safe_name(filename))
        return output_dir / f"{safe.stem}_{timestamp}{safe.suffix}"

    def get_temp_path(self, filename: Optional[str] = None, extension: str = ".tmp") -> Path:
        """Return a path inside the temp directory (nothing is created).

        Args:
            filename: Explicit name; directory components are ignored.
                When omitted, a ``temp_<timestamp>`` name is generated.
            extension: Extension for generated names only
        """
        if filename:
            return self.temp_dir / self._safe_name(filename)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        return self.temp_dir / f"temp_{timestamp}{extension}"

    def cleanup_temp(self, max_age_hours: int = 24):
        """Delete temp files whose modification time is older than max_age_hours.

        Best-effort: a file that cannot be inspected or removed is logged
        and skipped so the remaining files are still processed.
        """
        try:
            current_time = datetime.now().timestamp()
            max_age_seconds = max_age_hours * 3600

            for temp_file in self.temp_dir.iterdir():
                try:
                    if not temp_file.is_file():
                        continue
                    file_age = current_time - temp_file.stat().st_mtime
                    if file_age > max_age_seconds:
                        temp_file.unlink()
                        logger.debug(f"Deleted old temp file: {temp_file}")
                except OSError as e:
                    logger.warning(f"Could not clean up temp file {temp_file}: {e}")

            logger.info("Temp directory cleanup completed")
        except Exception as e:
            logger.warning(f"Error during temp cleanup: {e}")

    def get_cache_path(self, key: str, extension: str = ".cache") -> Path:
        """Map a cache key to a file path inside the cache directory.

        Characters other than alphanumerics, '-' and '_' are replaced by
        '_', so distinct keys can map to the same path (e.g. "a/b" and "a_b").
        """
        safe_key = "".join(c if c.isalnum() or c in '-_' else '_' for c in key)
        return self.cache_dir / f"{safe_key}{extension}"

    def list_outputs(self, subfolder: Optional[str] = None, extension: Optional[str] = None) -> List[Path]:
        """List output files, newest first.

        Args:
            subfolder: Optional folder below the outputs directory
            extension: When given, only names ending with it are returned

        Returns:
            Regular files sorted by modification time, newest first;
            an empty list if the directory does not exist.
        """
        search_dir = self.outputs_dir / subfolder if subfolder else self.outputs_dir
        if not search_dir.exists():
            return []

        pattern = f"*{extension}" if extension else "*"

        dated = []
        for candidate in search_dir.glob(pattern):
            try:
                if candidate.is_file():
                    dated.append((candidate.stat().st_mtime, candidate))
            except OSError:
                # The entry vanished between glob() and stat(); skip it.
                continue

        dated.sort(key=lambda item: item[0], reverse=True)
        return [path for _, path in dated]

    def delete_file(self, file_path: Union[str, Path]) -> bool:
        """Delete a file without raising.

        Returns:
            True if a regular file was removed; False if it did not exist,
            is not a regular file, or removal failed.
        """
        try:
            file_path = Path(file_path)
            if file_path.exists() and file_path.is_file():
                file_path.unlink()
                logger.info(f"Deleted file: {file_path}")
                return True
            return False
        except Exception as e:
            logger.error(f"Error deleting file {file_path}: {e}")
            return False

    def get_file_info(self, file_path: Union[str, Path]) -> Dict[str, Any]:
        """Return basic metadata for a path.

        Returns:
            ``{"exists": False}`` when the path is missing; otherwise name,
            size in bytes and MB, timestamps, extension and absolute path.
            "created" is taken from ``st_ctime``, whose meaning is
            platform-dependent.
        """
        file_path = Path(file_path)

        if not file_path.exists():
            return {"exists": False}

        stat = file_path.stat()
        return {
            "exists": True,
            "name": file_path.name,
            "size": stat.st_size,
            "size_mb": stat.st_size / (1024 * 1024),
            "created": datetime.fromtimestamp(stat.st_ctime),
            "modified": datetime.fromtimestamp(stat.st_mtime),
            "extension": file_path.suffix,
            "path": str(file_path.absolute()),
        }
| |
|
| | |
| | |
| | |
| |
|
class VideoUtils:
    """Utilities for video processing (OpenCV for frames, ffmpeg for audio)."""

    @staticmethod
    def get_video_info(video_path: Union[str, Path]) -> Dict[str, Any]:
        """Get detailed video information.

        Returns:
            A dict with fps, frame_count, width, height, codec and duration
            (seconds; 0 when the frame rate is unknown), plus file_size_mb
            when the path exists on disk. ``{"error": ...}`` if the video
            cannot be opened.
        """
        video_path = str(video_path)
        cap = cv2.VideoCapture(video_path)

        try:
            if not cap.isOpened():
                logger.error(f"Failed to open video: {video_path}")
                return {"error": "Failed to open video"}

            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)

            info = {
                "fps": fps,
                "frame_count": int(frame_count),
                "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
                "codec": VideoUtils._fourcc_to_string(int(cap.get(cv2.CAP_PROP_FOURCC))),
                "duration": frame_count / fps if fps > 0 else 0,
            }

            path = Path(video_path)
            if path.exists():
                info["file_size_mb"] = path.stat().st_size / (1024 * 1024)

            return info

        finally:
            # Also runs on the "not opened" path.
            cap.release()

    @staticmethod
    def _fourcc_to_string(fourcc: int) -> str:
        """Convert a FOURCC integer to its 4-character string, low byte first."""
        return "".join(chr((fourcc >> 8 * i) & 0xFF) for i in range(4))

    @staticmethod
    def extract_frames(video_path: Union[str, Path],
                       output_dir: Union[str, Path],
                       frame_interval: int = 1,
                       max_frames: Optional[int] = None) -> List[Path]:
        """Extract frames from video as PNG files.

        Args:
            video_path: Source video
            output_dir: Directory for ``frame_NNNNNN.png`` files (created
                if missing); NNNNNN is the frame index in the source video
            frame_interval: Keep every n-th frame; must be >= 1
            max_frames: Stop after this many frames; None or 0 means no limit

        Returns:
            Paths of the frames that were actually written; an empty list
            if the video cannot be opened.

        Raises:
            ValueError: If frame_interval is smaller than 1.
        """
        # Checked up front: 0 would otherwise surface as ZeroDivisionError
        # from the modulo below.
        if frame_interval < 1:
            raise ValueError(f"frame_interval must be >= 1, got {frame_interval}")

        video_path = str(video_path)
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        cap = cv2.VideoCapture(video_path)
        frame_paths = []
        frame_index = 0

        try:
            if not cap.isOpened():
                logger.error(f"Failed to open video: {video_path}")
                return []

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_index % frame_interval == 0:
                    frame_path = output_dir / f"frame_{frame_index:06d}.png"
                    # imwrite reports failure through its return value;
                    # only list frames that exist on disk.
                    if cv2.imwrite(str(frame_path), frame):
                        frame_paths.append(frame_path)
                    else:
                        logger.warning(f"Failed to write frame: {frame_path}")

                    if max_frames and len(frame_paths) >= max_frames:
                        break

                frame_index += 1

            logger.info(f"Extracted {len(frame_paths)} frames from video")
            return frame_paths

        finally:
            cap.release()

    @staticmethod
    def create_video_from_frames(frame_paths: List[Union[str, Path]],
                                 output_path: Union[str, Path],
                                 fps: float = 30.0,
                                 codec: str = 'mp4v') -> bool:
        """Create video from frame images.

        The output size is taken from the first frame; frames of another
        size are resized to it. Unreadable frames are skipped with a warning.

        Args:
            frame_paths: Image files in playback order
            output_path: Destination video file
            fps: Output frame rate
            codec: 4-character FOURCC code

        Returns:
            True if the writer could be opened and the frames were written.
        """
        if not frame_paths:
            logger.error("No frames provided")
            return False

        if len(codec) != 4:
            logger.error(f"Invalid codec (expected 4 characters): {codec!r}")
            return False

        first_frame = cv2.imread(str(frame_paths[0]))
        if first_frame is None:
            logger.error(f"Failed to read first frame: {frame_paths[0]}")
            return False

        height, width = first_frame.shape[:2]

        fourcc = cv2.VideoWriter_fourcc(*codec)
        out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))

        try:
            # Without this check every write would be dropped and the
            # method would still report success.
            if not out.isOpened():
                logger.error(f"Failed to open video writer for: {output_path}")
                return False

            for frame_path in frame_paths:
                frame = cv2.imread(str(frame_path))
                if frame is None:
                    logger.warning(f"Failed to read frame: {frame_path}")
                    continue
                if frame.shape[:2] != (height, width):
                    frame = cv2.resize(frame, (width, height))
                out.write(frame)

            logger.info(f"Created video: {output_path}")
            return True

        except Exception as e:
            logger.error(f"Error creating video: {e}")
            return False

        finally:
            out.release()

    @staticmethod
    def resize_video(input_path: Union[str, Path],
                     output_path: Union[str, Path],
                     target_width: Optional[int] = None,
                     target_height: Optional[int] = None,
                     maintain_aspect: bool = True) -> bool:
        """Resize video to target dimensions.

        Args:
            input_path: Source video
            output_path: Destination video
            target_width: New width; defaults to the source width
            target_height: New height; defaults to the source height
            maintain_aspect: When exactly one dimension is given, derive
                the other from the source aspect ratio

        Returns:
            True on success, False if the video cannot be read or written.
        """
        cap = cv2.VideoCapture(str(input_path))
        out = None

        try:
            if not cap.isOpened():
                logger.error(f"Failed to open video: {input_path}")
                return False

            orig_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            orig_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))

            if orig_width <= 0 or orig_height <= 0:
                logger.error(f"Video reports invalid dimensions: {orig_width}x{orig_height}")
                return False

            if maintain_aspect:
                aspect = orig_width / orig_height
                if target_width and not target_height:
                    target_height = max(1, int(target_width / aspect))
                elif target_height and not target_width:
                    target_width = max(1, int(target_height * aspect))

            if not target_width:
                target_width = orig_width
            if not target_height:
                target_height = orig_height
            size = (target_width, target_height)

            # Assumption: a writer cannot run at a non-positive frame
            # rate, so fall back to 30 fps when the source reports none.
            out_fps = fps if fps > 0 else 30.0

            out = cv2.VideoWriter(str(output_path), fourcc, out_fps, size)
            if not out.isOpened():
                # The source FOURCC may not be available as an encoder in
                # this OpenCV build; retry with mp4v.
                out.release()
                out = cv2.VideoWriter(
                    str(output_path), cv2.VideoWriter_fourcc(*'mp4v'), out_fps, size)
            if not out.isOpened():
                logger.error(f"Failed to open video writer for: {output_path}")
                return False

            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                out.write(cv2.resize(frame, size))

            logger.info(f"Resized video saved to: {output_path}")
            return True

        except Exception as e:
            logger.error(f"Error resizing video: {e}")
            return False

        finally:
            cap.release()
            if out is not None:
                out.release()

    @staticmethod
    def _run_ffmpeg(args: List[str], success_message: str,
                    failure_prefix: str, error_prefix: str) -> bool:
        """Run ffmpeg with the given arguments and log the outcome.

        ``-y`` (overwrite output) and ``-nostdin`` (never wait for console
        input) are placed before the caller's arguments.

        Returns:
            True if ffmpeg exited with status 0.
        """
        cmd = ['ffmpeg', '-y', '-nostdin', *args]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except FileNotFoundError:
            logger.error("ffmpeg not found. Please install ffmpeg.")
            return False
        except Exception as e:
            logger.error(f"{error_prefix}: {e}")
            return False

        if result.returncode == 0:
            logger.info(success_message)
            return True

        logger.error(f"{failure_prefix}: {result.stderr}")
        return False

    @staticmethod
    def extract_audio(video_path: Union[str, Path],
                      audio_path: Union[str, Path]) -> bool:
        """Extract audio from video using ffmpeg.

        The audio stream is copied without re-encoding.

        Returns:
            True on success; False if ffmpeg is missing or fails.
        """
        return VideoUtils._run_ffmpeg(
            ['-i', str(video_path), '-vn', '-acodec', 'copy', str(audio_path)],
            success_message=f"Audio extracted to: {audio_path}",
            failure_prefix="Failed to extract audio",
            error_prefix="Error extracting audio",
        )

    @staticmethod
    def add_audio_to_video(video_path: Union[str, Path],
                           audio_path: Union[str, Path],
                           output_path: Union[str, Path]) -> bool:
        """Add audio track to video using ffmpeg.

        The first video stream of ``video_path`` is copied and the first
        audio stream of ``audio_path`` is encoded as AAC.

        Returns:
            True on success; False if ffmpeg is missing or fails.
        """
        return VideoUtils._run_ffmpeg(
            [
                '-i', str(video_path),
                '-i', str(audio_path),
                '-c:v', 'copy', '-c:a', 'aac',
                '-map', '0:v:0', '-map', '1:a:0',
                str(output_path),
            ],
            success_message=f"Video with audio saved to: {output_path}",
            failure_prefix="Failed to add audio",
            error_prefix="Error adding audio",
        )
| |
|
| | |
| | |
| | |
| |
|
class ImageUtils:
    """Utilities for image processing and manipulation (PIL based)."""

    @staticmethod
    def load_image(image_path: Union[str, Path]) -> Optional[Image.Image]:
        """Load an image using PIL.

        The pixel data is decoded immediately, so a corrupt file is
        reported here rather than at first use of the returned image.

        Returns:
            The loaded image, or None if it cannot be opened or decoded.
        """
        try:
            image = Image.open(str(image_path))
            # Image.open() is lazy; load() forces the decode now.
            image.load()
            return image
        except Exception as e:
            logger.error(f"Failed to load image {image_path}: {e}")
            return None

    @staticmethod
    def resize_image(image: Image.Image,
                     max_width: Optional[int] = None,
                     max_height: Optional[int] = None,
                     maintain_aspect: bool = True) -> Image.Image:
        """Resize image to fit within max dimensions.

        With maintain_aspect the image is only ever scaled down. Without
        it, the given dimensions are applied directly and a missing one
        keeps the original value.

        Returns:
            The resized image; the input itself when no limit is given.
        """
        if not max_width and not max_height:
            return image

        width, height = image.size

        if maintain_aspect:
            scale = 1.0
            if max_width:
                scale = min(scale, max_width / width)
            if max_height:
                scale = min(scale, max_height / height)

            # Keep at least one pixel per side so extreme aspect ratios
            # do not produce a zero dimension.
            new_width = max(1, int(width * scale))
            new_height = max(1, int(height * scale))
        else:
            new_width = max_width or width
            new_height = max_height or height

        return image.resize((new_width, new_height), Image.Resampling.LANCZOS)

    @staticmethod
    def convert_to_cv2(pil_image: Image.Image) -> np.ndarray:
        """Convert PIL Image to OpenCV format (BGR); other modes go through RGB first."""
        if pil_image.mode != 'RGB':
            pil_image = pil_image.convert('RGB')

        return cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)

    @staticmethod
    def convert_from_cv2(cv2_image: np.ndarray) -> Image.Image:
        """Convert OpenCV image to PIL format.

        Two-dimensional (single-channel) arrays are wrapped as-is;
        everything else is converted with BGR -> RGB.
        """
        if cv2_image.ndim == 2:
            return Image.fromarray(cv2_image)

        return Image.fromarray(cv2.cvtColor(cv2_image, cv2.COLOR_BGR2RGB))

    @staticmethod
    def apply_blur(image: Image.Image, radius: float = 5.0) -> Image.Image:
        """Apply Gaussian blur to image."""
        return image.filter(ImageFilter.GaussianBlur(radius=radius))

    @staticmethod
    def adjust_brightness(image: Image.Image, factor: float = 1.0) -> Image.Image:
        """Adjust image brightness (factor 1.0 leaves the image unchanged)."""
        return ImageEnhance.Brightness(image).enhance(factor)

    @staticmethod
    def adjust_contrast(image: Image.Image, factor: float = 1.0) -> Image.Image:
        """Adjust image contrast (factor 1.0 leaves the image unchanged)."""
        return ImageEnhance.Contrast(image).enhance(factor)

    @staticmethod
    def adjust_saturation(image: Image.Image, factor: float = 1.0) -> Image.Image:
        """Adjust image saturation (factor 1.0 leaves the image unchanged)."""
        return ImageEnhance.Color(image).enhance(factor)

    @staticmethod
    def crop_center(image: Image.Image, crop_width: int, crop_height: int) -> Image.Image:
        """Crop a crop_width x crop_height box centred on the image.

        NOTE(review): a box larger than the image gives negative crop
        coordinates; confirm callers never request that.
        """
        width, height = image.size

        left = (width - crop_width) // 2
        top = (height - crop_height) // 2

        return image.crop((left, top, left + crop_width, top + crop_height))

    @staticmethod
    def create_thumbnail(image: Image.Image, size: Tuple[int, int] = (128, 128)) -> Image.Image:
        """Create thumbnail preserving aspect ratio; the input is not modified."""
        thumb = image.copy()
        thumb.thumbnail(size, Image.Resampling.LANCZOS)
        return thumb

    @staticmethod
    def apply_mask(image: Image.Image, mask: Image.Image, alpha: float = 1.0) -> Image.Image:
        """Apply mask to image as its alpha channel.

        Args:
            image: Source image; never modified
            mask: Mask image; converted to 'L' and resized to the image
                size when necessary
            alpha: Values below 1.0 scale the mask down before use

        Returns:
            A new RGBA image whose alpha channel is the mask.
        """
        # convert() already returns a new image; an RGBA input must be
        # copied explicitly because putalpha() works in place.
        image = image.copy() if image.mode == 'RGBA' else image.convert('RGBA')

        if mask.mode != 'L':
            mask = mask.convert('L')

        if mask.size != image.size:
            mask = mask.resize(image.size, Image.Resampling.LANCZOS)

        if alpha < 1.0:
            mask = ImageEnhance.Brightness(mask).enhance(alpha)

        image.putalpha(mask)
        return image

    @staticmethod
    def composite_images(foreground: Image.Image,
                         background: Image.Image,
                         position: Tuple[int, int] = (0, 0),
                         alpha: float = 1.0) -> Image.Image:
        """Composite foreground image over background.

        Args:
            foreground: Image pasted on top, using its own alpha channel
            background: Base image; a copy is returned, the input is kept
            position: Top-left corner of the foreground on the background
            alpha: Values below 1.0 scale the foreground's alpha channel

        Returns:
            A new RGBA image.
        """
        if foreground.mode != 'RGBA':
            foreground = foreground.convert('RGBA')
        if background.mode != 'RGBA':
            background = background.convert('RGBA')

        if alpha < 1.0:
            foreground = foreground.copy()
            alpha_band = foreground.split()[3]
            foreground.putalpha(ImageEnhance.Brightness(alpha_band).enhance(alpha))

        output = background.copy()
        output.paste(foreground, position, foreground)

        return output

    @staticmethod
    def get_image_info(image_path: Union[str, Path]) -> Dict[str, Any]:
        """Get image file information.

        Returns:
            ``{"exists": False}`` for a missing file (with an "error" key
            if reading failed); otherwise filename, format, mode, size,
            width, height, file_size_mb and has_exif.
        """
        try:
            image_path = Path(image_path)

            if not image_path.exists():
                return {"exists": False}

            with Image.open(str(image_path)) as img:
                return {
                    "exists": True,
                    "filename": image_path.name,
                    "format": img.format,
                    "mode": img.mode,
                    "size": img.size,
                    "width": img.width,
                    "height": img.height,
                    "file_size_mb": image_path.stat().st_size / (1024 * 1024),
                    # Public getexif() works for every format, unlike the
                    # private JPEG-only _getexif().
                    "has_exif": bool(img.getexif()),
                }

        except Exception as e:
            logger.error(f"Error getting image info for {image_path}: {e}")
            return {"exists": False, "error": str(e)}

    @staticmethod
    def save_image(image: Image.Image,
                   output_path: Union[str, Path],
                   quality: int = 95,
                   optimize: bool = True) -> bool:
        """Save image with specified quality.

        Parent directories are created as needed. For JPEG targets, images
        with an alpha channel or palette are converted to RGB first (the
        transparency is dropped).

        Args:
            image: Image to save
            output_path: Destination; the extension selects the format
            quality: JPEG quality (ignored for other formats)
            optimize: Passed to the encoder for JPEG and PNG

        Returns:
            True on success, False on any failure (logged).
        """
        try:
            output_path = Path(output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)

            save_kwargs = {}
            ext = output_path.suffix.lower()

            if ext in ('.jpg', '.jpeg'):
                if image.mode in ('RGBA', 'LA', 'P', 'PA'):
                    image = image.convert('RGB')
                save_kwargs['quality'] = quality
                save_kwargs['optimize'] = optimize
            elif ext == '.png':
                save_kwargs['optimize'] = optimize

            image.save(str(output_path), **save_kwargs)
            logger.info(f"Saved image to: {output_path}")
            return True

        except Exception as e:
            logger.error(f"Failed to save image to {output_path}: {e}")
            return False
| |
|
| | |
| | |
| | |
| |
|
def validate_video_file(file_path: str) -> Tuple[bool, str]:
    """Validate if file is a valid video file.

    Lightweight check: the file must exist and OpenCV must be able to
    open it. Size, format and content limits are not checked here (see
    ValidationUtils.validate_video_file for those).

    Args:
        file_path: Path to the video file

    Returns:
        tuple: (is_valid, message)
    """
    if not file_path:
        return False, "No file path provided"

    file_path = str(file_path)
    if not os.path.exists(file_path):
        return False, f"File not found: {file_path}"

    cap = None
    try:
        cap = cv2.VideoCapture(file_path)
        opened = cap.isOpened()
    except Exception as e:
        return False, f"Error validating video: {str(e)}"
    finally:
        # Release even when isOpened() raised.
        if cap is not None:
            cap.release()

    if opened:
        return True, "Video file is valid"
    return False, "Unable to open video file - may be corrupted"
| | |