| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import argparse |
| import json |
| from pathlib import Path |
| from typing import Any, Dict, Iterable, List, Optional, Tuple |
|
|
| import imageio.v2 as imageio |
| import numpy as np |
| from PIL import Image |
|
|
|
|
# Still-image extensions recognised when scanning identity (ID) folders.
IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".webp", ".bmp"}
# Default output frame rate; shot videos are resampled to this fps.
DEFAULT_FPS = 16
# Default output frame size in pixels (frames are resized + center-cropped).
TARGET_HEIGHT = 480
TARGET_WIDTH = 832
|
|
|
|
def read_json_lines(path: Path) -> List[Dict[str, Any]]:
    """Parse a JSON-Lines file, skipping blank lines.

    Args:
        path: File with one JSON object per line.

    Returns:
        The parsed objects in file order.
    """
    records: List[Dict[str, Any]] = []
    with path.open("r", encoding="utf-8") as handle:
        for raw in handle:
            stripped = raw.strip()
            if not stripped:
                continue
            records.append(json.loads(stripped))
    return records
|
|
|
|
def normalize_list(value: Any) -> List[str]:
    """Coerce *value* into a list of strings.

    None becomes an empty list, a list is stringified element-wise, and any
    other scalar becomes a single-element list.
    """
    if value is None:
        return []
    if not isinstance(value, list):
        return [str(value)]
    return [str(item) for item in value]
|
|
|
|
def sort_key_numeric(path: Path) -> Tuple[int, int, str]:
    """Sort key that orders purely numeric stems first, by numeric value.

    Non-numeric stems sort after all numeric ones, lexicographically by stem.
    """
    stem = path.stem
    return (0, int(stem), stem) if stem.isdigit() else (1, 0, stem)
|
|
|
|
def list_shot_videos(segment_dir: Path) -> List[Path]:
    """Return every .mp4 file directly inside *segment_dir*, numerically sorted."""
    mp4_files = [
        entry
        for entry in segment_dir.iterdir()
        if entry.is_file() and entry.suffix.lower() == ".mp4"
    ]
    mp4_files.sort(key=sort_key_numeric)
    return mp4_files
|
|
|
|
def list_id_images(id_dir: Path) -> List[Path]:
    """Return identity reference images inside *id_dir*, numerically sorted."""
    found = [
        entry
        for entry in id_dir.iterdir()
        if entry.is_file() and entry.suffix.lower() in IMAGE_EXTS
    ]
    found.sort(key=sort_key_numeric)
    return found
|
|
|
|
def resize_center_crop(image: Image.Image, target_h: int, target_w: int) -> Image.Image:
    """Scale *image* to cover the target box, then crop its center region.

    The scale factor is chosen so both dimensions reach at least the target
    size; the crop then trims the overflowing dimension symmetrically.
    """
    src_w, src_h = image.size
    factor = max(target_w / src_w, target_h / src_h)
    scaled_w = int(round(src_w * factor))
    scaled_h = int(round(src_h * factor))
    scaled = image.resize((scaled_w, scaled_h), Image.Resampling.LANCZOS)
    x0 = max(0, (scaled_w - target_w) // 2)
    y0 = max(0, (scaled_h - target_h) // 2)
    return scaled.crop((x0, y0, x0 + target_w, y0 + target_h))
|
|
|
|
def iter_resampled_frames(video_path: Path, target_fps: int) -> Iterable[np.ndarray]:
    """Yield frames of *video_path* resampled to approximately *target_fps*.

    Frame indices are picked with a uniform linspace over the source frame
    range, so frames are dropped when downsampling and repeated when
    upsampling. The reader is always closed, even on error.
    """
    reader = imageio.get_reader(str(video_path))
    try:
        meta = reader.get_meta_data()
        # Fall back to target_fps when metadata lacks fps or reports 0/None.
        fps = meta.get("fps", target_fps) or target_fps
        total_frames: Optional[int] = None
        try:
            # Some containers cannot report an exact frame count and raise here.
            total_frames = reader.count_frames()
        except Exception:
            # Estimate the count from the clip duration instead, if available.
            duration = meta.get("duration")
            if duration:
                total_frames = int(round(duration * fps))


        if total_frames is None or total_frames <= 0:
            # Last resort: decode the whole clip into memory and index the list.
            frames = [frame for frame in reader]
            if not frames:
                return
            total_frames = len(frames)
            target_count = max(1, int(round(total_frames * target_fps / fps)))
            indices = np.linspace(0, total_frames - 1, num=target_count, dtype=int)
            for idx in indices:
                yield frames[int(idx)]
            return


        duration = total_frames / fps if fps > 0 else total_frames / target_fps
        target_count = max(1, int(round(duration * target_fps)))
        indices = np.linspace(0, total_frames - 1, num=target_count, dtype=int)
        for idx in indices:
            # Random-access read per index; NOTE(review): may be slow for
            # codecs without an index — confirm acceptable for the data set.
            yield reader.get_data(int(idx))
    finally:
        reader.close()
|
|
|
|
def pick_id_names(id_list: List[str], id_root: Path) -> List[str]:
    """Return the identity names for a segment.

    If *id_list* (from metadata) is non-empty, deduplicate it while keeping
    first-occurrence order. Otherwise fall back to the subdirectory names of
    *id_root* in numeric order; a missing *id_root* yields an empty list.
    """
    if id_list:
        # dict preserves insertion order, so this dedupes without reordering.
        return list(dict.fromkeys(id_list))
    if not id_root.exists():
        return []
    return [
        child.name
        for child in sorted(id_root.iterdir(), key=sort_key_numeric)
        if child.is_dir()
    ]
|
|
|
|
def build_text(captions: List[str]) -> str:
    """Join non-empty, stripped captions with spaces; "unknown" if none remain."""
    parts = [text.strip() for text in captions if text and text.strip()]
    if not parts:
        return "unknown"
    return " ".join(parts)
|
|
|
|
def process_segment(
    data_dir: Path,
    segment_index: int,
    meta_entry: Dict[str, Any],
    output_dir: Path,
    target_fps: int,
    height: int,
    width: int,
) -> Optional[Dict[str, Any]]:
    """Render one segment's shot videos into a single resampled, cropped mp4.

    Returns a dataset-entry dict (output path, joined caption text, per-shot
    captions, ID directory and names), or None when the segment directory,
    its shot videos, or the required ID information is missing.
    """
    # Segment data lives in a subdirectory named after its 1-based index.
    segment_dir = data_dir / str(segment_index)
    if not segment_dir.exists():
        return None


    shot_videos = list_shot_videos(segment_dir)
    if not shot_videos:
        return None


    captions = normalize_list(meta_entry.get("caption"))
    ids = normalize_list(meta_entry.get("ID"))
    shot_captions: List[str] = []
    # Pad to one caption per shot: reuse the last caption when there are
    # fewer captions than shots, or "" when there are none at all.
    for idx in range(len(shot_videos)):
        if idx < len(captions):
            shot_captions.append(captions[idx])
        elif captions:
            shot_captions.append(captions[-1])
        else:
            shot_captions.append("")


    text = build_text(shot_captions)
    id_root = data_dir / "ID"
    id_names = pick_id_names(ids, id_root)
    if not id_names:
        return None


    # When id_names came from metadata, pick_id_names never touched the
    # filesystem — so verify here that the ID directory actually exists.
    id_dir = id_root
    if not id_dir.exists():
        return None


    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"{segment_index}.mp4"
    writer = imageio.get_writer(str(output_path), fps=target_fps)
    try:
        # Concatenate all shots into one clip at a uniform fps and frame size.
        for shot_path in shot_videos:
            for frame in iter_resampled_frames(shot_path, target_fps):
                img = Image.fromarray(frame).convert("RGB")
                img = resize_center_crop(img, height, width)
                writer.append_data(np.asarray(img))
    finally:
        writer.close()


    return {
        "disk_path": str(output_path),
        "text": text,
        "shot_captions": shot_captions,
        "id_dir": str(id_dir),
        "id_names": id_names,
    }
|
|
|
|
def build_dataset(
    data_root: Path,
    output_root: Path,
    output_json: Path,
    target_fps: int,
    height: int,
    width: int,
    limit: Optional[int],
) -> Path:
    """Process every data directory under *data_root* and write the dataset JSON.

    Each directory's ``meta`` file is read as JSON Lines; entry i (1-based)
    is assumed to correspond to segment subdirectory ``i`` — TODO confirm
    this alignment against the data layout. Entries are keyed as
    "<dir-name>/<segment-index>". *limit*, when set, caps the total number
    of processed segments (useful for quick tests). Returns *output_json*.
    """
    dataset: Dict[str, Dict[str, Any]] = {}
    total_segments = 0


    for data_dir in sorted([p for p in data_root.iterdir() if p.is_dir()], key=sort_key_numeric):
        meta_path = data_dir / "meta"
        if not meta_path.exists():
            continue


        meta_entries = read_json_lines(meta_path)
        if not meta_entries:
            continue


        output_dir = output_root / data_dir.name


        # Meta line order maps to segment folder names starting at 1.
        for idx, meta_entry in enumerate(meta_entries, start=1):
            entry = process_segment(
                data_dir=data_dir,
                segment_index=idx,
                meta_entry=meta_entry,
                output_dir=output_dir,
                target_fps=target_fps,
                height=height,
                width=width,
            )
            if entry is None:
                continue


            key = f"{data_dir.name}/{idx}"
            dataset[key] = entry
            total_segments += 1
            if limit is not None and total_segments >= limit:
                break


        # Re-check the limit to break out of the outer directory loop too.
        if limit is not None and total_segments >= limit:
            break


    output_json.parent.mkdir(parents=True, exist_ok=True)
    with output_json.open("w", encoding="utf-8") as f:
        json.dump(dataset, f, ensure_ascii=False, indent=2)


    return output_json
|
|
|
|
def main() -> int:
    """Parse command-line arguments, build the dataset, and report the result.

    Returns 0 on success (exceptions propagate to the caller).
    """
    cli = argparse.ArgumentParser(description="Preprocess multi-shot data into videodataset JSON.")
    cli.add_argument(
        "--data_root",
        type=str,
        default="/data/rczhang/PencilFolder/multi-shot/data",
        help="Root folder that contains Data* directories.",
    )
    cli.add_argument(
        "--output_root",
        type=str,
        default="/data/rczhang/PencilFolder/multi-shot/processed",
        help="Where to save processed videos.",
    )
    cli.add_argument(
        "--output_json",
        type=str,
        default="/data/rczhang/PencilFolder/multi-shot/processed/dataset.json",
        help="Output dataset JSON path.",
    )
    cli.add_argument("--target_fps", type=int, default=DEFAULT_FPS)
    cli.add_argument("--height", type=int, default=TARGET_HEIGHT)
    cli.add_argument("--width", type=int, default=TARGET_WIDTH)
    cli.add_argument("--limit", type=int, default=None, help="Limit processed segments for quick tests.")
    opts = cli.parse_args()

    result_path = build_dataset(
        data_root=Path(opts.data_root),
        output_root=Path(opts.output_root),
        output_json=Path(opts.output_json),
        target_fps=opts.target_fps,
        height=opts.height,
        width=opts.width,
        limit=opts.limit,
    )
    print(f"Saved dataset JSON to: {result_path}")
    return 0
|
|
|
|
if __name__ == "__main__":
    # Exit the process with main()'s return code when run as a script.
    raise SystemExit(main())
|
|