"""
asset_checker.py
─────────────────────────────────────────────────────────────
Autonomous Short-Form Video Engine — Visual Quality Gate

Uses Nemotron-2 VL (vision-language model) via NVIDIA NIM to score
each downloaded media asset for quality & relevance.
Assets scoring < 6/10 are rejected and trigger a re-fetch.
─────────────────────────────────────────────────────────────
"""

import os
import json
import base64
import logging
from pathlib import Path
from io import BytesIO
from concurrent.futures import ThreadPoolExecutor

from openai import OpenAI
from PIL import Image
from dotenv import load_dotenv

load_dotenv()
logger = logging.getLogger(__name__)

NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "")
SCORE_THRESHOLD = 6.0  # assets below this score are rejected
MAX_REFETCH_ATTEMPTS = 2

# NVIDIA NIM endpoint for Nemotron-2 VL
nvidia_client = OpenAI(
    base_url="https://integrate.api.nvidia.com/v1",
    api_key=NVIDIA_API_KEY,
)

# NOTE(review): the module docs talk about "Nemotron-2 VL", but this identifier
# names a Nemotron-4 340B *instruct* model. Confirm that this endpoint actually
# accepts image input; if it does not, every check falls into the
# "API error → auto-approve" path below and the quality gate is a no-op.
VL_MODEL = "nvidia/nemotron-4-340b-instruct"  # vision-capable endpoint

# File suffixes that are treated as video (a frame is sampled from them).
_VIDEO_SUFFIXES = (".mp4", ".mov", ".avi", ".webm")
# Bounding box for the thumbnail sent to the API.
_MAX_FRAME_SIZE = (512, 512)
# Score reported whenever the check cannot be completed and we auto-approve.
_FALLBACK_SCORE = 7.0


def _grab_video_frame(path: Path) -> Image.Image:
    """Return the frame located 20% into the video at *path* as a PIL image."""
    # Imported lazily so that image-only usage does not require moviepy.
    from moviepy import VideoFileClip

    clip = VideoFileClip(str(path))
    try:
        # Grab frame at 20% into the video (avoids black intro frames)
        frame = clip.get_frame(clip.duration * 0.2)
    finally:
        # Always release the underlying reader, even if get_frame() fails.
        clip.close()
    return Image.fromarray(frame)


def _encode_thumbnail(img: Image.Image) -> str:
    """Shrink *img* in place to the API size, and return it as base64 JPEG."""
    # Resize to max 512px on longest side for API efficiency
    img.thumbnail(_MAX_FRAME_SIZE, Image.LANCZOS)
    if img.mode != "RGB":
        img = img.convert("RGB")  # JPEG cannot store alpha / palette modes
    buf = BytesIO()
    img.save(buf, format="JPEG", quality=85)
    return base64.b64encode(buf.getvalue()).decode("utf-8")


def _extract_frame(media_path: str) -> str:
    """
    Extract a thumbnail from a video file or resize an image,
    then return as base64-encoded JPEG string.

    Args:
        media_path: Path to a video (.mp4/.mov/.avi/.webm) or an image file.

    Returns:
        The base64 text of a JPEG no larger than 512px on its longest side,
        or "" when the file cannot be read, decoded or encoded. This function
        never raises: callers treat "" as "could not extract a frame".
    """
    path = Path(media_path)

    if path.suffix.lower() in _VIDEO_SUFFIXES:
        try:
            return _encode_thumbnail(_grab_video_frame(path))
        except Exception as e:  # best-effort: any decode failure means "no frame"
            logger.warning("[Checker] Could not extract video frame: %s", e)
            return ""

    try:
        # Image.open() is lazy: corrupt/truncated files only fail once pixel
        # data is read (during thumbnail/save), so the whole pipeline must be
        # inside the guarded region. The context manager closes the file.
        with Image.open(path) as img:
            return _encode_thumbnail(img)
    except Exception as e:  # best-effort: any decode failure means "no frame"
        logger.warning("[Checker] Could not open image: %s", e)
        return ""


def _build_check_prompt(topic: str) -> str:
    """Build the reviewer prompt asking the model to score a frame for *topic*.

    The prompt requests a JSON-only reply with the keys ``quality_score``,
    ``relevance_score``, ``overall``, ``reject`` and ``reason``.
    """
    return (
        f"You are a strict video quality reviewer. Look at this image (a frame from a short-form video clip). "
        f"Rate it on two criteria for the topic: '{topic}'.\n\n"
        f"1. Visual Quality (lighting, sharpness, professional look): 1-10\n"
        f"2. Topic Relevance (does it visually match '{topic}'?): 1-10\n\n"
        f"Reply ONLY with valid JSON in this format:\n"
        f'{{ "quality_score": 7, "relevance_score": 8, "overall": 7.5, "reject": false, "reason": "brief reason" }}'
    )


def _parse_vl_response(raw: str) -> dict:
    """Parse the model reply into a dict.

    Handles a reply wrapped in a Markdown code fence and, as a fallback, a
    JSON object surrounded by extra prose.

    Raises:
        ValueError: if no JSON object can be recovered (``json.JSONDecodeError``
            is a subclass of ``ValueError``) or the JSON is not an object.
    """
    text = raw.strip()

    # Strip markdown if needed
    if text.startswith("```"):
        text = text.split("```")[1]
        if text.startswith("json"):
            text = text[4:]
        text = text.strip()

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Fallback: the model added commentary around the JSON object.
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end <= start:
            raise
        parsed = json.loads(text[start:end + 1])

    if not isinstance(parsed, dict):
        raise ValueError(f"expected a JSON object, got {type(parsed).__name__}")
    return parsed


def _coerce_score(value, default: float = _FALLBACK_SCORE) -> float:
    """Return *value* as a float, or *default* when it is missing/non-numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def check_asset(media_path: str, topic: str, skip_check: bool = False) -> dict:
    """
    Run Nemotron-2 VL quality check on a downloaded media asset.

    The check is fail-open: when no frame can be extracted, the API call fails
    or the reply cannot be parsed, the asset is auto-approved with a score of
    7.0 so a flaky checker never blocks the pipeline.

    Args:
        media_path: Path to the downloaded video/image file
        topic: The scene topic/keyword for relevance scoring
        skip_check: If True, skip the VL check and approve automatically

    Returns:
        Dict: {"approved": bool, "overall": float, "reason": str}
    """
    if skip_check or not NVIDIA_API_KEY:
        if not NVIDIA_API_KEY:
            logger.warning("[Checker] NVIDIA_API_KEY not set — auto-approving all assets.")
        return {"approved": True, "overall": 10.0, "reason": "Check skipped"}

    frame_b64 = _extract_frame(media_path)
    if not frame_b64:
        return {
            "approved": True,
            "overall": _FALLBACK_SCORE,
            "reason": "Could not extract frame — auto-approved",
        }

    prompt = _build_check_prompt(topic)
    image_url = f"data:image/jpeg;base64,{frame_b64}"

    # Step 1: the remote call. Anything raised here is a transport/API problem.
    try:
        response = nvidia_client.chat.completions.create(
            model=VL_MODEL,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": image_url}},
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
            temperature=0.1,
            max_tokens=200,
        )
        # content may be None (e.g. empty completion); treat that as "".
        raw = response.choices[0].message.content or ""
    except Exception as e:  # fail-open boundary around the third-party client
        logger.warning("[Checker] VL API error: %s — auto-approving", e)
        return {
            "approved": True,
            "overall": _FALLBACK_SCORE,
            "reason": f"API error: {str(e)[:60]}",
        }

    # Step 2: interpret the reply. Kept separate so malformed replies are
    # reported as parse errors rather than as API errors.
    try:
        result = _parse_vl_response(raw)
    except ValueError as e:
        logger.warning("[Checker] JSON parse error from VL response: %s — auto-approving", e)
        return {
            "approved": True,
            "overall": _FALLBACK_SCORE,
            "reason": "Parse error — auto-approved",
        }

    overall = _coerce_score(result.get("overall", _FALLBACK_SCORE))
    approved = overall >= SCORE_THRESHOLD

    logger.info(
        "[Checker] %s → score: %.1f/10 (%s)",
        Path(media_path).name,
        overall,
        "✅ approved" if approved else "❌ rejected",
    )
    return {
        "approved": approved,
        "overall": overall,
        "reason": result.get("reason", ""),
    }


def check_all_assets(media_results: list[dict], video_json: dict, skip_check: bool = False) -> list[dict]:
    """
    Quality-check all fetched media assets in parallel.

    Each item of *media_results* is updated in place with the keys
    ``approved``, ``score`` and ``check_reason``. Items without a ``path``
    are marked as not approved with score 0.0.

    Args:
        media_results: Fetched asset records; ``path`` and ``scene_number``
            are the keys read here.
        video_json: Script data; ``scenes[*].scene_number``,
            ``visual_description`` and ``pexels_keywords`` are read to build
            the relevance topic for each asset.
        skip_check: If True, approve every asset that has a file without
            calling the model.

    Returns:
        The same *media_results* list, with every item annotated.
    """
    if skip_check or not NVIDIA_API_KEY:
        logger.info("[Checker] Skipping parallel checks (skip_check=True or no API key)")
        for item in media_results:
            if item.get("path"):
                item.update({"approved": True, "score": 10.0, "check_reason": "Skipped"})
            else:
                item.update({"approved": False, "score": 0.0, "check_reason": "No file"})
        return media_results

    # Tolerate a missing "scenes" list and scenes without a number instead of
    # aborting the whole batch with a KeyError.
    scenes = (video_json or {}).get("scenes") or []
    scene_map = {s["scene_number"]: s for s in scenes if "scene_number" in s}

    def _check_task(item):
        if not item.get("path"):
            item.update({"approved": False, "score": 0.0, "check_reason": "No file"})
            return item

        scene = scene_map.get(item.get("scene_number"), {})
        # Prefer the free-text description; fall back to the search keywords,
        # and finally to a generic topic so the prompt is never empty.
        keywords = scene.get("pexels_keywords") or ["video"]
        topic = scene.get("visual_description", "") or ", ".join(str(k) for k in keywords)

        result = check_asset(item["path"], topic, skip_check=False)
        item["approved"] = result["approved"]
        item["score"] = result["overall"]
        item["check_reason"] = result["reason"]
        return item

    logger.info("[Checker] Running %d QA checks in parallel...", len(media_results))
    # The work is network-bound (one API call per asset), so threads overlap it.
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Consume the iterator so every task runs and any error surfaces here.
        list(executor.map(_check_task, media_results))

    approved_count = sum(1 for m in media_results if m.get("approved"))
    logger.info("[Checker] ✅ %d/%d assets passed QA.", approved_count, len(media_results))
    return media_results


# ── CLI Test ──────────────────────────────────────────────
def _main() -> None:
    """Check a single file from the command line and print the result as JSON."""
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--image", required=True, help="Path to image or video file")
    parser.add_argument("--topic", default="AI technology future", help="Topic for relevance check")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO)
    print(f"\n🔍 Checking asset: {args.image}")
    print(f"   Topic: '{args.topic}'\n")
    result = check_asset(args.image, args.topic)
    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    _main()