# Expected on-disk layout (segments are numbered dirs of .mp4 shots; "meta" is a
# JSON-lines file with one entry per segment; "ID" holds character reference images):
#
# /data/rczhang/PencilFolder/multi-shot/data/
# ├── Data1/                      # movie1
# │   ├── meta                    # metadata file (captions and ID info)
# │   ├── 1/
# │   │   └── 1.mp4               # shot video
# │   ├── 2/
# │   │   ├── 1.mp4
# │   │   └── 2.mp4
# │   └── ID/
# │       └── ID1/
# │           ├── 1.png           # character ID image
# │           ├── 2.png
# │           └── 3.png
# ├── Data2/                      # movie2 (same structure; may have ID1/, ID2/, ...)
# │   └── ...
# └── Data3/                      # movie3
#     └── ...

import argparse
import json
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple

import imageio.v2 as imageio
import numpy as np
from PIL import Image

# Extensions accepted as character-ID reference images.
IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".webp", ".bmp"}
DEFAULT_FPS = 16
TARGET_HEIGHT = 480
TARGET_WIDTH = 832


def read_json_lines(path: Path) -> List[Dict[str, Any]]:
    """Parse a JSON-lines file into a list of dicts, skipping blank lines.

    Raises json.JSONDecodeError on a malformed line (deliberately not
    swallowed: a broken meta file should fail loudly).
    """
    entries: List[Dict[str, Any]] = []
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            entries.append(json.loads(line))
    return entries


def normalize_list(value: Any) -> List[str]:
    """Coerce a meta field to a list of strings (None -> [], scalar -> [str])."""
    if value is None:
        return []
    if isinstance(value, list):
        return [str(item) for item in value]
    return [str(value)]


def sort_key_numeric(path: Path) -> Tuple[int, int, str]:
    """Sort key: numeric stems first in numeric order, then others lexically.

    E.g. 2.mp4 < 10.mp4 (numeric), and all numeric names precede "extra.mp4".
    """
    stem = path.stem
    if stem.isdigit():
        return (0, int(stem), stem)
    return (1, 0, stem)


def list_shot_videos(segment_dir: Path) -> List[Path]:
    """Return the segment's .mp4 shot files in numeric order."""
    shots = [p for p in segment_dir.iterdir() if p.is_file() and p.suffix.lower() == ".mp4"]
    return sorted(shots, key=sort_key_numeric)


def list_id_images(id_dir: Path) -> List[Path]:
    """Return the character-ID reference images in numeric order."""
    images = [p for p in id_dir.iterdir() if p.is_file() and p.suffix.lower() in IMAGE_EXTS]
    return sorted(images, key=sort_key_numeric)


def resize_center_crop(image: Image.Image, target_h: int, target_w: int) -> Image.Image:
    """Scale the image to cover (target_w, target_h), then center-crop to it."""
    w, h = image.size
    scale = max(target_w / w, target_h / h)
    # Clamp to the target size: round() can undershoot by one pixel, which
    # would make crop() extend past the image and silently pad with black.
    new_w = max(target_w, int(round(w * scale)))
    new_h = max(target_h, int(round(h * scale)))
    image = image.resize((new_w, new_h), Image.Resampling.LANCZOS)
    left = max(0, (new_w - target_w) // 2)
    top = max(0, (new_h - target_h) // 2)
    return image.crop((left, top, left + target_w, top + target_h))


def iter_resampled_frames(video_path: Path, target_fps: int) -> Iterable[np.ndarray]:
    """Yield the video's frames resampled (by index selection) to ~target_fps.

    Prefers random access via reader.get_data(); falls back to materializing
    all frames when the frame count cannot be determined.
    """
    reader = imageio.get_reader(str(video_path))
    try:
        meta = reader.get_meta_data()
        # "or target_fps" guards against fps reported as 0/None.
        fps = meta.get("fps", target_fps) or target_fps
        total_frames: Optional[int] = None
        try:
            total_frames = reader.count_frames()
        except Exception:
            duration = meta.get("duration")
            if duration:
                total_frames = int(round(duration * fps))
        # Some readers report inf instead of raising; treat non-finite counts
        # as unknown so the fallback below handles them.
        if total_frames is not None and total_frames != float("inf"):
            total_frames = int(total_frames)
        else:
            total_frames = None
        if total_frames is None or total_frames <= 0:
            # Unknown length: read everything once, then resample from memory.
            frames = [frame for frame in reader]
            if not frames:
                return
            total_frames = len(frames)
            target_count = max(1, int(round(total_frames * target_fps / fps)))
            indices = np.linspace(0, total_frames - 1, num=target_count, dtype=int)
            for idx in indices:
                yield frames[int(idx)]
            return
        duration = total_frames / fps if fps > 0 else total_frames / target_fps
        target_count = max(1, int(round(duration * target_fps)))
        indices = np.linspace(0, total_frames - 1, num=target_count, dtype=int)
        for idx in indices:
            yield reader.get_data(int(idx))
    finally:
        reader.close()


def pick_id_names(id_list: List[str], id_root: Path) -> List[str]:
    """Return ID names from the meta entry (deduped, order kept), or, when the
    meta entry has none, the subdirectory names under id_root in numeric order.
    """
    if id_list:
        seen = set()
        ordered = []
        for name in id_list:
            if name not in seen:
                ordered.append(name)
                seen.add(name)
        return ordered
    if not id_root.exists():
        return []
    names = []
    for child in sorted(id_root.iterdir(), key=sort_key_numeric):
        if child.is_dir():
            names.append(child.name)
    return names


def build_text(captions: List[str]) -> str:
    """Join non-empty captions with spaces; "unknown" when nothing remains."""
    cleaned = [caption.strip() for caption in captions if caption and caption.strip()]
    return " ".join(cleaned) if cleaned else "unknown"


def process_segment(
    data_dir: Path,
    segment_index: int,
    meta_entry: Dict[str, Any],
    output_dir: Path,
    target_fps: int,
    height: int,
    width: int,
) -> Optional[Dict[str, Any]]:
    """Concatenate one segment's shots into a single resampled/cropped video.

    Returns the dataset entry dict, or None when the segment directory, its
    shots, or its character IDs are missing.
    """
    segment_dir = data_dir / str(segment_index)
    if not segment_dir.exists():
        return None
    shot_videos = list_shot_videos(segment_dir)
    if not shot_videos:
        return None

    captions = normalize_list(meta_entry.get("caption"))
    ids = normalize_list(meta_entry.get("ID"))

    # One caption per shot; reuse the last caption when there are fewer
    # captions than shots, empty string when there are none at all.
    shot_captions: List[str] = []
    for idx in range(len(shot_videos)):
        if idx < len(captions):
            shot_captions.append(captions[idx])
        elif captions:
            shot_captions.append(captions[-1])
        else:
            shot_captions.append("")
    text = build_text(shot_captions)

    id_root = data_dir / "ID"
    id_names = pick_id_names(ids, id_root)
    if not id_names:
        return None
    id_dir = id_root
    if not id_dir.exists():
        return None

    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"{segment_index}.mp4"
    writer = imageio.get_writer(str(output_path), fps=target_fps)
    try:
        for shot_path in shot_videos:
            for frame in iter_resampled_frames(shot_path, target_fps):
                img = Image.fromarray(frame).convert("RGB")
                img = resize_center_crop(img, height, width)
                writer.append_data(np.asarray(img))
    finally:
        writer.close()

    return {
        "disk_path": str(output_path),
        "text": text,
        "shot_captions": shot_captions,
        "id_dir": str(id_dir),
        "id_names": id_names,
    }


def build_dataset(
    data_root: Path,
    output_root: Path,
    output_json: Path,
    target_fps: int,
    height: int,
    width: int,
    limit: Optional[int],
) -> Path:
    """Walk every Data*/ movie dir, process its segments, and write the JSON.

    Meta entries are matched to segment dirs by 1-based position (entry N
    describes directory N/). *limit* caps the number of processed segments.
    """
    dataset: Dict[str, Dict[str, Any]] = {}
    total_segments = 0
    for data_dir in sorted([p for p in data_root.iterdir() if p.is_dir()], key=sort_key_numeric):
        meta_path = data_dir / "meta"
        if not meta_path.exists():
            continue
        meta_entries = read_json_lines(meta_path)
        if not meta_entries:
            continue
        output_dir = output_root / data_dir.name
        for idx, meta_entry in enumerate(meta_entries, start=1):
            entry = process_segment(
                data_dir=data_dir,
                segment_index=idx,
                meta_entry=meta_entry,
                output_dir=output_dir,
                target_fps=target_fps,
                height=height,
                width=width,
            )
            if entry is None:
                continue
            key = f"{data_dir.name}/{idx}"
            dataset[key] = entry
            total_segments += 1
            if limit is not None and total_segments >= limit:
                break
        if limit is not None and total_segments >= limit:
            break

    output_json.parent.mkdir(parents=True, exist_ok=True)
    with output_json.open("w", encoding="utf-8") as f:
        json.dump(dataset, f, ensure_ascii=False, indent=2)
    return output_json


def main() -> int:
    """CLI entry point; returns 0 on success."""
    parser = argparse.ArgumentParser(description="Preprocess multi-shot data into videodataset JSON.")
    parser.add_argument(
        "--data_root",
        type=str,
        default="/data/rczhang/PencilFolder/multi-shot/data",
        help="Root folder that contains Data* directories.",
    )
    parser.add_argument(
        "--output_root",
        type=str,
        default="/data/rczhang/PencilFolder/multi-shot/processed",
        help="Where to save processed videos.",
    )
    parser.add_argument(
        "--output_json",
        type=str,
        default="/data/rczhang/PencilFolder/multi-shot/processed/dataset.json",
        help="Output dataset JSON path.",
    )
    parser.add_argument("--target_fps", type=int, default=DEFAULT_FPS)
    parser.add_argument("--height", type=int, default=TARGET_HEIGHT)
    parser.add_argument("--width", type=int, default=TARGET_WIDTH)
    parser.add_argument("--limit", type=int, default=None, help="Limit processed segments for quick tests.")
    args = parser.parse_args()

    output_json = build_dataset(
        data_root=Path(args.data_root),
        output_root=Path(args.output_root),
        output_json=Path(args.output_json),
        target_fps=args.target_fps,
        height=args.height,
        width=args.width,
        limit=args.limit,
    )
    print(f"Saved dataset JSON to: {output_json}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())