## ENVIRONMENT VARIABLES
# MODERATION_MODAL_VOLUME (name of the Modal Volume; read via os.environ in this file)
# MODAL_TOKEN_ID
# MODAL_ENVIRONMENT
# MODAL_TOKEN_SECRET
import os
import re
import cv2
import io
import time
import base64
import modal
import logging
import tempfile
import numpy as np
import gradio as gr
from PIL import Image
from typing import Dict, Optional, Tuple
# Configure the root logger at import time. DEBUG applies process-wide, not only
# to this module, so third-party loggers become verbose as well.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger shared by the functions below.
logger = logging.getLogger(__name__)
def process_video(video_path, notes, email, company_name) -> str:
    """
    Queue the input video for content moderation on Modal.

    Steps:
    1. Validate that ``video_path`` points to an existing local file.
    2. Upload the video to the Modal Volume named by ``MODERATION_MODAL_VOLUME``
       as ``/<epoch-seconds>/input_video<ext>``.
    3. Read the video dimensions with OpenCV (falling back to 1920x1080).
    4. Spawn ``professional_reception_function`` of the ``Content-Moderation``
       app. The call is fire-and-forget: this function neither waits for nor
       downloads the processed result.

    Args:
        video_path: Local filesystem path of the video to submit.
        notes: Free-form text forwarded as ``input_text`` (``None`` becomes "").
        email: Forwarded unchanged to the Modal function.
        company_name: Forwarded unchanged to the Modal function.

    Returns:
        A status string: "Video Request Obtained" on success, otherwise a short
        error message. Exceptions are logged and never propagated.
    """
    # Validate inputs
    if not video_path or not os.path.exists(video_path):
        logger.error("Invalid video path provided to process_video.")
        return "Invalid video path."

    def _get_video_dimensions(path: str):
        """Return (width, height) of the video, or 1920x1080 when unreadable."""
        default_size = (1920, 1080)
        cap = None
        try:
            cap = cv2.VideoCapture(path)
            if not cap.isOpened():
                return default_size
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        except Exception as e:
            logger.debug(f"OpenCV not available or failed to read video dimensions: {e}")
            return default_size
        finally:
            # Release on every path, including "failed to open" and read errors.
            if cap is not None:
                cap.release()
        # cap.get() yields 0 for a property it cannot determine; a 0x0 size
        # would be meaningless downstream, so use the default instead.
        if width <= 0 or height <= 0:
            return default_size
        return width, height

    try:
        # 1. Resolve the target volume. A missing variable raises KeyError,
        #    which the outer handler reports as an unexpected error.
        modal_volume_name = os.environ['MODERATION_MODAL_VOLUME']

        # Folder name is the upload time in whole seconds.
        # NOTE(review): two submissions within the same second share a folder.
        processing_id = str(int(time.time()))
        ext = os.path.splitext(video_path)[1]
        remote_input_path = f"/{processing_id}/input_video{ext}"

        # 2. Upload video to Modal Volume
        volume = modal.Volume.from_name(modal_volume_name)
        try:
            with volume.batch_upload() as batch:
                batch.put_file(video_path, remote_input_path)
        except Exception as e:
            logger.error(f"Error uploading video to Modal Storage: {e}")
            return "Error uploading video to Cloud Storage."

        # 3. Obtain video dimensions
        width, height = _get_video_dimensions(video_path)

        # 4. Spawn the Modal function (asynchronous; the result is not awaited)
        try:
            moderation_function = modal.Function.from_name(
                "Content-Moderation", "professional_reception_function"
            )
            moderation_function.spawn(
                input_text=str(notes) if notes is not None else "",
                video_path=remote_input_path,
                size=(int(width), int(height)),
                email=email,
                company_name=company_name,
            )
        except Exception as e:
            logger.error(f"Error calling Modal reception_function: {e}")
            return "Error calling Outpost to trigger processing."

        return "Video Request Obtained"
    except Exception as e:
        logger.error(f"Unexpected error in process_video: {e}")
        return "Unexpected error during video processing."
# Matches a name made of hexadecimal groups of 8-4-4-4-12 characters separated by
# hyphens (upper or lower case). submit_magic_code applies it to directory names
# to keep only segment sub-directories.
UUID_PATTERN = re.compile(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$')
# The success path emits five row-visibility updates, so every exit path must
# emit the same number; a handler whose return length differs from its outputs
# list is rejected by Gradio.
# NOTE(review): confirm this count against the event's outputs=[...] wiring.
_MAGIC_CODE_ROW_UPDATES = 5


def _magic_code_outputs(dropdown_update, message, rows_visible):
    """Assemble the handler output: dropdown update, status text, row visibilities."""
    row_updates = tuple(
        gr.update(visible=rows_visible) for _ in range(_MAGIC_CODE_ROW_UPDATES)
    )
    return (dropdown_update, message) + row_updates


def _magic_code_failure(label, message):
    """Output for any failure: empty dropdown titled ``label`` and all rows hidden."""
    return _magic_code_outputs(gr.update(choices=[], label=label), message, False)


def submit_magic_code(magic_code):
    """
    Validate a magic code and list the segment IDs stored under its folder.

    The magic code is Base64; it decodes to the name of a top-level folder on
    the Modal Volume named by ``MODERATION_MODAL_VOLUME``. Sub-directories of
    that folder whose names match ``UUID_PATTERN`` are offered as segment IDs.

    Args:
        magic_code: Base64 text entered by the user.

    Returns:
        Tuple of (dropdown_update, status_message, row-visibility update x 5).
        On success the dropdown holds the sorted segment IDs with the first one
        selected and all rows are shown; on any failure the dropdown is emptied
        and all rows are hidden. Every exit path returns the same tuple length.
    """
    if not magic_code:
        return _magic_code_failure("Segment IDs", "Please enter a magic code")

    try:
        # Decode Base64 magic code to get folder name
        try:
            decoded_bytes = base64.b64decode(magic_code, validate=True)
            folder_name = decoded_bytes.decode('utf-8').strip()
        except Exception as e:
            logger.error(f"Failed to decode magic code: {e}")
            return _magic_code_failure(
                "Invalid magic code",
                "Invalid magic code format – please verify your code",
            )

        # The decoded text is user-controlled and becomes part of a volume path:
        # accept a single path component only.
        if (
            not folder_name
            or folder_name in ('.', '..')
            or '/' in folder_name
            or '\\' in folder_name
        ):
            logger.error("Magic code decoded to an empty or unsafe folder name")
            return _magic_code_failure(
                "Invalid magic code",
                "Invalid magic code format – please verify your code",
            )
        logger.info(f"Magic code decoded to folder: {folder_name}")

        # Get volume name from environment
        try:
            modal_volume_name = os.environ['MODERATION_MODAL_VOLUME']
        except KeyError:
            logger.error("MODERATION_MODAL_VOLUME environment variable not set")
            return _magic_code_failure(
                "Configuration error",
                "Server configuration error – contact support",
            )

        try:
            volume = modal.Volume.from_name(modal_volume_name)
            folder_path = f"/{folder_name}"

            # A failing listing is treated as "folder does not exist".
            try:
                entries = volume.listdir(folder_path)
            except Exception as list_error:
                logger.error(f"Folder not found or inaccessible: {folder_path} - {list_error}")
                return _magic_code_failure(
                    "Folder not found",
                    "Folder not found – please verify your magic code",
                )

            # Keep only directories whose last path component looks like a UUID.
            uuid_segments = []
            for entry in entries:
                if entry.type != modal.volume.FileEntryType.DIRECTORY:
                    continue
                entry_name = entry.path.rstrip('/').split('/')[-1]
                if UUID_PATTERN.match(entry_name):
                    uuid_segments.append(entry_name)
            uuid_segments.sort()  # Stable, predictable dropdown order

            if not uuid_segments:
                logger.warning(f"No UUID segments found in folder: {folder_name}")
                return _magic_code_failure(
                    "No segments found",
                    "No segment IDs found in this folder",
                )

            logger.info(f"Found {len(uuid_segments)} UUID segments in folder {folder_name}")
            return _magic_code_outputs(
                gr.update(choices=uuid_segments, value=uuid_segments[0], label="Segment IDs"),
                f"Loaded {len(uuid_segments)} segment(s)",
                True,
            )
        except Exception as e:
            logger.error(f"Error accessing Modal volume: {e}")
            return _magic_code_failure(
                "Volume access error",
                f"Error accessing storage: {str(e)}",
            )
    except Exception as e:
        logger.error(f"Unexpected error in submit_magic_code: {e}")
        return _magic_code_failure("Error", "Unexpected error – please try again")
def _read_volume_file(volume, remote_path: str) -> bytes:
    """Return the complete contents of ``remote_path`` on ``volume`` as bytes."""
    payload = volume.read_file(remote_path)
    if isinstance(payload, (bytes, bytearray)):
        return bytes(payload)
    # Modal's Volume.read_file is documented to yield the file in byte chunks;
    # io.BytesIO needs one bytes object, so the chunks are joined first.
    return b"".join(payload)


def _segment_folder_candidates(magic_code):
    """Yield folder names to try: the value as given, then its Base64-decoded form."""
    yield magic_code
    try:
        decoded = base64.b64decode(magic_code, validate=True).decode('utf-8').strip()
    except (TypeError, ValueError):
        # Not Base64 / not UTF-8: the given value is the only candidate.
        return
    if decoded and decoded != magic_code:
        yield decoded


def download_segment_files(magic_code: str, segment_id: str) -> Tuple[Dict[int, Image.Image], Dict[int, Image.Image], int]:
    """
    Download all frame images and alpha masks of one segment from the Modal volume.

    Files are read from ``/<folder>/<segment_id>`` on the volume named by
    ``MODERATION_MODAL_VOLUME``. ``<folder>`` is ``magic_code`` as given; if that
    path cannot be listed (or is empty) and ``magic_code`` is valid Base64, the
    decoded folder name is tried as well, matching how ``submit_magic_code``
    resolves the folder. Files named ``frame_<n>.jpg|png`` are frames and
    ``alpha_frame_<n>.png`` are masks; anything else is ignored.

    Args:
        magic_code: Folder name, or the Base64 magic code that encodes it.
        segment_id: Name of the segment sub-directory.

    Returns:
        frames_dict: Dict mapping frame_index -> PIL.Image (original frames)
        masks_dict: Dict mapping frame_index -> PIL.Image (alpha masks)
        max_frame: Maximum frame index found (0 when no frame was loaded)

        On any failure ``({}, {}, 0)`` is returned; individual files that fail
        to download or decode are logged and skipped.
    """
    try:
        modal_volume_name = os.environ['MODERATION_MODAL_VOLUME']
        volume = modal.Volume.from_name(modal_volume_name)

        # Locate the segment directory. The first candidate that lists any
        # entries wins; an empty-but-listable one is kept only as a last resort.
        files = None
        segment_path = None
        for folder in _segment_folder_candidates(magic_code):
            candidate_path = f"/{folder}/{segment_id}"
            logger.info(f"Downloading files from {candidate_path}")
            try:
                listing = list(volume.listdir(candidate_path))
            except Exception as e:
                logger.error(f"Failed to list segment directory: {e}")
                continue
            files, segment_path = listing, candidate_path
            if listing:
                break
        if files is None:
            return {}, {}, 0

        frame_pattern = re.compile(r'^frame_(\d+)\.(jpg|png)$')
        alpha_pattern = re.compile(r'^alpha_frame_(\d+)\.png$')
        frames_dict = {}
        masks_dict = {}

        for entry in files:
            if entry.type != modal.volume.FileEntryType.FILE:
                continue
            filename = os.path.basename(entry.path)

            # Classify the file; a name can match at most one of the patterns.
            frame_match = frame_pattern.match(filename)
            alpha_match = None if frame_match else alpha_pattern.match(filename)
            if frame_match:
                target, label, match = frames_dict, "frame", frame_match
            elif alpha_match:
                target, label, match = masks_dict, "alpha mask", alpha_match
            else:
                continue

            frame_idx = int(match.group(1))
            try:
                file_data = _read_volume_file(volume, f"{segment_path}/{filename}")
                with Image.open(io.BytesIO(file_data)) as img:
                    # copy() forces the pixel data to load before the source closes.
                    target[frame_idx] = img.copy()
                logger.debug(f"Downloaded {label} {frame_idx}")
            except Exception as e:
                logger.error(f"Failed to download {filename}: {e}")

        max_frame = max(frames_dict) if frames_dict else 0
        logger.info(f"Downloaded {len(frames_dict)} frames and {len(masks_dict)} masks. Max frame: {max_frame}")
        return frames_dict, masks_dict, max_frame
    except Exception as e:
        logger.error(f"Error downloading segment files: {e}")
        return {}, {}, 0
def composite_image_with_mask(
    frame: Image.Image,
    mask: Optional[Image.Image],
    show_mask: bool,
    overlay_color: Tuple[int, int, int] = (255, 0, 0),
    overlay_opacity: int = 128,
) -> Image.Image:
    """
    Composite the original frame with a tinted, semi-transparent mask overlay.

    Args:
        frame: Original image; converted to RGBA for compositing.
        mask: Mask image; converted to 8-bit grayscale and resized to the
            frame size when the sizes differ. ``None`` disables the overlay.
        show_mask: Whether to draw the overlay at all.
        overlay_color: RGB tint of the overlay (default red).
        overlay_opacity: Overlay alpha, 0-255, applied where the mask is fully
            white; darker mask values scale it down proportionally.

    Returns:
        A new PIL image: a plain copy of ``frame`` when no overlay is drawn,
        otherwise an RGBA composite.
    """
    if not show_mask or mask is None:
        return frame.copy()

    # convert() returns a new image even when the mode already matches, so the
    # caller's frame and mask are never modified.
    frame_rgba = frame.convert('RGBA')
    mask_gray = mask.convert('L')

    if mask_gray.size != frame_rgba.size:
        mask_gray = mask_gray.resize(frame_rgba.size, Image.Resampling.LANCZOS)

    # putalpha() replaces the overlay's alpha channel outright, so an alpha set
    # in Image.new() would be discarded and masked areas would be painted fully
    # opaque. Fold the opacity into the mask itself instead.
    opacity = max(0, min(255, int(overlay_opacity)))
    overlay_alpha = mask_gray.point(lambda value: value * opacity // 255)

    overlay = Image.new('RGBA', frame_rgba.size, tuple(overlay_color)[:3] + (0,))
    overlay.putalpha(overlay_alpha)

    return Image.alpha_composite(frame_rgba, overlay)
def load_segment_frame(segment_id, frame_number, show_mask, magic_code_state, frames_state, masks_state):
    """
    Render one already-downloaded frame, optionally with its mask overlay.

    Returns a pair (image, update): the image is the composited frame, or None
    when no segment/frames are loaded or the requested index is absent; the
    second element is always an empty ``gr.update()``.
    """
    if frames_state is None or not segment_id:
        return None, gr.update()

    index = int(frame_number)
    if index in frames_state:
        # The mask is optional: a frame without one is rendered as-is.
        rendered = composite_image_with_mask(
            frames_state[index], masks_state.get(index, None), show_mask
        )
        logger.info(f"Loaded frame {index} with mask overlay: {show_mask}")
        return rendered, gr.update()

    logger.warning(f"Frame {index} not found in downloaded frames")
    return None, gr.update()
def handle_keyboard_navigation(key_code, segment_id, current_frame, show_mask, magic_code_state, frames_state, masks_state):
    """
    Handle left/right arrow key navigation for the frame slider.

    The selection moves to the nearest downloaded frame in the requested
    direction, so gaps in the frame numbering are skipped rather than landing
    on a missing index. Whenever nothing should change (no segment or frames,
    unknown key, unparsable frame number, already at the first/last frame)
    both outputs are empty ``gr.update()`` objects, which leave the displayed
    image and the slider untouched.

    Args:
        key_code: JavaScript key name ('ArrowLeft' or 'ArrowRight')
        segment_id: Current segment ID
        current_frame: Current frame number
        show_mask: Whether to show alpha mask overlay
        magic_code_state: Magic code state (unused here)
        frames_state: Dict of frame_index -> image, or None
        masks_state: Dict of frame_index -> mask, or None

    Returns:
        Tuple of (image or no-op update, slider update)
    """
    # Returning None for the image would blank it; an empty update keeps it.
    no_change = (gr.update(), gr.update())

    if not segment_id or not frames_state:
        return no_change

    try:
        current = int(current_frame)
    except (TypeError, ValueError):
        return no_change

    available_frames = sorted(frames_state)
    if key_code == 'ArrowLeft':
        earlier = [idx for idx in available_frames if idx < current]
        # Nothing earlier: clamp to the first frame.
        new_frame = earlier[-1] if earlier else available_frames[0]
    elif key_code == 'ArrowRight':
        later = [idx for idx in available_frames if idx > current]
        # Nothing later: clamp to the last frame.
        new_frame = later[0] if later else available_frames[-1]
    else:
        # Unknown key, no change
        return no_change

    # Already at the boundary
    if new_frame == current:
        return no_change

    logger.info(f"Keyboard navigation: {key_code} -> frame {new_frame}")

    frame = frames_state[new_frame]
    mask = (masks_state or {}).get(new_frame)
    result_image = composite_image_with_mask(frame, mask, show_mask)

    # Return updated image and move the slider to the new frame
    return result_image, gr.update(value=new_frame)
def handle_segment_selection(segment_id, magic_code):
    """
    Handle segment selection: download all files and initialise the view.

    Args:
        segment_id: Selected segment ID.
        magic_code: Magic code passed through to ``download_segment_files``.

    Returns:
        Tuple of (image, slider update, frames dict, masks dict, magic_code,
        download-button update). When nothing can be shown the image is None,
        the slider is reset to 0, both dicts are empty and the button is
        disabled. Otherwise the first frame (index 0, or the lowest index
        downloaded when 0 is absent) is displayed without its mask, the slider
        is positioned on that same frame, and the button is enabled only if at
        least one mask was downloaded.
    """
    def _empty_view():
        return None, gr.update(maximum=0, value=0), {}, {}, magic_code, gr.update(interactive=False)

    if not segment_id or not magic_code:
        return _empty_view()

    logger.info(f"Segment selected: {segment_id}")

    # Download all frames and masks
    frames_dict, masks_dict, max_frame = download_segment_files(magic_code, segment_id)
    if not frames_dict:
        logger.error("No frames downloaded")
        return _empty_view()

    # Keep the slider in step with the frame actually shown: when frame 0 is
    # missing, both start at the lowest available index instead of 0.
    first_frame = 0 if 0 in frames_dict else min(frames_dict)

    # Initial display without mask
    result_image = composite_image_with_mask(
        frames_dict[first_frame], masks_dict.get(first_frame), False
    )

    # Enable download button only if masks are available
    has_masks = len(masks_dict) > 0

    return (
        result_image,
        gr.update(minimum=first_frame, maximum=max_frame, value=first_frame),
        frames_dict,
        masks_dict,
        magic_code,
        gr.update(interactive=has_masks),
    )
def handle_image_click(segment_id, frame_number, magic_code, frames_state, masks_state, evt: gr.SelectData):
    """
    Handle a click on the image: request a new mask from the SAM-3 Modal
    function for the clicked point and show it over the current frame.

    Args:
        segment_id: Current segment ID.
        frame_number: Index of the frame being displayed.
        magic_code: Unused here.
        frames_state: Dict of frame_index -> image, or None.
        masks_state: Dict of frame_index -> mask, or None; updated in place on
            success (a new dict is created when it is None).
        evt: Gradio select event; ``evt.index`` supplies the (x, y) click point.

    Returns:
        Tuple of (image, masks state, download-button update). On success the
        image is the frame with the new mask overlaid and the button is
        enabled. If the SAM-3 call fails, the frame is re-rendered with its
        existing mask. On every other failure the image output is an empty
        ``gr.update()`` so whatever is displayed stays on screen.
    """
    unchanged = (gr.update(), masks_state, gr.update())

    if not segment_id or frames_state is None or evt is None:
        return unchanged

    try:
        x, y = evt.index[0], evt.index[1]
        frame_idx = int(frame_number)
    except (TypeError, ValueError, IndexError) as e:
        logger.error(f"Error handling image click: {e}")
        return unchanged

    logger.info(f"Click detected at ({x}, {y}) on frame {frame_idx}")

    if frame_idx not in frames_state:
        logger.error(f"Frame {frame_idx} not in state")
        return unchanged

    frame = frames_state[frame_idx]
    masks = masks_state if masks_state is not None else {}

    # Serialise the frame as PNG bytes for the remote call
    try:
        img_byte_arr = io.BytesIO()
        frame.save(img_byte_arr, format='PNG')
        img_bytes = img_byte_arr.getvalue()
    except Exception as e:
        logger.error(f"Error handling image click: {e}")
        return unchanged

    logger.info(f"Calling SAM-3 segmentation with coordinates ({x}, {y})")
    try:
        sam_function = modal.Function.from_name("Content-Moderation", "sam3_segmentation_function")
        mask_bytes = sam_function.remote(image_bytes=img_bytes, x=x, y=y)
        # copy() loads the pixel data so the mask no longer depends on the buffer
        new_mask = Image.open(io.BytesIO(mask_bytes)).copy()
        # Always show the mask after a click
        result_image = composite_image_with_mask(frame, new_mask, True)
    except Exception as e:
        logger.error(f"Error calling SAM-3 function: {e}")
        # Fall back to the current view with the previously stored mask
        try:
            fallback_image = composite_image_with_mask(frame, masks.get(frame_idx), True)
        except Exception as render_error:
            logger.error(f"Error handling image click: {render_error}")
            return unchanged
        return fallback_image, masks_state, gr.update()

    # Store the mask only after it decoded and composited successfully
    masks[frame_idx] = new_mask
    logger.info(f"Successfully updated mask for frame {frame_idx}")

    # Enable download button since we now have at least one mask
    return result_image, masks, gr.update(interactive=True)
def download_segment(segment_id, frames_state, masks_state):
    """
    Package the alpha masks of the selected segment into a ZIP file.

    Each mask is written as ``alpha_frame_<index:06d>.png``; frames are not
    included. The archive is named ``{segment_id}.zip`` and is created inside a
    fresh directory under the system temp dir, so simultaneous requests for the
    same segment do not overwrite each other's file.

    Args:
        segment_id: Selected segment ID; only its final path component is used
            for the archive name.
        frames_state: Unused here.
        masks_state: Dict of frame_index -> mask image.

    Returns:
        Tuple of (status_message update, file_path or None, status_visibility update)
    """
    if not segment_id:
        return gr.update(value="No segment selected", visible=True), None, gr.update(visible=True)

    if not masks_state:
        logger.warning(f"No alpha masks available for segment: {segment_id}")
        return gr.update(value="No alpha masks available", visible=True), None, gr.update(visible=True)

    logger.info(f"Download requested for segment: {segment_id}")

    try:
        import shutil

        # Strip any directory part so the id cannot steer the output path.
        archive_name = os.path.basename(str(segment_id)) or "segment"

        # Not removed here: the returned path must outlive this call so the
        # file can still be served.
        output_dir = tempfile.mkdtemp(prefix="segment_masks_")
        archive_base = os.path.join(output_dir, archive_name)

        # Stage the PNGs in a throwaway directory, then zip its contents
        with tempfile.TemporaryDirectory() as tmpdir:
            for frame_idx, mask_img in masks_state.items():
                mask_path = os.path.join(tmpdir, f"alpha_frame_{int(frame_idx):06d}.png")
                mask_img.save(mask_path)

            # make_archive appends ".zip" to the base name and returns the full path
            zip_path = shutil.make_archive(archive_base, 'zip', tmpdir)

        logger.info(f"Created ZIP at {zip_path} with {len(masks_state)} alpha masks")
        return (
            gr.update(value=f"✓ Downloaded {len(masks_state)} alpha masks", visible=True),
            zip_path,
            gr.update(visible=True),
        )
    except Exception as e:
        logger.error(f"Error creating download package: {e}")
        return (
            gr.update(value=f"Error: {str(e)}", visible=True),
            None,
            gr.update(visible=True),
        )
# Custom stylesheet for the Gradio interface. It defines a dark colour palette as
# CSS variables and scales paddings/margins by the --golden-ratio variable (1.618).
# The text between the triple quotes is delivered to the browser as-is, so the
# /* ... */ notes inside it are CSS comments, not Python comments.
css = """
:root {
--main-bg-color: #111827;
--primary-color: #3B82F6;
--secondary-color: #60A5FA;
--text-color: #F9FAFB;
--text-secondary: #9CA3AF;
--card-bg: #1F2937;
--border-color: #374151;
--accent-blue: #3B82F6;
--accent-yellow: #FBBF24;
--accent-red: #EF4444;
--accent-green: #22C55E;
--border-radius: 8px;
--golden-ratio: 1.618;
--font-header: 'Barlow', sans-serif;
--font-body: 'Work Sans', sans-serif;
}
body {
font-family: var(--font-body);
background-color: var(--main-bg-color);
color: var(--text-color);
}
.container {
max-width: 100%;
margin: 0 auto;
padding: calc(20px * var(--golden-ratio));
background-color: var(--main-bg-color);
border-radius: calc(var(--border-radius) * var(--golden-ratio));
box-shadow: 0 10px 30px rgba(0, 0, 0, 0.3);
}
.logo-container {
display: flex;
justify-content: center;
margin-bottom: calc(20px * var(--golden-ratio));
padding: 15px;
background-color: var(--card-bg);
border-radius: var(--border-radius);
border: 1px solid var(--border-color);
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.2);
}
.logo {
max-width: 300px;
max-height: 100px;
transition: transform 0.3s ease;
display: block; /* Ensure it's a block element */
margin: 0 auto; /* This will center a block element within its flex container */
}
.logo:hover {
transform: scale(1.05);
}
.header {
text-align: center;
margin-bottom: calc(30px * var(--golden-ratio));
padding: calc(15px * var(--golden-ratio));
background: linear-gradient(135deg, var(--primary-color) 0%, var(--secondary-color) 100%);
color: white;
border-radius: var(--border-radius);
box-shadow: 0 4px 10px rgba(59, 130, 246, 0.3);
}
.header h1 {
color: white;
font-family: var(--font-header);
font-size: calc(1.5rem * var(--golden-ratio));
margin-bottom: calc(0.5rem * var(--golden-ratio));
text-shadow: 0 2px 4px rgba(0, 0, 0, 0.3);
font-weight: 600;
}
.header p {
color: rgba(255, 255, 255, 0.9);
font-size: 1rem;
max-width: calc(600px * var(--golden-ratio));
margin: 0 auto;
}
.input-section, .output-section {
background-color: var(--card-bg);
border: 1px solid var(--border-color);
border-radius: var(--border-radius);
padding: calc(20px * var(--golden-ratio));
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.2);
margin-bottom: 20px;
transition: all 0.3s ease;
}
.input-section:hover, .output-section:hover {
box-shadow: 0 10px 20px rgba(0, 0, 0, 0.3);
border-color: var(--primary-color);
}
.input-section {
flex: var(--golden-ratio);
}
.output-section {
flex: 1;
}
.footer {
text-align: center;
margin-top: calc(30px * var(--golden-ratio));
padding: 15px;
color: var(--text-secondary);
font-size: 0.9rem;
border-top: 1px solid var(--border-color);
}
/* Improve form elements */
.gradio-slider input[type=range] {
accent-color: var(--primary-color);
}
.gradio-textbox input, .gradio-textbox textarea {
background-color: var(--main-bg-color) !important;
border: 1px solid var(--border-color) !important;
border-radius: var(--border-radius) !important;
padding: 10px !important;
color: var(--text-color) !important;
transition: all 0.3s ease !important;
}
.gradio-textbox input:focus, .gradio-textbox textarea:focus {
border-color: var(--primary-color) !important;
box-shadow: 0 0 0 2px rgba(59, 130, 246, 0.3) !important;
}
.gradio-button {
background-color: var(--primary-color) !important;
color: white !important;
border-radius: var(--border-radius) !important;
padding: calc(10px * var(--golden-ratio)) calc(20px * var(--golden-ratio)) !important;
font-weight: 600 !important;
font-family: var(--font-header) !important;
transition: all 0.3s ease !important;
box-shadow: 0 4px 6px rgba(59, 130, 246, 0.3) !important;
border: none !important;
}
.gradio-button:hover {
background-color: var(--secondary-color) !important;
transform: translateY(-2px);
box-shadow: 0 6px 12px rgba(59, 130, 246, 0.4) !important;
}
/* Golden ratio spacing for elements */
.gradio-row {
margin-bottom: calc(16px * var(--golden-ratio)) !important;
}
/* Additional dark theme adjustments */
.gradio-container {
background-color: var(--main-bg-color) !important;
}
.gradio-form {
background-color: var(--card-bg) !important;
border: 1px solid var(--border-color) !important;
}
/* Labels and text styling */
label {
color: var(--text-color) !important;
font-family: var(--font-body) !important;
}
/* Responsive adjustments */
@media (max-width: 768px) {
.container {
padding: 15px;
}
.input-section, .output-section {
padding: 15px;
}
}
/* Loading modal overlay */
#download-loading-modal {
display: none;
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
background-color: rgba(0, 0, 0, 0.7);
z-index: 9999;
justify-content: center;
align-items: center;
}
#download-loading-modal.show {
display: flex;
}
.loading-content {
background-color: var(--card-bg);
padding: 40px;
border-radius: var(--border-radius);
text-align: center;
box-shadow: 0 10px 40px rgba(0, 0, 0, 0.5);
}
.spinner {
border: 4px solid var(--border-color);
border-top: 4px solid var(--primary-color);
border-radius: 50%;
width: 50px;
height: 50px;
animation: spin 1s linear infinite;
margin: 0 auto 20px;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
.loading-text {
color: var(--text-color);
font-size: 18px;
font-weight: 500;
}
"""
# Create a Blocks interface for more customization
with gr.Blocks(css=css, theme=gr.themes.Soft(primary_hue="indigo", secondary_hue="purple")) as demo:
with gr.Column(elem_classes="container"):
# Header section
with gr.Column(elem_classes="header"):
gr.HTML("""
Professional content moderation and segmentation workflow.
Submit jobs and manage content segmentation efficiently.