| import gradio as gr |
| import clip |
| import torch |
| from qdrant_client import QdrantClient |
| import subprocess |
| import os |
| import uuid |
| import yt_dlp |
|
|
| |
# Run CLIP on the GPU when torch reports one, otherwise on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)

# SECURITY: the Qdrant endpoint and API key used to be hard-coded only.
# They are now read from QDRANT_URL / QDRANT_API_KEY when those environment
# variables are set. The literals below remain purely as a backward-compatible
# fallback; because the key has been committed to source it should be rotated
# and the fallback removed.
client = QdrantClient(
    url=os.environ.get(
        "QDRANT_URL",
        "https://265484ec-5f64-40ec-a619-c7c9dffc2dd9.us-east-1-0.aws.cloud.qdrant.io:6333",
    ),
    api_key=os.environ.get(
        "QDRANT_API_KEY",
        "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3MiOiJtIn0.I2MgcVnOKkWmOXwFlqJqEqm6LFQIF4cjxU5up4wxwyw",
    ),
)
# Name of the Qdrant collection queried by search_and_clip_video.
COLLECTION_NAME = "video_segments"

# Directory that receives clips written by extract_video_clip; created at
# import time so later writes do not fail on a missing folder.
CLIP_OUTPUT_DIR = "generated_clips"
os.makedirs(CLIP_OUTPUT_DIR, exist_ok=True)
|
|
| |
# (stored file name, YouTube watch URL) pairs, kept in index order.
_VIDEO_SOURCES = (
    ("temp_video_0.mp4", "https://www.youtube.com/watch?v=9CGGh6ivg68"),
    ("temp_video_1.mp4", "https://www.youtube.com/watch?v=WXoOohWU28Y"),
    ("temp_video_2.mp4", "https://www.youtube.com/watch?v=TV-DjM8242s"),
    ("temp_video_3.mp4", "https://www.youtube.com/watch?v=rCVlIVKqqGE"),
    ("temp_video_4.mp4", "https://www.youtube.com/watch?v=lb_5AdUpfuA"),
    ("temp_video_5.mp4", "https://www.youtube.com/watch?v=FCQ-rih6cHY"),
    ("temp_video_6.mp4", "https://www.youtube.com/watch?v=eQ6UE968Xe4"),
    ("temp_video_7.mp4", "https://www.youtube.com/watch?v=eFgkZKhNUdM"),
)

# Lookup from the file name stored in a search hit's payload to its YouTube URL.
VIDEO_URLS = dict(_VIDEO_SOURCES)

# URL used when a payload names a file that is not in VIDEO_URLS.
DEFAULT_VIDEO_URL = _VIDEO_SOURCES[0][1]
|
|
def _pick_stream_url(info):
    """Return the best direct MP4 stream URL from a yt-dlp info dict, or None."""
    formats = info.get('formats') or [info]
    mp4_formats = [f for f in formats if f.get('ext') == 'mp4' and f.get('url')]

    # yt-dlp lists formats from worst to best, so scan from the end and prefer
    # a stream that carries both video and audio (codec fields not 'none').
    for fmt in reversed(mp4_formats):
        if fmt.get('vcodec') != 'none' and fmt.get('acodec') != 'none':
            return fmt['url']
    if mp4_formats:
        return mp4_formats[-1]['url']
    return info.get('url')


def _run_ffmpeg_clip(command, output_path):
    """Run an ffmpeg command and return output_path, raising RuntimeError on failure."""
    print(f"[INFO] Running ffmpeg command: {' '.join(command)}")
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    if result.returncode != 0:
        print(f"[WARN] ffmpeg command failed with code {result.returncode}")
        # errors='replace' keeps undecodable bytes from masking the real failure
        print(f"[WARN] stderr: {result.stderr.decode('utf-8', errors='replace')}")
        raise RuntimeError(f"ffmpeg failed with code {result.returncode}")

    if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
        print(f"[INFO] Successfully extracted clip to {output_path}")
        return output_path

    print(f"[WARN] Output file missing or empty: {output_path}")
    raise RuntimeError("Output file missing or empty")


def _remove_if_exists(path):
    """Best-effort delete of path; problems are logged and never raised."""
    if not os.path.exists(path):
        return
    try:
        os.remove(path)
        print(f"[INFO] Removed temporary file {path}")
    except OSError as cleanup_error:
        print(f"[WARN] Failed to remove temp file: {cleanup_error}")


def extract_video_clip(video_url, start_time, end_time):
    """
    Use yt-dlp and ffmpeg to extract a clip directly from YouTube.

    Two strategies are tried in order:
      1. Resolve a direct stream URL with yt-dlp and let ffmpeg read only the
         requested range from it (re-encoded with libx264/aac).
      2. Download the whole video to a temporary file and cut the range from
         it with stream copy.

    Args:
        video_url: URL of the video page, passed to yt-dlp.
        start_time: Clip start in seconds.
        end_time: Clip end in seconds; must be greater than start_time.

    Returns:
        Path of the clip written under CLIP_OUTPUT_DIR, or None when the range
        is empty/negative or both strategies fail.
    """
    duration = end_time - start_time
    if duration <= 0:
        # ffmpeg cannot cut an empty range; bail out before any download.
        print(f"[ERROR] Invalid clip range: start={start_time}, end={end_time}")
        return None

    clip_name = f"clip_{uuid.uuid4().hex}.mp4"
    output_path = os.path.join(CLIP_OUTPUT_DIR, clip_name)

    print(f"[INFO] Attempting to extract clip from {video_url} ({start_time} - {end_time})")

    # Method 1: stream the requested range straight from the remote URL.
    try:
        ydl_opts = {
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
            'quiet': True
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_url, download=False)

        best_url = _pick_stream_url(info)
        if not best_url:
            print("[WARN] Could not find a suitable direct URL")
            raise RuntimeError("No suitable URL found")

        command = [
            "ffmpeg",
            "-y",  # global option: must precede the output file
            "-ss", str(start_time),  # before -i: seek on the input
            "-i", best_url,
            "-t", str(duration),
            "-c:v", "libx264",
            "-c:a", "aac",
            "-preset", "ultrafast",
            output_path,
        ]
        return _run_ffmpeg_clip(command, output_path)

    except Exception as e:  # any failure here falls through to Method 2
        print(f"[ERROR] Method 1 failed: {str(e)}")

    # Method 2: download the full video, then cut locally.
    temp_video = os.path.join(CLIP_OUTPUT_DIR, f"temp_{uuid.uuid4().hex}.mp4")
    try:
        print("[INFO] Trying Method 2: Download full video first")

        ydl_opts = {
            'format': 'best[ext=mp4]/best',
            'outtmpl': temp_video,
            'quiet': True
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])

        if not (os.path.exists(temp_video) and os.path.getsize(temp_video) > 0):
            raise RuntimeError(f"Downloaded video missing or empty: {temp_video}")

        command = [
            "ffmpeg",
            "-y",
            "-ss", str(start_time),
            "-i", temp_video,
            "-t", str(duration),
            "-c:v", "copy",
            "-c:a", "copy",
            output_path,
        ]
        return _run_ffmpeg_clip(command, output_path)

    except Exception as e:  # last strategy: report and give up below
        print(f"[ERROR] Method 2 failed: {str(e)}")
    finally:
        # Runs on success and on every failure path, so the full download
        # never accumulates on disk.
        _remove_if_exists(temp_video)

    # Do not leave a partial or empty clip behind.
    _remove_if_exists(output_path)
    print("[ERROR] All extraction methods failed")
    return None
|
|
def time_to_seconds(time_str):
    """Convert a colon-separated timestamp to seconds.

    Accepts "HH:MM:SS", "MM:SS" or "SS"; the seconds field may be fractional.

    Args:
        time_str: Timestamp string such as "01:02:03.5", "02:30" or "45".

    Returns:
        The total number of seconds as a float.

    Raises:
        ValueError: If there are more than three fields or a field is not numeric.
    """
    fields = time_str.split(':')
    if len(fields) > 3:
        raise ValueError(f"Unsupported time format: {time_str!r}")

    *leading, seconds = fields
    # Walk minutes then hours from the right so shorter forms need no padding.
    whole = sum(
        int(value) * factor
        for value, factor in zip(reversed(leading), (60, 3600))
    )
    return whole + float(seconds)
|
|
def search_and_clip_video(text_query: str):
    """Find the stored segment closest to text_query and return embed HTML.

    The query is embedded with the CLIP text encoder, the single nearest point
    is fetched from the Qdrant collection, and its payload ("start", "end",
    "video_path") is turned into a YouTube embed iframe.

    Args:
        text_query: Free-text search query.

    Returns:
        An HTML string: the iframe wrapped in a fixed-size div, or a centered
        message when the query is blank or nothing matched.
    """
    print(f"[INFO] Searching for: {text_query}")

    wrapper = "<div style='width:100%; max-width:720px; height:405px; margin:auto;'>{}</div>"
    notice = "<p style='text-align:center; padding-top:180px;'>{}</p>"

    # A blank or missing query has nothing to embed; answer before touching the model.
    if not text_query or not text_query.strip():
        print("[WARN] Empty query.")
        return wrapper.format(notice.format("Please enter a search query."))

    with torch.no_grad():
        # truncate=True: clip.tokenize otherwise raises on text longer than
        # the model's context length.
        text_tokens = clip.tokenize([text_query], truncate=True).to(device)
        text_features = model.encode_text(text_tokens)
        # Scale the embedding to unit length before searching.
        text_features /= text_features.norm(dim=1, keepdim=True)

    search_result = client.search(
        collection_name=COLLECTION_NAME,
        query_vector=text_features.cpu().numpy()[0].tolist(),
        limit=1,
    )

    if not search_result:
        print("[WARN] No result found.")
        return wrapper.format(notice.format("No matching video found."))

    hit = search_result[0]
    payload = hit.payload or {}  # tolerate a hit that carries no payload
    start = payload.get("start", 0)
    end = payload.get("end", 0)
    # Timestamps may be stored as "H:M:S" strings or as plain numbers.
    start = time_to_seconds(start) if isinstance(start, str) else float(start)
    end = time_to_seconds(end) if isinstance(end, str) else float(end)
    video_filename = payload.get("video_path", "temp_video_0.mp4")
    video_url = VIDEO_URLS.get(video_filename, DEFAULT_VIDEO_URL)

    params = [f"start={int(start)}"]
    # Only pass an end point that lies after the start; a missing "end"
    # defaults to 0 and is left out.
    if int(end) > int(start):
        params.append(f"end={int(end)}")
    params.append("autoplay=1")
    embed_url = video_url.replace("watch?v=", "embed/") + "?" + "&".join(params)

    iframe = f"""
    <iframe width="100%" height="100%"
    src="{embed_url}"
    frameborder="0"
    allow="autoplay; encrypted-media"
    allowfullscreen></iframe>
    """
    return wrapper.format(iframe)
| |
def get_test_video():
    """Log the call and return the default YouTube URL for the test tab."""
    # NOTE(review): this is a YouTube watch-page URL, not a media file, and it
    # feeds a gr.Video output — confirm the player can actually render it.
    test_url = DEFAULT_VIDEO_URL
    print("[INFO] Returning test YouTube URL")
    return test_url
|
|
| |
# --- Search tab: free-text query in, HTML with the embedded player out ---
_query_box = gr.Textbox(label="Enter search query", value="sample query")
_clip_view = gr.HTML(label="YouTube Clip")
search_demo = gr.Interface(
    title="🎥 Semantic Video Search with Clip Extraction",
    description="Returns a clipped video segment matching your query.",
    fn=search_and_clip_video,
    inputs=_query_box,
    outputs=_clip_view,
)

# --- Test tab: no inputs, always shows the default video ---
_test_player = gr.Video(label="Test Video")
test_demo = gr.Interface(
    title="Simple Video Test",
    description="Always displays the default video to verify video player works.",
    fn=get_test_video,
    inputs=None,
    outputs=_test_player,
)

# Combine both interfaces into one tabbed app; labels pair up by position.
_tabs = [search_demo, test_demo]
_tab_labels = ["Search Video", "Test Video Player"]
demo = gr.TabbedInterface(_tabs, _tab_labels)


# Start the app only when run as a script; share=True is passed to launch().
if __name__ == "__main__":
    demo.launch(share=True)