| import io |
| import os |
| import gc |
| import re |
| import cv2 |
| import time |
| import zipfile |
| import tempfile |
| import traceback |
| import numpy as np |
| import gradio as gr |
| import imgutils.detect.person as person_detector |
| import imgutils.detect.halfbody as halfbody_detector |
| import imgutils.detect.head as head_detector |
| import imgutils.detect.face as face_detector |
| import imgutils.metrics.ccip as ccip_analyzer |
| import imgutils.metrics.dbaesthetic as dbaesthetic_analyzer |
| import imgutils.metrics.lpips as lpips_module |
| from PIL import Image |
| from typing import List, Tuple, Dict, Any, Union, Optional, Iterator |
|
|
| |
| IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.webp', '.bmp', '.tiff', '.tif', '.gif') |
| VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov', '.mkv', '.flv', '.webm', '.mpeg', '.mpg') |
|
|
| |
| def sanitize_filename(filename: str, max_len: int = 50) -> str: |
| """Removes invalid characters and shortens a filename for safe use.""" |
| |
| base_name = os.path.basename(filename) |
| |
| name_part, _ = os.path.splitext(base_name) |
| |
| sanitized = re.sub(r'[\\/*?:"<>|\s]+', '_', name_part) |
| |
| sanitized = sanitized.strip('._') |
| |
| sanitized = sanitized[:max_len] |
| |
| if not sanitized: |
| return "file" |
| return sanitized |
|
|
| def convert_to_pil(frame: np.ndarray) -> Image.Image: |
| """Converts an OpenCV frame (BGR) to a PIL Image (RGB).""" |
| |
| if frame is None or frame.size == 0: |
| raise ValueError("Cannot convert empty frame to PIL Image") |
| try: |
| return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) |
| except Exception as e: |
| |
| raise RuntimeError(f"Failed to convert frame to PIL Image: {e}") |
|
|
| def image_to_bytes(img: Image.Image, format: str = 'PNG') -> bytes: |
| """Converts a PIL Image to bytes.""" |
| if img is None: |
| raise ValueError("Cannot convert None image to bytes") |
| byte_arr = io.BytesIO() |
| img.save(byte_arr, format=format) |
| return byte_arr.getvalue() |
|
|
| def create_zip_file(image_data: Dict[str, bytes], output_path: str) -> None: |
| """ |
| Creates a zip file containing the provided images directly at the output_path. |
| |
| Args: |
| image_data: A dictionary where keys are filenames (including paths within zip) |
| and values are image bytes. |
| output_path: The full path where the zip file should be created. |
| """ |
| if not image_data: |
| raise ValueError("No image data provided to create zip file.") |
| if not output_path: |
| raise ValueError("No output path provided for the zip file.") |
|
|
| print(f"Creating zip file at: {output_path}") |
|
|
| try: |
| |
| |
| parent_dir = os.path.dirname(output_path) |
| if parent_dir: |
| os.makedirs(parent_dir, exist_ok=True) |
|
|
| with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf: |
| |
| for filename, img_bytes in sorted(image_data.items()): |
| zipf.writestr(filename, img_bytes) |
| print(f"Successfully created zip file with {len(image_data)} items at {output_path}.") |
| |
| except Exception as e: |
| print(f"Error creating zip file at {output_path}: {e}") |
| |
| if os.path.exists(output_path): |
| try: |
| os.remove(output_path) |
| print(f"Removed partially created/failed zip file: {output_path}") |
| except OSError as remove_err: |
| print(f"Warning: Could not remove failed zip file {output_path}: {remove_err}") |
| raise |
|
|
| def generate_filename( |
| base_name: str, |
| aesthetic_label: Optional[str] = None, |
| ccip_cluster_id_for_lpips_logic: Optional[int] = None, |
| ccip_folder_naming_index: Optional[int] = None, |
| source_prefix_for_ccip_folder: Optional[str] = None, |
| lpips_folder_naming_index: Optional[Union[int, str]] = None, |
| file_extension: str = '.png', |
| |
| is_halfbody_primary_target_type: bool = False, |
| is_derived_head_crop: bool = False, |
| is_derived_face_crop: bool = False, |
| ) -> str: |
| """ |
| Generates the final filename, incorporating aesthetic label, cluster directory, |
| and crop indicators. CCIP and LPIPS folder names are sorted by image count. |
| """ |
| filename_stem = base_name |
| |
| |
| |
| |
| |
| |
|
|
| if is_derived_head_crop: |
| filename_stem += "_headCrop" |
| if is_derived_face_crop: |
| filename_stem += "_faceCrop" |
|
|
| filename_with_extension = filename_stem + file_extension |
|
|
| path_parts = [] |
| |
| if ccip_folder_naming_index is not None and source_prefix_for_ccip_folder is not None: |
| path_parts.append(f"{source_prefix_for_ccip_folder}_ccip_{ccip_folder_naming_index:03d}") |
|
|
| |
| if lpips_folder_naming_index is not None: |
| lpips_folder_name_part_str: Optional[str] = None |
| if isinstance(lpips_folder_naming_index, str) and lpips_folder_naming_index == "noise": |
| lpips_folder_name_part_str = "noise" |
| elif isinstance(lpips_folder_naming_index, int): |
| lpips_folder_name_part_str = f"{lpips_folder_naming_index:03d}" |
| |
| if lpips_folder_name_part_str is not None: |
| |
| if ccip_cluster_id_for_lpips_logic is not None: |
| lpips_folder_name_base = "lpips_sub_" |
| else: |
| lpips_folder_name_base = "lpips_" |
| path_parts.append(f"{lpips_folder_name_base}{lpips_folder_name_part_str}") |
|
|
| final_filename_part = filename_with_extension |
| if aesthetic_label: |
| final_filename_part = f"{aesthetic_label}_{filename_with_extension}" |
|
|
| if path_parts: |
| return f"{'/'.join(path_parts)}/{final_filename_part}" |
| else: |
| return final_filename_part |
|
|
| |
| def _process_input_source_frames( |
| source_file_prefix: str, |
| |
| |
| |
| frames_provider: Iterator[Tuple[Image.Image, int, int, int]], |
| is_video_source: bool, |
| |
| enable_person_detection: bool, |
| min_target_width_person_percentage: float, |
| person_model_name: str, |
| person_conf_threshold: float, |
| person_iou_threshold: float, |
| |
| enable_halfbody_detection: bool, |
| enable_halfbody_cropping: bool, |
| min_target_width_halfbody_percentage: float, |
| halfbody_model_name: str, |
| halfbody_conf_threshold: float, |
| halfbody_iou_threshold: float, |
| |
| enable_head_detection: bool, |
| enable_head_cropping: bool, |
| min_crop_width_head_percentage: float, |
| enable_head_filtering: bool, |
| head_model_name: str, |
| head_conf_threshold: float, |
| head_iou_threshold: float, |
| |
| enable_face_detection: bool, |
| enable_face_cropping: bool, |
| min_crop_width_face_percentage: float, |
| enable_face_filtering: bool, |
| face_model_name: str, |
| face_conf_threshold: float, |
| face_iou_threshold: float, |
| |
| enable_ccip_classification: bool, |
| ccip_model_name: str, |
| ccip_threshold: float, |
| |
| enable_lpips_clustering: bool, |
| lpips_threshold: float, |
| |
| enable_aesthetic_analysis: bool, |
| aesthetic_model_name: str, |
| |
| progress_updater |
| ) -> Tuple[str | None, str]: |
| """ |
| Processes frames from a given source (video or image sequence) according to the specified parameters. |
| Order: Person => Half-Body (alternative) => Face Detection => Head Detection => CCIP => Aesthetic. |
| |
| Returns: |
| A tuple containing: |
| - Path to the output zip file (or None if error). |
| - Status message string. |
| """ |
| |
| images_pending_final_processing: List[Dict[str, Any]] = [] |
| |
| |
| ccip_clusters_info: List[Tuple[int, np.ndarray]] = [] |
| next_ccip_cluster_id = 0 |
| |
| |
| processed_items_count = 0 |
| total_persons_detected_raw, total_halfbodies_detected_raw = 0, 0 |
| person_targets_processed_count, halfbody_targets_processed_count, fullframe_targets_processed_count = 0, 0, 0 |
| total_faces_detected_on_targets, total_heads_detected_on_targets = 0, 0 |
| |
| |
| main_targets_pending_count, face_crops_pending_count, head_crops_pending_count = 0, 0, 0 |
| items_filtered_by_face_count, items_filtered_by_head_count = 0, 0 |
| ccip_applied_count, aesthetic_applied_count = 0, 0 |
| |
| lpips_images_subject_to_clustering, total_lpips_clusters_created, total_lpips_noise_samples = 0, 0, 0 |
|
|
| gc_interval = 100 |
| start_time = time.time() |
| |
| |
| progress_updater(0, desc=f"Initializing {source_file_prefix}...") |
| output_zip_path_temp = None |
| output_zip_path_final = None |
|
|
| try: |
| |
| for pil_image_full_frame, frame_specific_index, current_item_index, total_items_for_desc in frames_provider: |
| progress_value_for_updater = (current_item_index) / total_items_for_desc if total_items_for_desc > 0 else 1.0 |
|
|
|
|
| |
| item_description = "" |
| if is_video_source: |
| |
| |
| |
| |
| item_description = f"Scanning frame {current_item_index}/{total_items_for_desc} (processed {processed_items_count + 1} sampled)" |
|
|
| else: |
| item_description = f"image {current_item_index}/{total_items_for_desc}" |
|
|
| progress_updater( |
| min(progress_value_for_updater, 1.0), |
| desc=f"Processing {item_description} for {source_file_prefix}" |
| ) |
| |
| |
| processed_items_count += 1 |
|
|
| try: |
| full_frame_width = pil_image_full_frame.width |
| print(f"--- Processing item ID {frame_specific_index} (Width: {full_frame_width}px) for {source_file_prefix} ---") |
| |
| |
| |
| primary_targets_for_frame: List[Dict[str, Any]] = [] |
| processed_primary_source_this_frame = False |
|
|
| |
| if enable_person_detection and full_frame_width > 0: |
| print(" Attempting Person Detection...") |
| min_person_target_px_width = full_frame_width * min_target_width_person_percentage |
| person_detections = person_detector.detect_person( |
| pil_image_full_frame, model_name=person_model_name, |
| conf_threshold=person_conf_threshold, iou_threshold=person_iou_threshold |
| ) |
| total_persons_detected_raw += len(person_detections) |
| if person_detections: |
| print(f" Detected {len(person_detections)} raw persons.") |
| valid_person_targets = 0 |
| for i, (bbox, _, score) in enumerate(person_detections): |
| |
| detected_person_width = bbox[2] - bbox[0] |
| if detected_person_width >= min_person_target_px_width: |
| primary_targets_for_frame.append({ |
| 'pil': pil_image_full_frame.crop(bbox), |
| 'base_name': f"{source_file_prefix}_item_{frame_specific_index}_person_{i}_score{int(score*100)}", |
| 'source_type': 'person'}) |
| person_targets_processed_count +=1 |
| valid_person_targets +=1 |
| else: |
| print(f" Person {i} width {detected_person_width}px < min {min_person_target_px_width:.0f}px. Skipping.") |
| if valid_person_targets > 0: |
| processed_primary_source_this_frame = True |
| print(f" Added {valid_person_targets} persons as primary targets.") |
| |
| |
| if not processed_primary_source_this_frame and enable_halfbody_detection and full_frame_width > 0: |
| print(" Attempting Half-Body Detection (on full item)...") |
| min_halfbody_target_px_width = full_frame_width * min_target_width_halfbody_percentage |
| halfbody_detections = halfbody_detector.detect_halfbody( |
| pil_image_full_frame, model_name=halfbody_model_name, |
| conf_threshold=halfbody_conf_threshold, iou_threshold=halfbody_iou_threshold |
| ) |
| total_halfbodies_detected_raw += len(halfbody_detections) |
| if halfbody_detections: |
| print(f" Detected {len(halfbody_detections)} raw half-bodies.") |
| valid_halfbody_targets = 0 |
| for i, (bbox, _, score) in enumerate(halfbody_detections): |
| detected_hb_width = bbox[2] - bbox[0] |
| |
| if enable_halfbody_cropping and detected_hb_width >= min_halfbody_target_px_width: |
| primary_targets_for_frame.append({ |
| 'pil': pil_image_full_frame.crop(bbox), |
| 'base_name': f"{source_file_prefix}_item_{frame_specific_index}_halfbody_{i}_score{int(score*100)}", |
| 'source_type': 'halfbody'}) |
| halfbody_targets_processed_count +=1 |
| valid_halfbody_targets +=1 |
| elif enable_halfbody_cropping: |
| print(f" Half-body {i} width {detected_hb_width}px < min {min_halfbody_target_px_width:.0f}px. Skipping.") |
| if valid_halfbody_targets > 0: |
| processed_primary_source_this_frame = True |
| print(f" Added {valid_halfbody_targets} half-bodies as primary targets.") |
|
|
| |
| if not processed_primary_source_this_frame: |
| print(" Processing Full Item as primary target.") |
| primary_targets_for_frame.append({ |
| 'pil': pil_image_full_frame.copy(), |
| 'base_name': f"{source_file_prefix}_item_{frame_specific_index}_full", |
| 'source_type': 'fullframe'}) |
| fullframe_targets_processed_count += 1 |
| |
| |
| for target_data in primary_targets_for_frame: |
| current_pil: Image.Image = target_data['pil'] |
| current_base_name: str = target_data['base_name'] |
| current_source_type: str = target_data['source_type'] |
| current_pil_width = current_pil.width |
| print(f" Processing target: {current_base_name} (type: {current_source_type}, width: {current_pil_width}px)") |
| |
| |
| keep_this_target = True |
| item_area = current_pil_width * current_pil.height |
| potential_face_crops_pil: List[Image.Image] = [] |
| potential_head_crops_pil: List[Image.Image] = [] |
| |
| |
| if keep_this_target and enable_face_detection and current_pil_width > 0: |
| print(f" Detecting faces in {current_base_name}...") |
| min_face_crop_px_width = current_pil_width * min_crop_width_face_percentage |
| face_detections = face_detector.detect_faces( |
| current_pil, model_name=face_model_name, |
| conf_threshold=face_conf_threshold, iou_threshold=face_iou_threshold |
| ) |
| total_faces_detected_on_targets += len(face_detections) |
| if not face_detections and enable_face_filtering: |
| keep_this_target = False |
| items_filtered_by_face_count += 1 |
| print(f" FILTERING TARGET {current_base_name} (no face).") |
| elif face_detections and enable_face_cropping: |
| for f_idx, (f_bbox, _, _) in enumerate(face_detections): |
| if (f_bbox[2]-f_bbox[0]) >= min_face_crop_px_width: |
| potential_face_crops_pil.append(current_pil.crop(f_bbox)) |
| else: |
| print(f" Face {f_idx} too small. Skipping crop.") |
| |
| |
| if keep_this_target and enable_head_detection and current_pil_width > 0: |
| print(f" Detecting heads in {current_base_name}...") |
| min_head_crop_px_width = current_pil_width * min_crop_width_head_percentage |
| head_detections = head_detector.detect_heads( |
| current_pil, model_name=head_model_name, |
| conf_threshold=head_conf_threshold, iou_threshold=head_iou_threshold |
| ) |
| total_heads_detected_on_targets += len(head_detections) |
| if not head_detections and enable_head_filtering: |
| keep_this_target = False |
| items_filtered_by_head_count += 1 |
| print(f" FILTERING TARGET {current_base_name} (no head).") |
| potential_face_crops_pil.clear() |
| elif head_detections and enable_head_cropping: |
| for h_idx, (h_bbox, _, _) in enumerate(head_detections): |
| h_w = h_bbox[2]-h_bbox[0] |
| if h_w >= min_head_crop_px_width and item_area > 0: |
| potential_head_crops_pil.append(current_pil.crop(h_bbox)) |
| else: |
| print(f" Head {h_idx} too small or too large relative to parent. Skipping crop.") |
| |
| |
| if not keep_this_target: |
| print(f" Target {current_base_name} was filtered by face/head presence rules. Discarding it and its potential crops.") |
| if current_pil is not None: |
| del current_pil |
| potential_face_crops_pil.clear() |
| potential_head_crops_pil.clear() |
| continue |
| |
| |
| assigned_ccip_id = None |
| if enable_ccip_classification: |
| print(f" Classifying {current_base_name} with CCIP...") |
| try: |
| feature = ccip_analyzer.ccip_extract_feature(current_pil, model=ccip_model_name) |
| best_match_cid = None |
| min_diff = float('inf') |
| |
| if ccip_clusters_info: |
| for cid, rep_f in ccip_clusters_info: |
| diff = ccip_analyzer.ccip_difference(feature, rep_f, model=ccip_model_name) |
| if diff < min_diff: |
| min_diff = diff |
| best_match_cid = cid |
|
|
| |
| if best_match_cid is not None and min_diff < ccip_threshold: |
| assigned_ccip_id = best_match_cid |
| print(f" -> Matched Cluster {assigned_ccip_id} (Diff: {min_diff:.6f} <= Threshold {ccip_threshold:.3f})") |
| else: |
| |
| |
| assigned_ccip_id = next_ccip_cluster_id |
| ccip_clusters_info.append((assigned_ccip_id, feature)) |
| if not ccip_clusters_info or len(ccip_clusters_info) == 1: |
| print(f" -> New Cluster {assigned_ccip_id} (First item or no prior suitable clusters)") |
| else: |
| |
| print(f" -> New Cluster {assigned_ccip_id} (Min diff to others: {min_diff:.6f} > Threshold {ccip_threshold:.3f})") |
| next_ccip_cluster_id += 1 |
| print(f" CCIP: Target {current_base_name} -> Original Cluster ID {assigned_ccip_id}") |
| del feature |
| ccip_applied_count += 1 |
| except Exception as e_ccip: |
| print(f" Error CCIP: {e_ccip}") |
| |
| |
| item_aesthetic_label = None |
| if enable_aesthetic_analysis: |
| print(f" Analyzing {current_base_name} for aesthetics...") |
| try: |
| res = dbaesthetic_analyzer.anime_dbaesthetic(current_pil, model_name=aesthetic_model_name) |
| if isinstance(res, tuple) and len(res) >= 1: |
| item_aesthetic_label = res[0] |
| print(f" Aesthetic: Target {current_base_name} -> {item_aesthetic_label}") |
| aesthetic_applied_count += 1 |
| except Exception as e_aes: |
| print(f" Error Aesthetic: {e_aes}") |
|
|
| add_current_pil_to_pending_list = True |
| if current_source_type == 'fullframe': |
| can_skip_fullframe_target = False |
| if enable_face_detection or enable_head_detection: |
| found_valid_sub_crop_from_enabled_detector = False |
| if enable_face_detection and len(potential_face_crops_pil) > 0: |
| found_valid_sub_crop_from_enabled_detector = True |
|
|
| if not found_valid_sub_crop_from_enabled_detector and \ |
| enable_head_detection and len(potential_head_crops_pil) > 0: |
| found_valid_sub_crop_from_enabled_detector = True |
|
|
| if not found_valid_sub_crop_from_enabled_detector: |
| can_skip_fullframe_target = True |
|
|
| if can_skip_fullframe_target: |
| add_current_pil_to_pending_list = False |
| print(f" Skipping save of fullframe target '{current_base_name}' because all enabled sub-detectors (Face/Head) yielded no valid-width crops.") |
| |
| if add_current_pil_to_pending_list: |
| |
| |
| images_pending_final_processing.append({ |
| 'pil_image': current_pil.copy(), 'base_name_for_filename': current_base_name, |
| 'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label, |
| 'is_halfbody_primary_target_type': (current_source_type == 'halfbody'), |
| 'is_derived_head_crop': False, 'is_derived_face_crop': False, |
| 'lpips_cluster_id': None, |
| 'lpips_folder_naming_index': None |
| }) |
| main_targets_pending_count +=1 |
| |
| |
| for i, fc_pil in enumerate(potential_face_crops_pil): |
| images_pending_final_processing.append({ |
| 'pil_image': fc_pil, 'base_name_for_filename': f"{current_base_name}_face{i}", |
| 'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label, |
| 'is_halfbody_primary_target_type': False, |
| 'is_derived_head_crop': False, 'is_derived_face_crop': True, |
| 'lpips_cluster_id': None, |
| 'lpips_folder_naming_index': None |
| }) |
| face_crops_pending_count+=1 |
| potential_face_crops_pil.clear() |
| |
| |
| for i, hc_pil in enumerate(potential_head_crops_pil): |
| images_pending_final_processing.append({ |
| 'pil_image': hc_pil, 'base_name_for_filename': f"{current_base_name}_head{i}", |
| 'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label, |
| 'is_halfbody_primary_target_type': False, |
| 'is_derived_head_crop': True, 'is_derived_face_crop': False, |
| 'lpips_cluster_id': None, |
| 'lpips_folder_naming_index': None |
| }) |
| head_crops_pending_count+=1 |
| potential_head_crops_pil.clear() |
|
|
| if current_pil is not None: |
| del current_pil |
| |
| primary_targets_for_frame.clear() |
| except Exception as item_proc_err: |
| print(f"!! Major Error processing item ID {frame_specific_index} for {source_file_prefix}: {item_proc_err}") |
| traceback.print_exc() |
| |
| if 'primary_targets_for_frame' in locals(): |
| primary_targets_for_frame.clear() |
| |
| if 'current_pil' in locals() and current_pil is not None: |
| del current_pil |
|
|
| if processed_items_count % gc_interval == 0: |
| gc.collect() |
| print(f" [GC triggered at {processed_items_count} items for {source_file_prefix}]") |
| |
| |
| print(f"\nRunning final GC before LPIPS/Zipping for {source_file_prefix}...") |
| gc.collect() |
|
|
| if not images_pending_final_processing: |
| status_message = f"Processing for {source_file_prefix} finished, but no images were generated or passed filters for LPIPS/Zipping." |
| print(status_message) |
| return None, status_message |
| |
| |
| print(f"\n--- LPIPS Clustering Stage for {source_file_prefix} (Images pending: {len(images_pending_final_processing)}) ---") |
| if enable_lpips_clustering: |
| print(f" LPIPS Clustering enabled with threshold: {lpips_threshold}") |
| lpips_images_subject_to_clustering = len(images_pending_final_processing) |
|
|
| if enable_ccip_classification and next_ccip_cluster_id > 0: |
| print(" LPIPS clustering within CCIP clusters.") |
| images_by_ccip: Dict[Optional[int], List[int]] = {} |
| for i, item_data in enumerate(images_pending_final_processing): |
| ccip_id = item_data['ccip_cluster_id'] |
| if ccip_id not in images_by_ccip: |
| images_by_ccip[ccip_id] = [] |
| images_by_ccip[ccip_id].append(i) |
|
|
| for ccip_id, indices_in_ccip_cluster in images_by_ccip.items(): |
| pils_for_lpips_sub_cluster = [images_pending_final_processing[idx]['pil_image'] for idx in indices_in_ccip_cluster] |
| if len(pils_for_lpips_sub_cluster) > 1: |
| print(f" Clustering {len(pils_for_lpips_sub_cluster)} images in CCIP cluster {ccip_id}...") |
| try: |
| lpips_sub_ids = lpips_module.lpips_clustering(pils_for_lpips_sub_cluster, threshold=lpips_threshold) |
| for i_sub, lpips_id in enumerate(lpips_sub_ids): |
| original_idx = indices_in_ccip_cluster[i_sub] |
| images_pending_final_processing[original_idx]['lpips_cluster_id'] = lpips_id |
| except Exception as e_lpips_sub: |
| print(f" Error LPIPS sub-cluster CCIP {ccip_id}: {e_lpips_sub}") |
| elif len(pils_for_lpips_sub_cluster) == 1: |
| images_pending_final_processing[indices_in_ccip_cluster[0]]['lpips_cluster_id'] = 0 |
| del images_by_ccip |
| if 'pils_for_lpips_sub_cluster' in locals(): |
| del pils_for_lpips_sub_cluster |
| else: |
| print(" LPIPS clustering on all collected images.") |
| all_pils_for_global_lpips = [item['pil_image'] for item in images_pending_final_processing] |
| if len(all_pils_for_global_lpips) > 1: |
| try: |
| lpips_global_ids = lpips_module.lpips_clustering(all_pils_for_global_lpips, threshold=lpips_threshold) |
| for i, lpips_id in enumerate(lpips_global_ids): |
| images_pending_final_processing[i]['lpips_cluster_id'] = lpips_id |
| except Exception as e_lpips_global: |
| print(f" Error LPIPS global: {e_lpips_global}") |
| elif len(all_pils_for_global_lpips) == 1: |
| images_pending_final_processing[0]['lpips_cluster_id'] = 0 |
| del all_pils_for_global_lpips |
| |
| |
| all_final_lpips_ids = [item.get('lpips_cluster_id') for item in images_pending_final_processing if item.get('lpips_cluster_id') is not None] |
| if all_final_lpips_ids: |
| unique_lpips_clusters = set(filter(lambda x: x != -1, all_final_lpips_ids)) |
| total_lpips_clusters_created = len(unique_lpips_clusters) |
| total_lpips_noise_samples = sum(1 for x in all_final_lpips_ids if x == -1) |
| else: |
| print(" LPIPS Clustering disabled.") |
|
|
| |
| original_ccip_id_to_new_naming_index: Dict[int, int] = {} |
| if enable_ccip_classification: |
| print(f" Preparing CCIP folder renaming for {source_file_prefix}...") |
| ccip_image_counts: Dict[int, int] = {} |
| for item_data_for_count in images_pending_final_processing: |
| original_ccip_id_val = item_data_for_count.get('ccip_cluster_id') |
| if original_ccip_id_val is not None: |
| ccip_image_counts[original_ccip_id_val] = ccip_image_counts.get(original_ccip_id_val, 0) + 1 |
|
|
| if ccip_image_counts: |
| |
| sorted_ccip_groups_by_count: List[Tuple[int, int]] = sorted( |
| ccip_image_counts.items(), |
| key=lambda item: item[1], |
| reverse=True |
| ) |
| for new_idx, (original_id, count) in enumerate(sorted_ccip_groups_by_count): |
| original_ccip_id_to_new_naming_index[original_id] = new_idx |
| print(f" CCIP Remap for {source_file_prefix}: Original ID {original_id} (count: {count}) -> New Naming Index {new_idx:03d}") |
| else: |
| print(f" No CCIP-assigned images found for {source_file_prefix} to perform renaming.") |
|
|
| |
| if enable_lpips_clustering: |
| print(f" Preparing LPIPS folder renaming for {source_file_prefix}...") |
| |
| for item_data in images_pending_final_processing: |
| item_data['lpips_folder_naming_index'] = None |
|
|
| if enable_ccip_classification and next_ccip_cluster_id > 0: |
| print(f" LPIPS renaming within CCIP clusters for {source_file_prefix}.") |
| items_grouped_by_original_ccip: Dict[Optional[int], List[Dict[str, Any]]] = {} |
| for item_data in images_pending_final_processing: |
| original_ccip_id = item_data.get('ccip_cluster_id') |
| if original_ccip_id not in items_grouped_by_original_ccip: items_grouped_by_original_ccip[original_ccip_id] = [] |
| items_grouped_by_original_ccip[original_ccip_id].append(item_data) |
|
|
| for original_ccip_id, items_in_ccip in items_grouped_by_original_ccip.items(): |
| lpips_counts_in_ccip: Dict[int, int] = {} |
| for item_data in items_in_ccip: |
| lpips_id = item_data.get('lpips_cluster_id') |
| if lpips_id is not None and lpips_id != -1: |
| lpips_counts_in_ccip[lpips_id] = lpips_counts_in_ccip.get(lpips_id, 0) + 1 |
| |
| lpips_id_to_naming_in_ccip: Dict[int, Union[int, str]] = {} |
| if lpips_counts_in_ccip: |
| sorted_lpips = sorted(lpips_counts_in_ccip.items(), key=lambda x: x[1], reverse=True) |
| for new_idx, (lpips_id, count) in enumerate(sorted_lpips): |
| lpips_id_to_naming_in_ccip[lpips_id] = new_idx |
| ccip_disp = f"OrigCCIP-{original_ccip_id}" if original_ccip_id is not None else "NoCCIP" |
| print(f" LPIPS Remap in {ccip_disp}: OrigLPIPS ID {lpips_id} (count: {count}) -> New Naming Index {new_idx:03d}") |
| |
| for item_data in items_in_ccip: |
| lpips_id = item_data.get('lpips_cluster_id') |
| if lpips_id is not None: |
| if lpips_id == -1: item_data['lpips_folder_naming_index'] = "noise" |
| elif lpips_id in lpips_id_to_naming_in_ccip: |
| item_data['lpips_folder_naming_index'] = lpips_id_to_naming_in_ccip[lpips_id] |
| del items_grouped_by_original_ccip |
| else: |
| print(f" Global LPIPS renaming for {source_file_prefix}.") |
| global_lpips_counts: Dict[int, int] = {} |
| for item_data in images_pending_final_processing: |
| lpips_id = item_data.get('lpips_cluster_id') |
| if lpips_id is not None and lpips_id != -1: |
| global_lpips_counts[lpips_id] = global_lpips_counts.get(lpips_id, 0) + 1 |
|
|
| global_lpips_id_to_naming: Dict[int, Union[int, str]] = {} |
| if global_lpips_counts: |
| sorted_global_lpips = sorted(global_lpips_counts.items(), key=lambda x: x[1], reverse=True) |
| for new_idx, (lpips_id, count) in enumerate(sorted_global_lpips): |
| global_lpips_id_to_naming[lpips_id] = new_idx |
| print(f" Global LPIPS Remap: OrigLPIPS ID {lpips_id} (count: {count}) -> New Naming Index {new_idx:03d}") |
|
|
| for item_data in images_pending_final_processing: |
| lpips_id = item_data.get('lpips_cluster_id') |
| if lpips_id is not None: |
| if lpips_id == -1: item_data['lpips_folder_naming_index'] = "noise" |
| elif lpips_id in global_lpips_id_to_naming: |
| item_data['lpips_folder_naming_index'] = global_lpips_id_to_naming[lpips_id] |
| gc.collect() |
|
|
| |
| images_to_zip: Dict[str, bytes] = {} |
| print(f"\n--- Final Zipping Stage for {source_file_prefix} ({len(images_pending_final_processing)} items) ---") |
| for item_data in images_pending_final_processing: |
| original_ccip_id_for_item = item_data.get('ccip_cluster_id') |
| current_ccip_naming_idx_for_folder: Optional[int] = None |
|
|
| if enable_ccip_classification and original_ccip_id_for_item is not None and \ |
| original_ccip_id_for_item in original_ccip_id_to_new_naming_index: |
| current_ccip_naming_idx_for_folder = original_ccip_id_to_new_naming_index[original_ccip_id_for_item] |
|
|
| current_lpips_naming_idx_for_folder = item_data.get('lpips_folder_naming_index') |
|
|
| final_filename = generate_filename( |
| base_name=item_data['base_name_for_filename'], |
| aesthetic_label=item_data.get('aesthetic_label'), |
| ccip_cluster_id_for_lpips_logic=original_ccip_id_for_item, |
| ccip_folder_naming_index=current_ccip_naming_idx_for_folder, |
| source_prefix_for_ccip_folder=source_file_prefix if current_ccip_naming_idx_for_folder is not None else None, |
| lpips_folder_naming_index=current_lpips_naming_idx_for_folder, |
| is_halfbody_primary_target_type=item_data['is_halfbody_primary_target_type'], |
| is_derived_head_crop=item_data['is_derived_head_crop'], |
| is_derived_face_crop=item_data['is_derived_face_crop'] |
| ) |
| try: |
| images_to_zip[final_filename] = image_to_bytes(item_data['pil_image']) |
| except Exception as e_bytes: |
| print(f" Error converting/adding {final_filename} to zip: {e_bytes}") |
| finally: |
| if 'pil_image' in item_data and item_data['pil_image'] is not None: |
| del item_data['pil_image'] |
| images_pending_final_processing.clear() |
|
|
| if not images_to_zip: |
| status_message = f"Processing for {source_file_prefix} finished, but no images were converted for zipping." |
| print(status_message) |
| return None, status_message |
|
|
| print(f"Preparing zip file for {source_file_prefix} with {len(images_to_zip)} images...") |
| progress_updater(1.0, desc=f"Creating Zip File for {source_file_prefix}...") |
| zip_start_time = time.time() |
| |
| |
| |
| temp_zip_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip") |
| output_zip_path_temp = temp_zip_file.name |
| temp_zip_file.close() |
|
|
| try: |
| |
| create_zip_file(images_to_zip, output_zip_path_temp) |
| zip_duration = time.time() - zip_start_time |
| print(f"Temporary zip file for {source_file_prefix} created in {zip_duration:.2f} seconds at {output_zip_path_temp}") |
| |
| |
| temp_dir = os.path.dirname(output_zip_path_temp) |
| timestamp = int(time.time()) |
| desired_filename = f"{source_file_prefix}_processed_{timestamp}.zip" |
| output_zip_path_final = os.path.join(temp_dir, desired_filename) |
| |
| |
| print(f"Renaming temp file for {source_file_prefix} to: {output_zip_path_final}") |
| os.rename(output_zip_path_temp, output_zip_path_final) |
| print("Rename successful.") |
| output_zip_path_temp = None |
|
|
| except Exception as zip_or_rename_err: |
| print(f"Error during zip creation or renaming for {source_file_prefix}: {zip_or_rename_err}") |
| |
| if output_zip_path_temp and os.path.exists(output_zip_path_temp): |
| try: |
| os.remove(output_zip_path_temp) |
| except OSError: |
| pass |
| if output_zip_path_final and os.path.exists(output_zip_path_final): |
| try: |
| os.remove(output_zip_path_final) |
| except OSError: |
| pass |
| raise zip_or_rename_err |
| |
| |
| processing_duration = time.time() - start_time - zip_duration |
| total_duration = time.time() - start_time |
| |
| |
| person_stats = "N/A" |
| if enable_person_detection: |
| person_stats = f"{total_persons_detected_raw} raw, {person_targets_processed_count} targets (>{min_target_width_person_percentage*100:.1f}% itemW)" |
|
|
| halfbody_stats = "N/A" |
| if enable_halfbody_detection: |
| halfbody_stats = f"{total_halfbodies_detected_raw} raw, {halfbody_targets_processed_count} targets (>{min_target_width_halfbody_percentage*100:.1f}% itemW)" |
| fullframe_stats = f"{fullframe_targets_processed_count} targets" |
|
|
| face_stats = "N/A" |
| if enable_face_detection: |
| face_stats = f"{total_faces_detected_on_targets} on targets, {face_crops_pending_count} crops pending (>{min_crop_width_face_percentage*100:.1f}% parentW)" |
| if enable_face_filtering: |
| face_stats += f", {items_filtered_by_face_count} targets filtered" |
|
|
| head_stats = "N/A" |
| if enable_head_detection: |
| head_stats = f"{total_heads_detected_on_targets} on targets, {head_crops_pending_count} crops pending (>{min_crop_width_head_percentage*100:.1f}% parentW)" |
| if enable_head_filtering: |
| head_stats += f", {items_filtered_by_head_count} targets filtered" |
|
|
| ccip_stats = "N/A" |
| if enable_ccip_classification: |
| ccip_stats = f"{next_ccip_cluster_id} original clusters created, on {ccip_applied_count} targets. Folders renamed by image count." |
|
|
| lpips_stats = "N/A" |
| if enable_lpips_clustering: |
| lpips_stats = f"{lpips_images_subject_to_clustering} images processed, {total_lpips_clusters_created} clusters, {total_lpips_noise_samples} noise. Folders renamed by image count." |
|
|
| aesthetic_stats = "N/A" |
| if enable_aesthetic_analysis: |
| aesthetic_stats = f"On {aesthetic_applied_count} targets" |
|
|
| item_desc_for_stats = "Items from Provider" if not is_video_source else "Sampled Frames" |
| status_message = ( |
| f"Processing for '{source_file_prefix}' Complete!\n" |
| f"Total time: {total_duration:.2f}s (Proc: {processing_duration:.2f}s, Zip: {zip_duration:.2f}s)\n" |
| f"{item_desc_for_stats}: {total_items_for_desc}, Processed Items: {processed_items_count}\n" |
| f"--- Primary Targets Processed ---\n" |
| f" Person Detection: {person_stats}\n" |
| f" Half-Body Detection: {halfbody_stats}\n" |
| f" Full Item Processing: {fullframe_stats}\n" |
| f"--- Items Pending Final Processing ({main_targets_pending_count} main, {face_crops_pending_count} face, {head_crops_pending_count} head) ---\n" |
| f" Face Detection: {face_stats}\n" |
| f" Head Detection: {head_stats}\n" |
| f" CCIP Classification: {ccip_stats}\n" |
| f" LPIPS Clustering: {lpips_stats}\n" |
| f" Aesthetic Analysis: {aesthetic_stats}\n" |
| f"Zip file contains {len(images_to_zip)} images.\n" |
| f"Output Zip: {output_zip_path_final}" |
| ) |
| print(status_message) |
| progress_updater(1.0, desc=f"Finished {source_file_prefix}!") |
| |
| |
| return output_zip_path_final, status_message |
|
|
| except Exception as e: |
| print(f"!! An unhandled error occurred during processing of {source_file_prefix}: {e}") |
| traceback.print_exc() |
| |
| images_pending_final_processing.clear() |
| ccip_clusters_info.clear() |
| gc.collect() |
| |
| |
| if output_zip_path_temp and os.path.exists(output_zip_path_temp): |
| try: |
| os.remove(output_zip_path_temp) |
| except OSError: |
| pass |
| |
| |
| if output_zip_path_final and os.path.exists(output_zip_path_final): |
| try: |
| os.remove(output_zip_path_final) |
| except OSError: |
| pass |
| return None, f"An error occurred with {source_file_prefix}: {e}" |
|
|
| |
| def process_inputs_main( |
| input_file_objects: List[Any], |
| sample_interval_ms: int, |
| |
| enable_person_detection: bool, |
| min_target_width_person_percentage: float, |
| person_model_name: str, |
| person_conf_threshold: float, |
| person_iou_threshold: float, |
| |
| enable_halfbody_detection: bool, |
| enable_halfbody_cropping: bool, |
| min_target_width_halfbody_percentage: float, |
| halfbody_model_name: str, |
| halfbody_conf_threshold: float, |
| halfbody_iou_threshold: float, |
| |
| enable_head_detection: bool, |
| enable_head_cropping: bool, |
| min_crop_width_head_percentage: float, |
| enable_head_filtering: bool, |
| head_model_name: str, |
| head_conf_threshold: float, |
| head_iou_threshold: float, |
| |
| enable_face_detection: bool, |
| enable_face_cropping: bool, |
| min_crop_width_face_percentage: float, |
| enable_face_filtering: bool, |
| face_model_name: str, |
| face_conf_threshold: float, |
| face_iou_threshold: float, |
| |
| enable_ccip_classification: bool, |
| ccip_model_name: str, |
| ccip_threshold: float, |
| |
| enable_lpips_clustering: bool, |
| lpips_threshold: float, |
| |
| enable_aesthetic_analysis: bool, |
| aesthetic_model_name: str, |
| progress=gr.Progress(track_tqdm=True) |
| ) -> Tuple[Optional[List[str]], str]: |
|
|
| if not input_file_objects: |
| return [], "Error: No files provided." |
|
|
| video_file_temp_objects: List[Any] = [] |
| image_file_temp_objects: List[Any] = [] |
|
|
| for file_obj in input_file_objects: |
| |
| |
| file_name = getattr(file_obj, 'orig_name', file_obj.name) |
| if isinstance(file_name, str): |
| lower_file_name = file_name.lower() |
| if any(lower_file_name.endswith(ext) for ext in VIDEO_EXTENSIONS): |
| video_file_temp_objects.append(file_obj) |
| elif any(lower_file_name.endswith(ext) for ext in IMAGE_EXTENSIONS): |
| image_file_temp_objects.append(file_obj) |
| else: |
| print(f"Warning: File '{file_name}' has an unrecognized extension and will be skipped.") |
| else: |
| print(f"Warning: File object {file_obj} does not have a valid name and will be skipped.") |
|
|
|
|
| output_zip_paths_all_sources = [] |
| all_status_messages = [] |
| |
| total_processing_tasks = (1 if image_file_temp_objects else 0) + len(video_file_temp_objects) |
| if total_processing_tasks == 0: |
| return [], "No processable video or image files found in the input." |
| |
| tasks_completed_count = 0 |
| |
| |
| print(f"--- Overall Batch Processing Settings ---") |
| print(f" Number of image sequences to process: {1 if image_file_temp_objects else 0}") |
| print(f" Number of videos to process: {len(video_file_temp_objects)}") |
| print(f" Sample Interval (for videos): {sample_interval_ms}ms") |
| print(f" Detection Order: Person => Half-Body (alt) => Face => Head. Then: CCIP => LPIPS => Aesthetic.") |
| print(f" Person Detect = {enable_person_detection}" + (f" (MinW:{min_target_width_person_percentage*100:.1f}%, Mdl:{person_model_name}, Conf:{person_conf_threshold:.2f}, IoU:{person_iou_threshold:.2f})" if enable_person_detection else "")) |
| print(f" HalfBody Detect = {enable_halfbody_detection}" + (f" (FullFrameOnly, Crop:{enable_halfbody_cropping}, MinW:{min_target_width_halfbody_percentage*100:.1f}%, Mdl:{halfbody_model_name}, Conf:{halfbody_conf_threshold:.2f}, IoU:{halfbody_iou_threshold:.2f})" if enable_halfbody_detection else "")) |
| print(f" Face Detect = {enable_face_detection}" + (f" (Crop:{enable_face_cropping}, MinW:{min_crop_width_face_percentage*100:.1f}%, Filter:{enable_face_filtering}, Mdl:{face_model_name}, Conf:{face_conf_threshold:.2f}, IoU:{face_iou_threshold:.2f})" if enable_face_detection else "")) |
| print(f" Head Detect = {enable_head_detection}" + (f" (Crop:{enable_head_cropping}, MinW:{min_crop_width_head_percentage*100:.1f}%, Filter:{enable_head_filtering}, Mdl:{head_model_name}, Conf:{head_conf_threshold:.2f}, IoU:{head_iou_threshold:.2f})" if enable_head_detection else "")) |
| print(f" CCIP Classify = {enable_ccip_classification}" + (f" (Mdl:{ccip_model_name}, Thr:{ccip_threshold:.3f})" if enable_ccip_classification else "")) |
| print(f" LPIPS Clustering = {enable_lpips_clustering}" + (f" (Thr:{lpips_threshold:.3f})" if enable_lpips_clustering else "")) |
| print(f" Aesthetic Analyze = {enable_aesthetic_analysis}" + (f" (Mdl:{aesthetic_model_name})" if enable_aesthetic_analysis else "")) |
| print(f"--- End of Overall Settings ---") |
|
|
|
|
| |
| if image_file_temp_objects: |
| image_group_label_base = "ImageGroup" |
| |
| try: |
| first_image_orig_name = getattr(image_file_temp_objects[0], 'orig_name', image_file_temp_objects[0].name) |
| image_group_label_base = sanitize_filename(first_image_orig_name, max_len=20) |
| except: |
| pass |
| |
| image_source_file_prefix = f"{image_group_label_base}_{int(time.time())}" |
| |
| current_task_number = tasks_completed_count + 1 |
| progress_description_prefix = f"Image Seq. {current_task_number}/{total_processing_tasks} ({image_source_file_prefix})" |
| progress(tasks_completed_count / total_processing_tasks, desc=f"{progress_description_prefix}: Starting...") |
| print(f"\n>>> Processing Image Sequence: {image_source_file_prefix} ({len(image_file_temp_objects)} images) <<<") |
|
|
| def image_frames_provider_generator() -> Iterator[Tuple[Image.Image, int, int, int]]: |
| num_images = len(image_file_temp_objects) |
| for idx, img_obj in enumerate(image_file_temp_objects): |
| try: |
| pil_img = Image.open(img_obj.name).convert('RGB') |
| yield pil_img, idx, idx + 1, num_images |
| except Exception as e_load: |
| print(f"Error loading image {getattr(img_obj, 'orig_name', img_obj.name)}: {e_load}. Skipping.") |
| |
| |
| |
| continue |
| |
| def image_group_progress_updater(item_progress_value: float, desc: str): |
| overall_progress = (tasks_completed_count + item_progress_value) / total_processing_tasks |
| progress(overall_progress, desc=f"{progress_description_prefix}: {desc}") |
|
|
| try: |
| zip_file_path_single, status_message_single = _process_input_source_frames( |
| source_file_prefix=image_source_file_prefix, |
| frames_provider=image_frames_provider_generator(), |
| is_video_source=False, |
| enable_person_detection=enable_person_detection, |
| min_target_width_person_percentage=min_target_width_person_percentage, |
| person_model_name=person_model_name, |
| person_conf_threshold=person_conf_threshold, |
| person_iou_threshold=person_iou_threshold, |
| enable_halfbody_detection=enable_halfbody_detection, |
| enable_halfbody_cropping=enable_halfbody_cropping, |
| min_target_width_halfbody_percentage=min_target_width_halfbody_percentage, |
| halfbody_model_name=halfbody_model_name, |
| halfbody_conf_threshold=halfbody_conf_threshold, |
| halfbody_iou_threshold=halfbody_iou_threshold, |
| enable_head_detection=enable_head_detection, |
| enable_head_cropping=enable_head_cropping, |
| min_crop_width_head_percentage=min_crop_width_head_percentage, |
| enable_head_filtering=enable_head_filtering, |
| head_model_name=head_model_name, |
| head_conf_threshold=head_conf_threshold, |
| head_iou_threshold=head_iou_threshold, |
| enable_face_detection=enable_face_detection, |
| enable_face_cropping=enable_face_cropping, |
| min_crop_width_face_percentage=min_crop_width_face_percentage, |
| enable_face_filtering=enable_face_filtering, |
| face_model_name=face_model_name, |
| face_conf_threshold=face_conf_threshold, |
| face_iou_threshold=face_iou_threshold, |
| enable_ccip_classification=enable_ccip_classification, |
| ccip_model_name=ccip_model_name, |
| ccip_threshold=ccip_threshold, |
| enable_lpips_clustering=enable_lpips_clustering, |
| lpips_threshold=lpips_threshold, |
| enable_aesthetic_analysis=enable_aesthetic_analysis, |
| aesthetic_model_name=aesthetic_model_name, |
| progress_updater=image_group_progress_updater |
| ) |
| if zip_file_path_single: |
| output_zip_paths_all_sources.append(zip_file_path_single) |
| all_status_messages.append(f"--- Image Sequence ({image_source_file_prefix}) Processing Succeeded ---\n{status_message_single}") |
| else: |
| all_status_messages.append(f"--- Image Sequence ({image_source_file_prefix}) Processing Failed ---\n{status_message_single}") |
| except Exception as e_img_seq: |
| error_msg = f"Critical error during processing of image sequence {image_source_file_prefix}: {e_img_seq}" |
| print(error_msg) |
| traceback.print_exc() |
| all_status_messages.append(f"--- Image Sequence ({image_source_file_prefix}) Processing CRITICALLY FAILED ---\n{error_msg}") |
| |
| tasks_completed_count += 1 |
| print(f">>> Finished attempt for Image Sequence: {image_source_file_prefix} <<<") |
|
|
| |
| for video_idx, video_file_temp_obj in enumerate(video_file_temp_objects): |
| video_path_temp = video_file_temp_obj.name |
| video_original_filename = os.path.basename(getattr(video_file_temp_obj, 'orig_name', video_path_temp)) |
| video_source_file_prefix = sanitize_filename(video_original_filename) |
|
|
| current_task_number = tasks_completed_count + 1 |
| progress_description_prefix = f"Video {current_task_number}/{total_processing_tasks}" |
| |
| print(f"\n>>> Processing Video: {video_original_filename} (Sanitized Prefix: {video_source_file_prefix}) <<<") |
| progress(tasks_completed_count / total_processing_tasks, desc=f"{progress_description_prefix}: Starting processing...") |
|
|
| |
| |
| |
| def video_frames_provider_generator(video_path: str, interval_ms: int) -> Iterator[Tuple[Image.Image, int, int, int]]: |
| cap = cv2.VideoCapture(video_path) |
| if not cap.isOpened(): |
| print(f"Error: Could not open video file for provider: {video_path}") |
| return |
|
|
| total_items_for_desc = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
| if total_items_for_desc <= 0: |
| print(f"Warning: Video {video_original_filename} reported {total_items_for_desc} frames. This might be inaccurate. Proceeding...") |
| |
| |
| total_items_for_desc = 1 |
|
|
| |
| last_processed_ms = -float('inf') |
| raw_frames_read_by_provider = 0 |
|
|
| try: |
| while True: |
| |
| |
| current_raw_frame_index = int(cap.get(cv2.CAP_PROP_POS_FRAMES)) |
| current_pos_ms_in_provider = cap.get(cv2.CAP_PROP_POS_MSEC) |
| |
| |
| if raw_frames_read_by_provider > 0 and current_pos_ms_in_provider <= last_processed_ms and interval_ms > 0 : |
| |
| |
| |
| |
| pass |
|
|
|
|
| should_process_this_frame = current_pos_ms_in_provider >= last_processed_ms + interval_ms - 1 |
| |
| ret_frame, frame_cv_data = cap.read() |
| if not ret_frame: |
| break |
| raw_frames_read_by_provider +=1 |
|
|
| if should_process_this_frame: |
| try: |
| pil_img = convert_to_pil(frame_cv_data) |
| last_processed_ms = current_pos_ms_in_provider |
| yield pil_img, int(current_pos_ms_in_provider), current_raw_frame_index + 1, total_items_for_desc |
| except Exception as e_conv: |
| print(f"Error converting frame at {current_pos_ms_in_provider}ms (raw index {current_raw_frame_index}) for {video_original_filename}: {e_conv}. Skipping.") |
| finally: |
| pass |
| finally: |
| if cap.isOpened(): |
| cap.release() |
| print(f" Video capture for provider ({video_original_filename}) released.") |
| |
| def video_progress_updater(item_progress_value: float, desc: str): |
| overall_progress = (tasks_completed_count + item_progress_value) / total_processing_tasks |
| progress(overall_progress, desc=f"{progress_description_prefix}: {desc}") |
|
|
| try: |
| zip_file_path_single, status_message_single = _process_input_source_frames( |
| source_file_prefix=video_source_file_prefix, |
| frames_provider=video_frames_provider_generator(video_path_temp, sample_interval_ms), |
| is_video_source=True, |
| enable_person_detection=enable_person_detection, |
| min_target_width_person_percentage=min_target_width_person_percentage, |
| person_model_name=person_model_name, |
| person_conf_threshold=person_conf_threshold, |
| person_iou_threshold=person_iou_threshold, |
| enable_halfbody_detection=enable_halfbody_detection, |
| enable_halfbody_cropping=enable_halfbody_cropping, |
| min_target_width_halfbody_percentage=min_target_width_halfbody_percentage, |
| halfbody_model_name=halfbody_model_name, |
| halfbody_conf_threshold=halfbody_conf_threshold, |
| halfbody_iou_threshold=halfbody_iou_threshold, |
| enable_head_detection=enable_head_detection, |
| enable_head_cropping=enable_head_cropping, |
| min_crop_width_head_percentage=min_crop_width_head_percentage, |
| enable_head_filtering=enable_head_filtering, |
| head_model_name=head_model_name, |
| head_conf_threshold=head_conf_threshold, |
| head_iou_threshold=head_iou_threshold, |
| enable_face_detection=enable_face_detection, |
| enable_face_cropping=enable_face_cropping, |
| min_crop_width_face_percentage=min_crop_width_face_percentage, |
| enable_face_filtering=enable_face_filtering, |
| face_model_name=face_model_name, |
| face_conf_threshold=face_conf_threshold, |
| face_iou_threshold=face_iou_threshold, |
| enable_ccip_classification=enable_ccip_classification, |
| ccip_model_name=ccip_model_name, |
| ccip_threshold=ccip_threshold, |
| enable_lpips_clustering=enable_lpips_clustering, |
| lpips_threshold=lpips_threshold, |
| enable_aesthetic_analysis=enable_aesthetic_analysis, |
| aesthetic_model_name=aesthetic_model_name, |
| progress_updater=video_progress_updater |
| ) |
| if zip_file_path_single: |
| output_zip_paths_all_sources.append(zip_file_path_single) |
| all_status_messages.append(f"--- Video ({video_original_filename}) Processing Succeeded ---\n{status_message_single}") |
| else: |
| all_status_messages.append(f"--- Video ({video_original_filename}) Processing Failed ---\n{status_message_single}") |
|
|
| except Exception as e_vid: |
| |
| |
| error_msg = f"Critical error during processing of video {video_original_filename}: {e_vid}" |
| print(error_msg) |
| traceback.print_exc() |
| all_status_messages.append(f"--- Video ({video_original_filename}) Processing CRITICALLY FAILED ---\n{error_msg}") |
|
|
| tasks_completed_count += 1 |
| print(f">>> Finished attempt for Video: {video_original_filename} <<<") |
| |
|
|
| final_summary_message = "\n\n==============================\n\n".join(all_status_messages) |
| |
| successful_zips_count = len(output_zip_paths_all_sources) |
| if successful_zips_count == 0 and total_processing_tasks > 0: |
| final_summary_message = f"ALL {total_processing_tasks} INPUT SOURCE(S) FAILED TO PRODUCE A ZIP FILE.\n\n" + final_summary_message |
| elif total_processing_tasks > 0: |
| final_summary_message = f"Successfully processed {successful_zips_count} out of {total_processing_tasks} input source(s).\n\n" + final_summary_message |
| else: |
| final_summary_message = "No inputs were processed." |
|
|
| progress(1.0, desc="All processing attempts finished.") |
| |
| |
| return output_zip_paths_all_sources, final_summary_message |
|
|
|
|
| |
|
|
| css = """ |
| /* Default (Light Mode) Styles */ |
| #warning { |
| background-color: #FFCCCB; /* Light red background */ |
| padding: 10px; |
| border-radius: 5px; |
| color: #A00000; /* Dark red text */ |
| border: 1px solid #E5B8B7; /* A slightly darker border for more definition */ |
| } |
| /* Dark Mode Styles */ |
| @media (prefers-color-scheme: dark) { |
| #warning { |
| background-color: #5C1A1A; /* Darker red background, suitable for dark mode */ |
| color: #FFDDDD; /* Light pink text, for good contrast against the dark red background */ |
| border: 1px solid #8B0000; /* A more prominent dark red border in dark mode */ |
| } |
| } |
| #status_box { |
| white-space: pre-wrap !important; /* Ensure status messages show newlines */ |
| font-family: monospace; /* Optional: Use monospace for better alignment */ |
| } |
| """ |
|
|
| |
| person_models = ['person_detect_v1.3_s', 'person_detect_v1.2_s', 'person_detect_v1.1_s', 'person_detect_v1.1_m', 'person_detect_v1_m', 'person_detect_v1.1_n', 'person_detect_v0_s', 'person_detect_v0_m', 'person_detect_v0_x'] |
| halfbody_models = ['halfbody_detect_v1.0_s', 'halfbody_detect_v1.0_n', 'halfbody_detect_v0.4_s', 'halfbody_detect_v0.3_s', 'halfbody_detect_v0.2_s'] |
| head_models = ['head_detect_v2.0_s', 'head_detect_v2.0_m', 'head_detect_v2.0_n', 'head_detect_v2.0_x', 'head_detect_v2.0_s_yv11', 'head_detect_v2.0_m_yv11', 'head_detect_v2.0_n_yv11', 'head_detect_v2.0_x_yv11', 'head_detect_v2.0_l_yv11'] |
| face_models = ['face_detect_v1.4_s', 'face_detect_v1.4_n', 'face_detect_v1.3_s', 'face_detect_v1.3_n', 'face_detect_v1.2_s', 'face_detect_v1.1_s', 'face_detect_v1.1_n', 'face_detect_v1_s', 'face_detect_v1_n', 'face_detect_v0_s', 'face_detect_v0_n'] |
| ccip_models = ['ccip-caformer-24-randaug-pruned', 'ccip-caformer-6-randaug-pruned_fp32', 'ccip-caformer-5_fp32'] |
| aesthetic_models = ['swinv2pv3_v0_448_ls0.2_x', 'swinv2pv3_v0_448_ls0.2', 'caformer_s36_v0_ls0.2'] |
|
|
| with gr.Blocks(css=css) as demo: |
| gr.Markdown("# Video Processor using dghs-imgutils") |
| gr.Markdown("Upload one or more videos, or a sequence of images. Videos are processed individually, while multiple images are treated as a single sequence. Each processed source (video or image sequence) is then sequentially analyzed by [dghs-imgutils](https://github.com/deepghs/imgutils) to detect subjects, classify items, and process its content according to your settings, ultimately generating a ZIP file with the extracted images.") |
| gr.Markdown("**Detection Flow:** " + |
| "[Person](https://dghs-imgutils.deepghs.org/main/api_doc/detect/person.html) ⇒ " + |
| "[Half-Body](https://dghs-imgutils.deepghs.org/main/api_doc/detect/halfbody.html) (if no person) ⇒ " + |
| "[Face](https://dghs-imgutils.deepghs.org/main/api_doc/detect/face.html) (on target) ⇒ " + |
| "[Head](https://dghs-imgutils.deepghs.org/main/api_doc/detect/head.html) (on target).") |
| gr.Markdown("**Analysis Flow:** " + |
| "[CCIP](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/ccip.html) Clustering ⇒ " + |
| "[LPIPS](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/lpips.html) Clustering ⇒ " + |
| "[Aesthetic](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/dbaesthetic.html) Labeling.") |
| gr.Markdown("**Note on CCIP Folders:** CCIP cluster folders are named `{source_prefix}_ccip_XXX`, sorted by image count (most images = `_ccip_000`).") |
| gr.Markdown("**Note on LPIPS Folders:** LPIPS cluster folders (e.g., `lpips_XXX` or `lpips_sub_XXX`) are also sorted by image count within their scope. 'noise' folders are named explicitly.") |
|
|
| with gr.Row(): |
| with gr.Column(scale=1): |
| |
| process_button = gr.Button("Process Input(s) & Generate ZIP(s)", variant="primary") |
| input_files = gr.Files(label="Upload Videos or Image Sequences", file_types=['video', 'image'], file_count="multiple") |
| sample_interval_ms = gr.Number(label="Sample Interval (ms, for videos)", value=1000, minimum=1, step=100) |
| |
| |
| gr.Markdown("**Detection Options**") |
| |
| with gr.Accordion("Person Detection Options", open=True): |
| enable_person_detection = gr.Checkbox(label="Enable Person Detection", value=True) |
| with gr.Group() as person_detection_params_group: |
| min_target_width_person_percentage_slider = gr.Slider( |
| minimum=0.0, maximum=1.0, value=0.25, step=0.01, |
| label="Min Target Width (% of Item Width)", |
| info="Minimum width for a detected person to be processed (e.g., 0.25 = 25%)." |
| ) |
| person_model_name_dd = gr.Dropdown(person_models, label="PD Model", value=person_models[0]) |
| person_conf_threshold = gr.Slider(0.0, 1.0, value=0.3, step=0.05, label="PD Conf") |
| person_iou_threshold = gr.Slider(0.0, 1.0, value=0.5, step=0.05, label="PD IoU") |
| enable_person_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_person_detection, outputs=person_detection_params_group) |
| |
| |
| with gr.Accordion("Half-Body Detection Options", open=True): |
| enable_halfbody_detection = gr.Checkbox(label="Enable Half-Body Detection", value=True) |
| with gr.Group() as halfbody_params_group: |
| gr.Markdown("<small>_Detects half-bodies in full items if Person Detection is off/fails._</small>") |
| enable_halfbody_cropping = gr.Checkbox(label="Use Half-Bodies as Targets", value=True) |
| min_target_width_halfbody_percentage_slider = gr.Slider( |
| minimum=0.0, maximum=1.0, value=0.25, step=0.01, |
| label="Min Target Width (% of Item Width)", |
| info="Minimum width for a detected half-body to be processed (e.g., 0.25 = 25%)." |
| ) |
| halfbody_model_name_dd = gr.Dropdown(halfbody_models, label="HBD Model", value=halfbody_models[0]) |
| halfbody_conf_threshold = gr.Slider(0.0, 1.0, value=0.5, step=0.05, label="HBD Conf") |
| halfbody_iou_threshold = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="HBD IoU") |
| enable_halfbody_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_halfbody_detection, outputs=halfbody_params_group) |
| |
| |
| with gr.Accordion("Face Detection Options", open=True): |
| enable_face_detection = gr.Checkbox(label="Enable Face Detection", value=True) |
| with gr.Group() as face_params_group: |
| enable_face_filtering = gr.Checkbox(label="Filter Targets Without Detected Faces", value=True) |
| enable_face_cropping = gr.Checkbox(label="Crop Detected Faces", value=False) |
| min_crop_width_face_percentage_slider = gr.Slider( |
| minimum=0.0, maximum=1.0, value=0.2, step=0.01, |
| label="Min Crop Width (% of Parent Width)", |
| info="Minimum width for a face crop relative to its parent image's width (e.g., 0.2 = 20%)." |
| ) |
| face_model_name_dd = gr.Dropdown(face_models, label="FD Model", value=face_models[0]) |
| face_conf_threshold = gr.Slider(0.0, 1.0, value=0.25, step=0.05, label="FD Conf") |
| face_iou_threshold = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="FD IoU") |
| enable_face_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_face_detection, outputs=face_params_group) |
| |
| |
| with gr.Accordion("Head Detection Options", open=True): |
| enable_head_detection = gr.Checkbox(label="Enable Head Detection", value=True) |
| with gr.Group() as head_params_group: |
| gr.Markdown("<small>_Detects heads in targets. Crops if meets width req._</small>") |
| enable_head_filtering = gr.Checkbox(label="Filter Targets Without Heads", value=True) |
| enable_head_cropping = gr.Checkbox(label="Crop Detected Heads", value=False) |
| min_crop_width_head_percentage_slider = gr.Slider( |
| minimum=0.0, maximum=1.0, value=0.2, step=0.01, |
| label="Min Crop Width (% of Parent Width)", |
| info="Minimum width for a head crop relative to its parent image's width (e.g., 0.2 = 20%)." |
| ) |
| head_model_name_dd = gr.Dropdown(head_models, label="HD Model", value=head_models[0]) |
| head_conf_threshold = gr.Slider(0.0, 1.0, value=0.4, step=0.05, label="HD Conf") |
| head_iou_threshold = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="HD IoU") |
| enable_head_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_head_detection, outputs=head_params_group) |
| |
| |
| gr.Markdown("**Analysis & Classification**") |
| |
| with gr.Accordion("CCIP Classification Options", open=True): |
| enable_ccip_classification = gr.Checkbox(label="Enable CCIP Classification", value=True) |
| with gr.Group() as ccip_params_group: |
| gr.Markdown("<small>_Clusters results by similarity. Folders sorted by image count._</small>") |
| ccip_model_name_dd = gr.Dropdown(ccip_models, label="CCIP Model", value=ccip_models[0]) |
| ccip_threshold_slider = gr.Slider(0.0, 1.0, step=0.01, value=0.20, label="CCIP Similarity Threshold") |
| enable_ccip_classification.change(fn=lambda e: gr.update(visible=e), inputs=enable_ccip_classification, outputs=ccip_params_group) |
|
|
| |
| with gr.Accordion("LPIPS Clustering Options", open=True): |
| enable_lpips_clustering = gr.Checkbox(label="Enable LPIPS Clustering", value=True) |
| with gr.Group() as lpips_params_group: |
| gr.Markdown("<small>_Clusters images by LPIPS similarity. Applied after CCIP (if enabled) or globally. Folders sorted by image count._</small>") |
| lpips_threshold_slider = gr.Slider(0.0, 1.0, step=0.01, value=0.45, label="LPIPS Similarity Threshold") |
| enable_lpips_clustering.change(fn=lambda e: gr.update(visible=e), inputs=enable_lpips_clustering, outputs=lpips_params_group) |
| |
| |
| with gr.Accordion("Aesthetic Analysis Options", open=True): |
| enable_aesthetic_analysis = gr.Checkbox(label="Enable Aesthetic Analysis (Anime)", value=True) |
| with gr.Group() as aesthetic_params_group: |
| gr.Markdown("<small>_Prepends aesthetic label to filenames._</small>") |
| aesthetic_model_name_dd = gr.Dropdown(aesthetic_models, label="Aesthetic Model", value=aesthetic_models[0]) |
| enable_aesthetic_analysis.change(fn=lambda e: gr.update(visible=e), inputs=enable_aesthetic_analysis, outputs=aesthetic_params_group) |
|
|
| gr.Markdown("---") |
| gr.Markdown("**Warning:** Complex combinations can be slow. Models downloaded on first use.", elem_id="warning") |
|
|
| with gr.Column(scale=1): |
| |
| status_text = gr.Textbox(label="Processing Status", interactive=False, lines=20, elem_id="status_box") |
| output_zips = gr.Files(label="Download Processed Images (ZIPs)") |
| |
| |
| process_button.click( |
| fn=process_inputs_main, |
| inputs=[ |
| input_files, sample_interval_ms, |
| |
| enable_person_detection, min_target_width_person_percentage_slider, |
| person_model_name_dd, person_conf_threshold, person_iou_threshold, |
| |
| enable_halfbody_detection, enable_halfbody_cropping, min_target_width_halfbody_percentage_slider, |
| halfbody_model_name_dd, halfbody_conf_threshold, halfbody_iou_threshold, |
| |
| enable_head_detection, enable_head_cropping, min_crop_width_head_percentage_slider, |
| enable_head_filtering, head_model_name_dd, head_conf_threshold, head_iou_threshold, |
| |
| enable_face_detection, enable_face_cropping, min_crop_width_face_percentage_slider, |
| enable_face_filtering, face_model_name_dd, face_conf_threshold, face_iou_threshold, |
| |
| enable_ccip_classification, ccip_model_name_dd, ccip_threshold_slider, |
| |
| enable_lpips_clustering, lpips_threshold_slider, |
| |
| enable_aesthetic_analysis, aesthetic_model_name_dd, |
| ], |
| outputs=[output_zips, status_text] |
| ) |
|
|
| |
| if __name__ == "__main__": |
| print("Starting Gradio App...") |
| |
| try: |
| print("Checking/Downloading models (this might take a moment)...") |
| |
| dummy_img_pil = Image.new('RGB', (64, 64), color = 'orange') |
| print(" - Person detection...") |
| _ = person_detector.detect_person(dummy_img_pil, model_name=person_models[0]) |
| print(" - HalfBody detection...") |
| _ = halfbody_detector.detect_halfbody(dummy_img_pil, model_name=halfbody_models[0]) |
| print(" - Head detection...") |
| _ = head_detector.detect_heads(dummy_img_pil, model_name=head_models[0]) |
| print(" - Face detection...") |
| _ = face_detector.detect_faces(dummy_img_pil, model_name=face_models[0]) |
| print(" - CCIP feature extraction...") |
| _ = ccip_analyzer.ccip_extract_feature(dummy_img_pil, size=384, model=ccip_models[0]) |
| print(" - LPIPS feature extraction...") |
| _ = lpips_module.lpips_extract_feature(dummy_img_pil) |
| print(" - Aesthetic analysis...") |
| _ = dbaesthetic_analyzer.anime_dbaesthetic(dummy_img_pil, model_name=aesthetic_models[0]) |
| print("Models seem ready or downloaded.") |
| del dummy_img_pil |
| gc.collect() |
| except Exception as model_err: |
| print(f"\n--- !!! WARNING !!! ---") |
| print(f"Could not pre-check/download all models: {model_err}") |
| print(f"Models will be downloaded when first used by the application, which may cause a delay on the first run.") |
| print(f"Check your internet connection and library installation (pip install \"dghs-imgutils[gpu]\").") |
| print(f"-----------------------\n") |
| |
| demo.launch(inbrowser=True) |
|
|