## ENVIRONMENT VARIABLES
# MODERATION_MODAL_VOLUME  (name of the Modal Volume read by this module)
# MODAL_TOKEN_ID
# MODAL_ENVIRONMENT
# MODAL_TOKEN_SECRET

import os
import re
import cv2
import io
import time
import base64
import modal
import logging
import shutil
import tempfile
import numpy as np
import gradio as gr
from PIL import Image
from typing import Dict, Optional, Tuple

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# UUID regex pattern for filtering segment sub-directories
UUID_PATTERN = re.compile(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$')

# A decoded magic code is used as a single path component on the volume, so it
# must not contain separators or dots (blocks "../" style traversal).
_FOLDER_NAME_PATTERN = re.compile(r'[A-Za-z0-9_-]+')

# File-name patterns inside a segment directory (compiled once, not per call).
_FRAME_FILE_PATTERN = re.compile(r'^frame_(\d+)\.(jpg|png)$')
_ALPHA_FILE_PATTERN = re.compile(r'^alpha_frame_(\d+)\.png$')

# Fallback (width, height) used when the video dimensions cannot be read.
_DEFAULT_VIDEO_SIZE = (1920, 1080)

# Opacity (0-255) of the red overlay where the mask is fully white.
_OVERLAY_ALPHA = 128

# Number of UI rows (rows 2-6) whose visibility submit_magic_code controls.
_SEGMENT_ROW_COUNT = 5


def _get_video_dimensions(path: str) -> Tuple[int, int]:
    """Return (width, height) of the video at *path*, or the default size on failure."""
    width, height = _DEFAULT_VIDEO_SIZE
    cap = None
    try:
        cap = cv2.VideoCapture(path)
        if cap.isOpened():
            probed_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            probed_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            # Keep the defaults if the probe yields a non-positive size.
            if probed_width > 0 and probed_height > 0:
                width, height = probed_width, probed_height
    except Exception as e:
        logger.debug(f"OpenCV not available or failed to read video dimensions: {e}")
    finally:
        # Release the capture even when it never opened or the probe raised.
        if cap is not None:
            cap.release()
    return width, height


def process_video(video_path, notes, email, company_name) -> str:
    """
    Submit the input video for content moderation using Modal.

    Steps:
    1. Upload the provided video to the configured Modal Volume under a
       folder named after the current epoch second.
    2. Obtain the video dimensions (width, height).
    3. Spawn the Content-Moderation "professional_reception_function" via
       Modal. The call is fire-and-forget (.spawn); no result is awaited.

    Args:
        video_path: Local path of the uploaded video.
        notes: Free-form notes forwarded as ``input_text`` ("" when None).
        email: Forwarded unchanged to the Modal function.
        company_name: Forwarded unchanged to the Modal function.

    Returns:
        A human-readable status string ("Video Request Obtained" on success,
        otherwise an error description). This function does not raise.
    """
    # Validate inputs
    if not video_path or not os.path.exists(video_path):
        logger.error("Invalid video path provided to process_video.")
        return "Invalid video path."

    try:
        # 1. Resolve the target volume
        modal_volume_name = os.environ.get('MODERATION_MODAL_VOLUME')
        if not modal_volume_name:
            logger.error("MODERATION_MODAL_VOLUME environment variable not set")
            return "Unexpected error during video processing."

        # Unique processing folder and paths
        processing_id = str(int(time.time()))
        ext = os.path.splitext(video_path)[1]
        remote_input_path = f"/{processing_id}/input_video{ext}"

        # 2. Upload video to Modal Volume
        volume = modal.Volume.from_name(modal_volume_name)
        try:
            with volume.batch_upload() as batch:
                batch.put_file(video_path, remote_input_path)
        except Exception as e:
            logger.error(f"Error uploading video to Modal Storage: {e}")
            return "Error uploading video to Cloud Storage."

        # 3. Obtain video dimensions
        width, height = _get_video_dimensions(video_path)

        # 4. Trigger the Modal function asynchronously
        try:
            moderation_function = modal.Function.from_name("Content-Moderation", "professional_reception_function")
            moderation_function.spawn(
                input_text=str(notes) if notes is not None else "",
                video_path=remote_input_path,
                size=(int(width), int(height)),
                email=email,
                company_name=company_name
            )
        except Exception as e:
            logger.error(f"Error calling Modal reception_function: {e}")
            return "Error calling Outpost to trigger processing."

        return "Video Request Obtained"
    except Exception as e:
        logger.error(f"Unexpected error in process_video: {e}")
        return "Unexpected error during video processing."


def _decode_magic_code(magic_code: str) -> str:
    """
    Decode a Base64 magic code into the volume folder name it encodes.

    Raises:
        ValueError: if the code is not valid Base64/UTF-8 or the decoded name
            is not a plain single path component.
    """
    try:
        folder_name = base64.b64decode(magic_code.strip(), validate=True).decode('utf-8').strip()
    except (ValueError, TypeError, AttributeError) as exc:
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
        raise ValueError("magic code is not valid Base64-encoded UTF-8") from exc
    if not _FOLDER_NAME_PATTERN.fullmatch(folder_name):
        raise ValueError("decoded magic code is not a valid folder name")
    return folder_name


def _segment_rows_visibility(visible: bool) -> tuple:
    """Return one visibility update per segmentation row (rows 2-6)."""
    return tuple(gr.update(visible=visible) for _ in range(_SEGMENT_ROW_COUNT))


def _magic_code_failure(label: str, message: str) -> tuple:
    """Build the 7-value failure result of submit_magic_code (all rows hidden)."""
    return (gr.update(choices=[], value=None, label=label), message) + _segment_rows_visibility(False)


def submit_magic_code(magic_code):
    """
    Validate magic code and retrieve segment list from Modal volume.

    Decodes the Base64 magic code to obtain the folder name, lists that folder
    on the configured volume, and keeps the sub-directories whose names match
    UUID_PATTERN.

    Returns:
        A 7-tuple, identical in shape on every path:
        (dropdown_update, status_message, row2_visibility, row3_visibility,
         row4_visibility, row5_visibility, row6_visibility)
    """
    if not magic_code:
        return _magic_code_failure("Segment IDs", "Please enter a magic code")

    try:
        # Decode Base64 magic code to get folder name
        try:
            folder_name = _decode_magic_code(magic_code)
            logger.info(f"Magic code decoded to folder: {folder_name}")
        except ValueError as e:
            logger.error(f"Failed to decode magic code: {e}")
            return _magic_code_failure(
                "Invalid magic code",
                "Invalid magic code format – please verify your code"
            )

        # Get volume name from environment
        modal_volume_name = os.environ.get('MODERATION_MODAL_VOLUME')
        if not modal_volume_name:
            logger.error("MODERATION_MODAL_VOLUME environment variable not set")
            return _magic_code_failure(
                "Configuration error",
                "Server configuration error – contact support"
            )

        try:
            volume = modal.Volume.from_name(modal_volume_name)
            folder_path = f"/{folder_name}"

            # Listing doubles as the existence check for the folder.
            try:
                entries = volume.listdir(folder_path)
            except Exception as list_error:
                logger.error(f"Folder not found or inaccessible: {folder_path} - {list_error}")
                return _magic_code_failure(
                    "Folder not found",
                    "Folder not found – please verify your magic code"
                )

            # Keep only directories whose last path component is a UUID.
            uuid_segments = []
            for entry in entries:
                entry_name = entry.path.rstrip('/').split('/')[-1]
                if entry.type == modal.volume.FileEntryType.DIRECTORY and UUID_PATTERN.match(entry_name):
                    uuid_segments.append(entry_name)
            uuid_segments.sort()  # Sort alphabetically for consistent display

            if not uuid_segments:
                logger.warning(f"No UUID segments found in folder: {folder_name}")
                return _magic_code_failure(
                    "No segments found",
                    "No segment IDs found in this folder"
                )

            logger.info(f"Found {len(uuid_segments)} UUID segments in folder {folder_name}")
            return (
                gr.update(choices=uuid_segments, value=uuid_segments[0], label="Segment IDs"),
                f"Loaded {len(uuid_segments)} segment(s)",
            ) + _segment_rows_visibility(True)
        except Exception as e:
            logger.error(f"Error accessing Modal volume: {e}")
            return _magic_code_failure("Volume access error", f"Error accessing storage: {str(e)}")
    except Exception as e:
        logger.error(f"Unexpected error in submit_magic_code: {e}")
        return _magic_code_failure("Error", "Unexpected error – please try again")


def _read_volume_image(volume, remote_path: str) -> Image.Image:
    """Read *remote_path* from *volume* and return it as a fully loaded PIL image."""
    data = volume.read_file(remote_path)
    # Modal documents read_file as yielding byte chunks; join them before
    # decoding. Plain bytes are accepted too, in case the API returns them.
    if not isinstance(data, (bytes, bytearray)):
        data = b"".join(data)
    with Image.open(io.BytesIO(data)) as img:
        return img.copy()


def download_segment_files(magic_code: str, segment_id: str) -> Tuple[Dict[int, Image.Image], Dict[int, Image.Image], int]:
    """
    Download all frame images and alpha masks for a given segment from Modal volume.

    The Base64 *magic_code* is decoded to the folder name (the same decoding
    submit_magic_code applies) and files are read from
    ``/<folder>/<segment_id>``. Files named ``frame_<n>.jpg|png`` are frames;
    files named ``alpha_frame_<n>.png`` are masks. A file that fails to
    download is logged and skipped.

    Returns:
        frames_dict: Dict mapping frame_index -> PIL.Image (original frames)
        masks_dict: Dict mapping frame_index -> PIL.Image (alpha masks)
        max_frame: Maximum frame index found (0 when no frames were loaded)

        On any failure the result is ``({}, {}, 0)``; this function does not raise.
    """
    try:
        try:
            folder_name = _decode_magic_code(magic_code)
        except ValueError as e:
            logger.error(f"Failed to decode magic code: {e}")
            return {}, {}, 0

        # segment_id becomes a path component; accept only UUID-shaped names.
        if not segment_id or not UUID_PATTERN.match(segment_id):
            logger.error(f"Invalid segment id: {segment_id!r}")
            return {}, {}, 0

        modal_volume_name = os.environ['MODERATION_MODAL_VOLUME']
        volume = modal.Volume.from_name(modal_volume_name)
        segment_path = f"/{folder_name}/{segment_id}"

        frames_dict: Dict[int, Image.Image] = {}
        masks_dict: Dict[int, Image.Image] = {}

        logger.info(f"Downloading files from {segment_path}")

        # List all files in the segment directory
        try:
            files = list(volume.listdir(segment_path))
        except Exception as e:
            logger.error(f"Failed to list segment directory: {e}")
            return {}, {}, 0

        for entry in files:
            if entry.type != modal.volume.FileEntryType.FILE:
                continue

            filename = os.path.basename(entry.path)
            frame_match = _FRAME_FILE_PATTERN.match(filename)
            alpha_match = None if frame_match else _ALPHA_FILE_PATTERN.match(filename)

            if frame_match:
                target, frame_idx, kind = frames_dict, int(frame_match.group(1)), "frame"
            elif alpha_match:
                target, frame_idx, kind = masks_dict, int(alpha_match.group(1)), "alpha mask"
            else:
                continue

            try:
                target[frame_idx] = _read_volume_image(volume, f"{segment_path}/{filename}")
                logger.debug(f"Downloaded {kind} {frame_idx}")
            except Exception as e:
                logger.error(f"Failed to download {filename}: {e}")

        max_frame = max(frames_dict) if frames_dict else 0
        logger.info(f"Downloaded {len(frames_dict)} frames and {len(masks_dict)} masks. Max frame: {max_frame}")
        return frames_dict, masks_dict, max_frame
    except Exception as e:
        logger.error(f"Error downloading segment files: {e}")
        return {}, {}, 0


def composite_image_with_mask(frame: Image.Image, mask: Optional[Image.Image], show_mask: bool) -> Image.Image:
    """
    Composite the original frame with the alpha mask overlay.

    Args:
        frame: Original RGB/RGBA image
        mask: Alpha mask (grayscale or RGBA); converted to 'L' and resized to
            the frame size when needed
        show_mask: Whether to show the mask overlay

    Returns:
        A copy of *frame* when the overlay is disabled or no mask is given;
        otherwise an RGBA image with a semi-transparent red overlay whose
        opacity follows the mask.
    """
    if not show_mask or mask is None:
        return frame.copy()

    frame_rgba = frame.copy() if frame.mode == 'RGBA' else frame.convert('RGBA')
    mask_gray = mask.copy() if mask.mode == 'L' else mask.convert('L')

    # Resize mask to match frame if needed
    if mask_gray.size != frame_rgba.size:
        mask_gray = mask_gray.resize(frame_rgba.size, Image.Resampling.LANCZOS)

    # putalpha() replaces the overlay's alpha channel outright, so scale the
    # mask to the intended peak opacity first; otherwise fully-masked pixels
    # become opaque red and hide the frame.
    overlay_alpha = mask_gray.point(lambda value: value * _OVERLAY_ALPHA // 255)
    overlay = Image.new('RGBA', frame_rgba.size, (255, 0, 0, _OVERLAY_ALPHA))
    overlay.putalpha(overlay_alpha)

    return Image.alpha_composite(frame_rgba, overlay)


def load_segment_frame(segment_id, frame_number, show_mask, magic_code_state, frames_state, masks_state):
    """
    Load and display a specific frame with optional alpha mask overlay.

    Returns:
        Tuple of (image or None, no-op update for the second output). The image
        is None when no segment is loaded or the requested frame is unavailable.
    """
    if not segment_id or not frames_state:
        return None, gr.update()

    try:
        frame_idx = int(frame_number)
    except (TypeError, ValueError):
        logger.warning(f"Invalid frame number: {frame_number!r}")
        return None, gr.update()

    if frame_idx not in frames_state:
        logger.warning(f"Frame {frame_idx} not found in downloaded frames")
        return None, gr.update()

    masks_dict = masks_state or {}
    result_image = composite_image_with_mask(frames_state[frame_idx], masks_dict.get(frame_idx), show_mask)
    logger.info(f"Loaded frame {frame_idx} with mask overlay: {show_mask}")
    return result_image, gr.update()


def handle_keyboard_navigation(key_code, segment_id, current_frame, show_mask, magic_code_state, frames_state, masks_state):
    """
    Handle left/right arrow key navigation for frame slider.

    Args:
        key_code: 'ArrowLeft' or 'ArrowRight', optionally followed by
            ':<nonce>' (the page script appends a nonce so repeated presses
            of the same key still register as a value change)
        segment_id: Current segment ID
        current_frame: Current frame number
        show_mask: Whether to show alpha mask overlay
        magic_code_state: Magic code state (unused)
        frames_state: Frames dictionary state
        masks_state: Masks dictionary state

    Returns:
        Tuple of (updated image, updated slider value). When nothing changes
        (unknown key, boundary reached, no segment) both are no-op updates so
        the currently displayed image is left untouched.
    """
    no_change = (gr.update(), gr.update())

    if not segment_id or not frames_state:
        return no_change

    key = str(key_code or '').split(':', 1)[0]
    if key not in ('ArrowLeft', 'ArrowRight'):
        return no_change

    try:
        current = int(current_frame)
    except (TypeError, ValueError):
        return no_change

    # Step to the nearest *available* frame so gaps in the numbering are skipped.
    available_frames = sorted(frames_state)
    if key == 'ArrowLeft':
        earlier = [idx for idx in available_frames if idx < current]
        new_frame = earlier[-1] if earlier else None
    else:
        new_frame = next((idx for idx in available_frames if idx > current), None)

    if new_frame is None:
        # Already at the first/last frame.
        return no_change

    logger.info(f"Keyboard navigation: {key} -> frame {new_frame}")

    masks_dict = masks_state or {}
    result_image = composite_image_with_mask(frames_state[new_frame], masks_dict.get(new_frame), show_mask)
    return result_image, gr.update(value=new_frame)


def handle_segment_selection(segment_id, magic_code):
    """
    Handle segment selection: download all files and initialize the view.

    Returns:
        Tuple of (image, slider_update, frames_dict, masks_dict, magic_code,
        download_button_update). The slider range is set to the first and last
        downloaded frame indices and starts on the first one.
    """
    empty_result = (
        None,
        gr.update(minimum=0, maximum=0, value=0),
        {},
        {},
        magic_code,
        gr.update(interactive=False)
    )

    if not segment_id or not magic_code:
        return empty_result

    logger.info(f"Segment selected: {segment_id}")

    # Download all frames and masks
    frames_dict, masks_dict, max_frame = download_segment_files(magic_code, segment_id)
    if not frames_dict:
        logger.error("No frames downloaded")
        return empty_result

    # Start on the first frame that actually exists so slider and image agree.
    first_frame = min(frames_dict)

    # Initial display without mask
    result_image = composite_image_with_mask(frames_dict[first_frame], masks_dict.get(first_frame), False)

    # Enable download button only if masks are available
    has_masks = len(masks_dict) > 0

    return (
        result_image,
        gr.update(minimum=first_frame, maximum=max_frame, value=first_frame),
        frames_dict,
        masks_dict,
        magic_code,
        gr.update(interactive=has_masks)
    )


def handle_image_click(segment_id, frame_number, magic_code, frames_state, masks_state, evt: gr.SelectData):
    """
    Handle click on ImageEditor: send coordinates to SAM-3, get new mask, update display.

    Returns:
        Tuple of (image_update, masks_state, download_button_update). On any
        failure the image output is a no-op update so the current view is kept.
    """
    masks_state = {} if masks_state is None else masks_state

    if not segment_id or not frames_state or evt is None:
        return gr.update(), masks_state, gr.update()

    try:
        # Extract click coordinates
        x, y = int(evt.index[0]), int(evt.index[1])
        frame_idx = int(frame_number)
        logger.info(f"Click detected at ({x}, {y}) on frame {frame_idx}")

        if frame_idx not in frames_state:
            logger.error(f"Frame {frame_idx} not in state")
            return gr.update(), masks_state, gr.update()

        frame = frames_state[frame_idx]

        # Serialize the frame as PNG bytes for the remote call
        img_byte_arr = io.BytesIO()
        frame.save(img_byte_arr, format='PNG')
        img_bytes = img_byte_arr.getvalue()

        logger.info(f"Calling SAM-3 segmentation with coordinates ({x}, {y})")
        try:
            sam_function = modal.Function.from_name("Content-Moderation", "sam3_segmentation_function")
            mask_bytes = sam_function.remote(image_bytes=img_bytes, x=x, y=y)

            # Parse returned mask and store it for this frame
            new_mask = Image.open(io.BytesIO(mask_bytes)).copy()
            masks_state[frame_idx] = new_mask

            # Always show the mask after a click
            result_image = composite_image_with_mask(frame, new_mask, True)
            logger.info(f"Successfully updated mask for frame {frame_idx}")

            # Enable download button since we now have at least one mask
            return result_image, masks_state, gr.update(interactive=True)
        except Exception as e:
            logger.error(f"Error calling SAM-3 function: {e}")
            # Re-render the current view with whatever mask already exists
            result_image = composite_image_with_mask(frame, masks_state.get(frame_idx), True)
            return result_image, masks_state, gr.update()
    except Exception as e:
        logger.error(f"Error handling image click: {e}")
        return gr.update(), masks_state, gr.update()


def download_segment(segment_id, frames_state, masks_state):
    """
    Package only the alpha masks for the selected segment as a ZIP file.

    The archive is written to the system temp directory as
    ``{segment_id}.zip`` and contains one ``alpha_frame_<index>.png`` per mask.

    Returns:
        Tuple of (status_update, file_path, file_container_visibility).
        ``file_path`` is None on failure; the third value shows the download
        component on success and hides it on failure.
    """
    def _failure(message: str):
        return gr.update(value=message, visible=True), None, gr.update(visible=False)

    if not segment_id:
        return _failure("No segment selected")

    # segment_id becomes part of a filesystem path; accept only UUID-shaped ids.
    if not UUID_PATTERN.match(str(segment_id)):
        logger.warning(f"Rejected download for invalid segment id: {segment_id!r}")
        return _failure("Invalid segment ID")

    if not masks_state:
        logger.warning(f"No alpha masks available for segment: {segment_id}")
        return _failure("No alpha masks available")

    logger.info(f"Download requested for segment: {segment_id}")

    try:
        # Stage the mask PNGs in a throwaway directory, then zip that directory.
        with tempfile.TemporaryDirectory() as tmpdir:
            for frame_idx, mask_img in masks_state.items():
                mask_path = os.path.join(tmpdir, f"alpha_frame_{int(frame_idx):06d}.png")
                mask_img.save(mask_path)

            zip_base = os.path.join(tempfile.gettempdir(), str(segment_id))
            zip_path = shutil.make_archive(zip_base, 'zip', tmpdir)

        logger.info(f"Created ZIP at {zip_path} with {len(masks_state)} alpha masks")
        return (
            gr.update(value=f"✓ Downloaded {len(masks_state)} alpha masks", visible=True),
            zip_path,
            gr.update(visible=True)
        )
    except Exception as e:
        logger.error(f"Error creating download package: {e}")
        return _failure(f"Error: {str(e)}")


# Create a professional Gradio interface using the Golden ratio (1.618) for proportions
# Define custom CSS for a professional look
css = """
:root {
    --main-bg-color: #111827;
    --primary-color: #3B82F6;
    --secondary-color: #60A5FA;
    --text-color: #F9FAFB;
    --text-secondary: #9CA3AF;
    --card-bg: #1F2937;
    --border-color: #374151;
    --accent-blue: #3B82F6;
    --accent-yellow: #FBBF24;
    --accent-red: #EF4444;
    --accent-green: #22C55E;
    --border-radius: 8px;
    --golden-ratio: 1.618;
    --font-header: 'Barlow', sans-serif;
    --font-body: 'Work Sans', sans-serif;
}

body {
    font-family: var(--font-body);
    background-color: var(--main-bg-color);
    color: var(--text-color);
}

.container {
    max-width: 100%;
    margin: 0 auto;
    padding: calc(20px * var(--golden-ratio));
    background-color: var(--main-bg-color);
    border-radius: calc(var(--border-radius) * var(--golden-ratio));
    box-shadow: 0 10px 30px rgba(0, 0, 0, 0.3);
}

.logo-container {
    display: flex;
    justify-content: center;
    margin-bottom: calc(20px * var(--golden-ratio));
    padding: 15px;
    background-color: var(--card-bg);
    border-radius: var(--border-radius);
    border: 1px solid var(--border-color);
    box-shadow: 0 4px 6px rgba(0, 0, 0, 0.2);
}

.logo {
    max-width: 300px;
    max-height: 100px;
    transition: transform 0.3s ease;
    display: block; /* Ensure it's a block element */
    margin: 0 auto; /* This will center a block element within its flex container */
}

.logo:hover {
    transform: scale(1.05);
}

.header {
    text-align: center;
    margin-bottom: calc(30px * var(--golden-ratio));
    padding: calc(15px * var(--golden-ratio));
    background: linear-gradient(135deg, var(--primary-color) 0%, var(--secondary-color) 100%);
    color: white;
    border-radius: var(--border-radius);
    box-shadow: 0 4px 10px rgba(59, 130, 246, 0.3);
}

.header h1 {
    color: white;
    font-family: var(--font-header);
    font-size: calc(1.5rem * var(--golden-ratio));
    margin-bottom: calc(0.5rem * var(--golden-ratio));
    text-shadow: 0 2px 4px rgba(0, 0, 0, 0.3);
    font-weight: 600;
}

.header p {
    color: rgba(255, 255, 255, 0.9);
    font-size: 1rem;
    max-width: calc(600px * var(--golden-ratio));
    margin: 0 auto;
}

.input-section, .output-section {
    background-color: var(--card-bg);
    border: 1px solid var(--border-color);
    border-radius: var(--border-radius);
    padding: calc(20px * var(--golden-ratio));
    box-shadow: 0 4px 6px rgba(0, 0, 0, 0.2);
    margin-bottom: 20px;
    transition: all 0.3s ease;
}

.input-section:hover, .output-section:hover {
    box-shadow: 0 10px 20px rgba(0, 0, 0, 0.3);
    border-color: var(--primary-color);
}

.input-section {
    flex: var(--golden-ratio);
}

.output-section {
    flex: 1;
}

.footer {
    text-align: center;
    margin-top: calc(30px * var(--golden-ratio));
    padding: 15px;
    color: var(--text-secondary);
    font-size: 0.9rem;
    border-top: 1px solid var(--border-color);
}

/* Improve form elements */
.gradio-slider input[type=range] {
    accent-color: var(--primary-color);
}

.gradio-textbox input, .gradio-textbox textarea {
    background-color: var(--main-bg-color) !important;
    border: 1px solid var(--border-color) !important;
    border-radius: var(--border-radius) !important;
    padding: 10px !important;
    color: var(--text-color) !important;
    transition: all 0.3s ease !important;
}

.gradio-textbox input:focus, .gradio-textbox textarea:focus {
    border-color: var(--primary-color) !important;
    box-shadow: 0 0 0 2px rgba(59, 130, 246, 0.3) !important;
}

.gradio-button {
    background-color: var(--primary-color) !important;
    color: white !important;
    border-radius: var(--border-radius) !important;
    padding: calc(10px * var(--golden-ratio)) calc(20px * var(--golden-ratio)) !important;
    font-weight: 600 !important;
    font-family: var(--font-header) !important;
    transition: all 0.3s ease !important;
    box-shadow: 0 4px 6px rgba(59, 130, 246, 0.3) !important;
    border: none !important;
}

.gradio-button:hover {
    background-color: var(--secondary-color) !important;
    transform: translateY(-2px);
    box-shadow: 0 6px 12px rgba(59, 130, 246, 0.4) !important;
}

/* Golden ratio spacing for elements */
.gradio-row {
    margin-bottom: calc(16px * var(--golden-ratio)) !important;
}

/* Additional dark theme adjustments */
.gradio-container {
    background-color: var(--main-bg-color) !important;
}

.gradio-form {
    background-color: var(--card-bg) !important;
    border: 1px solid var(--border-color) !important;
}

/* Labels and text styling */
label {
    color: var(--text-color) !important;
    font-family: var(--font-body) !important;
}

/* Responsive adjustments */
@media (max-width: 768px) {
    .container {
        padding: 15px;
    }
    .input-section, .output-section {
        padding: 15px;
    }
}

/* Loading modal overlay */
#download-loading-modal {
    display: none;
    position: fixed;
    top: 0;
    left: 0;
    width: 100%;
    height: 100%;
    background-color: rgba(0, 0, 0, 0.7);
    z-index: 9999;
    justify-content: center;
    align-items: center;
}

#download-loading-modal.show {
    display: flex;
}

.loading-content {
    background-color: var(--card-bg);
    padding: 40px;
    border-radius: var(--border-radius);
    text-align: center;
    box-shadow: 0 10px 40px rgba(0, 0, 0, 0.5);
}

.spinner {
    border: 4px solid var(--border-color);
    border-top: 4px solid var(--primary-color);
    border-radius: 50%;
    width: 50px;
    height: 50px;
    animation: spin 1s linear infinite;
    margin: 0 auto 20px;
}

@keyframes spin {
    0% { transform: rotate(0deg); }
    100% { transform: rotate(360deg); }
}

.loading-text {
    color: var(--text-color);
    font-size: 18px;
    font-weight: 500;
}

/* Keyboard-capture textbox: kept in the DOM for the page script, never shown */
#seg_keyboard_input {
    display: none !important;
}
"""

# Create a Blocks interface for more customization
with gr.Blocks(css=css, theme=gr.themes.Soft(primary_hue="indigo", secondary_hue="purple")) as demo:
    with gr.Column(elem_classes="container"):
        # Header section (markup matches the `.header h1` / `.header p` CSS rules)
        with gr.Column(elem_classes="header"):
            gr.HTML("""
            <h1>BSOD.tv - Moderation Workbench</h1>
            <p>
                Professional content moderation and segmentation workflow.<br>
                Submit jobs and manage content segmentation efficiently.
            </p>
            """)

        # Main content with specified layout
        with gr.Tabs():
            with gr.Tab("Submit Job"):
                # First Row: Left Department Notes (Textbox), Right Video input
                with gr.Row(elem_classes="input-section"):
                    with gr.Column(scale=1):
                        cm_notes = gr.Textbox(label="Department Notes", lines=6, placeholder="Enter notes for the moderation team...")
                    with gr.Column(scale=1):
                        cm_video_in = gr.Video(label="Video Input", sources=["upload"], interactive=True)

                # Second Row: Email and Company Name (2/3 and 1/3 columns)
                with gr.Row(elem_classes="input-section"):
                    with gr.Column(scale=2):
                        cm_email = gr.Textbox(label="Email")
                    with gr.Column(scale=1):
                        cm_company_name = gr.Textbox(label="Company Name")

                # Third Row: status text returned by process_video
                with gr.Row(elem_classes="output-section"):
                    cm_video_out = gr.Textbox(label="Output")

                # Final Row: Process button
                with gr.Row():
                    cm_process_btn = gr.Button("Process", variant="primary")

            with gr.Tab("Segmentation Editing"):
                gr.Markdown("### Segmentation Editing Workspace")

                # State management for session data
                magic_code_state = gr.State(value=None)
                frames_state = gr.State(value=None)
                masks_state = gr.State(value=None)

                # Row 1: Magic Code textbox + Submit button
                with gr.Row(elem_classes="input-section"):
                    with gr.Column(scale=3):
                        seg_magic_code = gr.Textbox(
                            label="Magic Code",
                            placeholder="Enter your magic code...",
                            interactive=True
                        )
                    with gr.Column(scale=1):
                        seg_submit_btn = gr.Button("Submit Code", variant="primary")

                # Status line for magic-code validation. Kept separate from the
                # Magic Code textbox so the code the user typed is not overwritten
                # (segment selection reads it afterwards).
                seg_status = gr.Textbox(label="Status", value="", interactive=False)

                # Row 2: Segment ID dropdown
                with gr.Row(elem_classes="input-section", visible=False) as seg_row2:
                    seg_id_dropdown = gr.Dropdown(
                        label="Segment ID",
                        choices=[],
                        interactive=True,
                        info="Select a segment to edit"
                    )

                # Row 3: Show Alpha Mask checkbox
                with gr.Row(elem_classes="input-section", visible=False) as seg_row3_checkbox:
                    seg_show_mask = gr.Checkbox(
                        label="Show Alpha Mask",
                        value=False,
                        interactive=True
                    )

                # Row 4: ImageEditor component (SAM-3 style interaction)
                with gr.Row(elem_classes="input-section", visible=False) as seg_row4:
                    seg_image_editor = gr.ImageEditor(
                        label="Image Click Segmentation",
                        type="pil",
                        interactive=True,
                        brush=gr.Brush(colors=["#FF0000"], color_mode="fixed")
                    )

                # Row 5: Frame number slider
                with gr.Row(elem_classes="input-section", visible=False) as seg_row5:
                    seg_frame_slider = gr.Slider(
                        minimum=0,
                        maximum=100,
                        value=0,
                        step=1,
                        label="Frame Number",
                        interactive=True,
                        info="Select video frame to segment"
                    )

                # Row 6: Download Segment button
                with gr.Row(visible=False) as seg_row6:
                    seg_download_btn = gr.Button("Download Segment", variant="secondary")
                    seg_download_status = gr.Textbox(label="Status", value="", visible=False, interactive=False)
                    # The file lives in its own container so download_segment can
                    # reveal it once an archive exists.
                    with gr.Column(visible=False) as seg_download_file_box:
                        seg_download_file = gr.File(label="Download")

                # Keyboard event capture. Hidden through CSS (#seg_keyboard_input)
                # rather than visible=False so the element is present in the DOM
                # for the page script below.
                seg_keyboard_input = gr.Textbox(elem_id="seg_keyboard_input", show_label=False, container=False)

                # Loading modal HTML (hidden until the `show` class is added)
                gr.HTML("""
                <div id="download-loading-modal">
                    <div class="loading-content">
                        <div class="spinner"></div>
                        <div class="loading-text">Preparing download…</div>
                    </div>
                </div>
                """)

    # Wire Content Moderation processing
    cm_process_btn.click(
        fn=process_video,
        inputs=[cm_video_in, cm_notes, cm_email, cm_company_name],
        outputs=cm_video_out
    )

    # Wire Segmentation Editing callbacks
    seg_submit_btn.click(
        fn=submit_magic_code,
        inputs=[seg_magic_code],
        outputs=[seg_id_dropdown, seg_status, seg_row2, seg_row3_checkbox, seg_row4, seg_row5, seg_row6]
    )

    # Segment selection handler
    seg_id_dropdown.change(
        fn=handle_segment_selection,
        inputs=[seg_id_dropdown, seg_magic_code],
        outputs=[seg_image_editor, seg_frame_slider, frames_state, masks_state, magic_code_state, seg_download_btn]
    )

    # Frame slider handler
    seg_frame_slider.change(
        fn=load_segment_frame,
        inputs=[seg_id_dropdown, seg_frame_slider, seg_show_mask, magic_code_state, frames_state, masks_state],
        outputs=[seg_image_editor, seg_show_mask]
    )

    # Show mask checkbox handler
    seg_show_mask.change(
        fn=load_segment_frame,
        inputs=[seg_id_dropdown, seg_frame_slider, seg_show_mask, magic_code_state, frames_state, masks_state],
        outputs=[seg_image_editor, seg_show_mask]
    )

    # Image click handler (for SAM-3 segmentation)
    seg_image_editor.select(
        fn=handle_image_click,
        inputs=[seg_id_dropdown, seg_frame_slider, magic_code_state, frames_state, masks_state],
        outputs=[seg_image_editor, masks_state, seg_download_btn]
    )

    # Download button handler
    seg_download_btn.click(
        fn=download_segment,
        inputs=[seg_id_dropdown, frames_state, masks_state],
        outputs=[seg_download_status, seg_download_file, seg_download_file_box]
    )

    # Keyboard navigation handler
    seg_keyboard_input.change(
        fn=handle_keyboard_navigation,
        inputs=[
            seg_keyboard_input,
            seg_id_dropdown,
            seg_frame_slider,
            seg_show_mask,
            magic_code_state,
            frames_state,
            masks_state
        ],
        outputs=[seg_image_editor, seg_frame_slider]
    )

    # Add JavaScript to capture arrow key events
    demo.load(
        None,
        None,
        None,
        js="""
        () => {
            // Wait for the DOM to be ready
            setTimeout(() => {
                const keyboardInput = document.getElementById('seg_keyboard_input');
                if (!keyboardInput) {
                    console.warn('Keyboard input element not found');
                    return;
                }

                // Add keydown listener to document
                document.addEventListener('keydown', (e) => {
                    // Only handle arrow keys
                    if (e.key !== 'ArrowLeft' && e.key !== 'ArrowRight') {
                        return;
                    }

                    // Leave arrow keys alone while the user is typing or using a native control
                    const target = e.target;
                    if (target && (target.tagName === 'INPUT' || target.tagName === 'TEXTAREA' || target.isContentEditable)) {
                        return;
                    }

                    // Check if we're in the Segmentation Editing tab
                    const activeTab = document.querySelector('.tab-nav button.selected');
                    if (!activeTab || !activeTab.textContent.includes('Segmentation Editing')) {
                        return;
                    }

                    e.preventDefault();

                    // Update the hidden input to trigger the change event. The
                    // timestamp suffix makes every press a new value, so pressing
                    // the same key twice still fires a change.
                    const field = keyboardInput.querySelector('textarea, input');
                    if (field) {
                        field.value = e.key + ':' + Date.now();
                        field.dispatchEvent(new Event('input', { bubbles: true }));
                    }
                });
            }, 1000);
        }
        """
    )

if __name__ == "__main__":
    # To run this file locally, you'll need to install gradio and requests:
    # pip install gradio requests
    demo.launch()