# multishot/process_data.py
# (Hugging Face Hub upload metadata: uploaded by PencilHu via huggingface_hub,
#  commit 85752bc, verified)
# Expected input layout:
# /data/rczhang/PencilFolder/multi-shot/data/
# ├── Data1/                # movie1
# │   ├── meta              # metadata file (per-segment captions and ID info)
# │   ├── 1/
# │   │   └── 1.mp4         # video file (shot)
# │   ├── 2/
# │   │   ├── 1.mp4         # video file (shot)
# │   │   └── 2.mp4         # video file (shot)
# │   └── ID/
# │       └── ID1/
# │           ├── 1.png     # character ID image
# │           ├── 2.png     # character ID image
# │           └── 3.png     # character ID image
# │
# ├── Data2/                # movie2
# │   ├── meta              # metadata file
# │   ├── 1/
# │   │   ├── 1.mp4         # video file (shot)
# │   │   └── 2.mp4         # video file (shot)
# │   ├── 2/
# │   │   └── 1.mp4         # video file (shot)
# │   └── ID/
# │       ├── ID1/
# │       │   ├── 1.png     # character ID image
# │       │   ├── 2.png     # character ID image
# │       │   └── 3.png     # character ID image
# │       └── ID2/
# │           ├── 1.png     # character ID image
# │           ├── 2.png     # character ID image
# │           └── 3.png     # character ID image
# │
# └── Data3/                # movie3
#     ├── meta              # metadata file
#     ├── 1/
#     │   └── 1.mp4         # video file (shot)
#     └── ID/
#         └── ID1/
#             ├── 1.png     # character ID image
#             ├── 2.png     # character ID image
#             └── 3.png     # character ID image
import argparse
import json
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
import imageio.v2 as imageio
import numpy as np
from PIL import Image
# File extensions accepted as character-ID reference images.
IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".webp", ".bmp"}
# Frame rate (fps) of the resampled output videos.
DEFAULT_FPS = 16
# Output frame size in pixels after resize + center crop.
TARGET_HEIGHT = 480
TARGET_WIDTH = 832
def read_json_lines(path: Path) -> List[Dict[str, Any]]:
    """Parse a JSON-lines file, ignoring blank lines.

    Args:
        path: File containing one JSON object per line.

    Returns:
        The decoded objects, in file order.
    """
    records: List[Dict[str, Any]] = []
    with path.open("r", encoding="utf-8") as handle:
        for raw in handle:
            stripped = raw.strip()
            if stripped:
                records.append(json.loads(stripped))
    return records
def normalize_list(value: Any) -> List[str]:
    """Coerce ``None``, a scalar, or a list into a list of strings."""
    if value is None:
        return []
    if not isinstance(value, list):
        return [str(value)]
    return [str(element) for element in value]
def sort_key_numeric(path: Path) -> Tuple[int, int, str]:
    """Sort key: numeric stems first (by value), then others alphabetically."""
    stem = path.stem
    is_numeric = stem.isdigit()
    return (0 if is_numeric else 1, int(stem) if is_numeric else 0, stem)
def list_shot_videos(segment_dir: Path) -> List[Path]:
    """Return the segment's ``.mp4`` files, numeric names first in numeric order."""
    clips = (entry for entry in segment_dir.iterdir() if entry.is_file())
    mp4s = [entry for entry in clips if entry.suffix.lower() == ".mp4"]
    mp4s.sort(key=sort_key_numeric)
    return mp4s
def list_id_images(id_dir: Path) -> List[Path]:
    """Return the ID folder's image files (per IMAGE_EXTS), numerically sorted."""
    found: List[Path] = []
    for entry in id_dir.iterdir():
        if entry.is_file() and entry.suffix.lower() in IMAGE_EXTS:
            found.append(entry)
    return sorted(found, key=sort_key_numeric)
def resize_center_crop(image: Image.Image, target_h: int, target_w: int) -> Image.Image:
    """Scale the image so it covers the target size, then crop the centre.

    The scale factor is chosen so both axes reach at least the target size
    ("cover" semantics); the overshooting axis is trimmed equally from both
    sides.
    """
    src_w, src_h = image.size
    factor = max(target_h / src_h, target_w / src_w)
    scaled_w = int(round(src_w * factor))
    scaled_h = int(round(src_h * factor))
    resized = image.resize((scaled_w, scaled_h), Image.Resampling.LANCZOS)
    x0 = max((scaled_w - target_w) // 2, 0)
    y0 = max((scaled_h - target_h) // 2, 0)
    return resized.crop((x0, y0, x0 + target_w, y0 + target_h))
def iter_resampled_frames(video_path: Path, target_fps: int) -> Iterable[np.ndarray]:
    """Yield frames from *video_path* resampled to approximately *target_fps*.

    Frames are picked at evenly spaced indices across the clip so the output
    has roughly ``duration * target_fps`` frames (always at least 1). When the
    container cannot report a frame count, the frame total is estimated from
    the duration metadata; as a last resort the whole clip is decoded into
    memory and sampled from the in-memory list.
    """
    reader = imageio.get_reader(str(video_path))
    try:
        meta = reader.get_meta_data()
        # Guard against missing or zero fps metadata by assuming target_fps.
        fps = meta.get("fps", target_fps) or target_fps
        total_frames: Optional[int] = None
        try:
            total_frames = reader.count_frames()
        except Exception:
            # Some formats can't count frames; estimate from duration metadata.
            duration = meta.get("duration")
            if duration:
                total_frames = int(round(duration * fps))
        if total_frames is None or total_frames <= 0:
            # Fallback: decode the entire clip into memory and sample it.
            frames = [frame for frame in reader]
            if not frames:
                return
            total_frames = len(frames)
            target_count = max(1, int(round(total_frames * target_fps / fps)))
            indices = np.linspace(0, total_frames - 1, num=target_count, dtype=int)
            for idx in indices:
                yield frames[int(idx)]
            return
        # Known frame count: random-access the reader at evenly spaced indices.
        duration = total_frames / fps if fps > 0 else total_frames / target_fps
        target_count = max(1, int(round(duration * target_fps)))
        indices = np.linspace(0, total_frames - 1, num=target_count, dtype=int)
        for idx in indices:
            yield reader.get_data(int(idx))
    finally:
        # Always release the decoder handle, even on partial iteration errors.
        reader.close()
def pick_id_names(id_list: List[str], id_root: Path) -> List[str]:
    """Return the ID names to use for a segment.

    Prefers the explicit *id_list* (deduplicated, order preserved); otherwise
    falls back to the sorted subdirectory names under *id_root*, or an empty
    list when that directory does not exist.
    """
    if id_list:
        unique: List[str] = []
        known = set()
        for candidate in id_list:
            if candidate in known:
                continue
            known.add(candidate)
            unique.append(candidate)
        return unique
    if not id_root.exists():
        return []
    children = sorted(id_root.iterdir(), key=sort_key_numeric)
    return [child.name for child in children if child.is_dir()]
def build_text(captions: List[str]) -> str:
    """Join non-empty, stripped captions with spaces; ``"unknown"`` if none remain."""
    parts = [caption.strip() for caption in captions if caption]
    parts = [part for part in parts if part]
    if not parts:
        return "unknown"
    return " ".join(parts)
def process_segment(
    data_dir: Path,
    segment_index: int,
    meta_entry: Dict[str, Any],
    output_dir: Path,
    target_fps: int,
    height: int,
    width: int,
) -> Optional[Dict[str, Any]]:
    """Concatenate one segment's shots into a single resampled, resized video.

    Args:
        data_dir: Movie folder containing numbered segment dirs and ``ID/``.
        segment_index: 1-based segment number (also the output file stem).
        meta_entry: Metadata row with optional ``caption`` and ``ID`` fields.
        output_dir: Folder that receives the processed ``{segment_index}.mp4``.
        target_fps: Output frame rate.
        height: Output frame height in pixels.
        width: Output frame width in pixels.

    Returns:
        A dataset record for the segment, or ``None`` when the segment folder,
        its shot videos, or its ID information is missing.
    """
    segment_dir = data_dir / str(segment_index)
    if not segment_dir.exists():
        return None
    shot_videos = list_shot_videos(segment_dir)
    if not shot_videos:
        return None
    captions = normalize_list(meta_entry.get("caption"))
    ids = normalize_list(meta_entry.get("ID"))
    # One caption per shot: reuse the last caption when shots outnumber
    # captions, and fall back to "" when there are no captions at all.
    shot_captions: List[str] = []
    for shot_idx, _ in enumerate(shot_videos):
        if shot_idx < len(captions):
            shot_captions.append(captions[shot_idx])
        else:
            shot_captions.append(captions[-1] if captions else "")
    text = build_text(shot_captions)
    id_root = data_dir / "ID"
    id_names = pick_id_names(ids, id_root)
    if not id_names:
        return None
    id_dir = id_root
    if not id_dir.exists():
        return None
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"{segment_index}.mp4"
    writer = imageio.get_writer(str(output_path), fps=target_fps)
    try:
        for shot_path in shot_videos:
            for frame in iter_resampled_frames(shot_path, target_fps):
                processed = resize_center_crop(
                    Image.fromarray(frame).convert("RGB"), height, width
                )
                writer.append_data(np.asarray(processed))
    finally:
        writer.close()
    return {
        "disk_path": str(output_path),
        "text": text,
        "shot_captions": shot_captions,
        "id_dir": str(id_dir),
        "id_names": id_names,
    }
def build_dataset(
    data_root: Path,
    output_root: Path,
    output_json: Path,
    target_fps: int,
    height: int,
    width: int,
    limit: Optional[int],
) -> Path:
    """Process every ``Data*`` movie folder under *data_root* into a dataset.

    Args:
        data_root: Root folder containing the movie directories.
        output_root: Destination root for the processed per-movie videos.
        output_json: Path of the dataset JSON written at the end.
        target_fps: Output frame rate for processed videos.
        height: Output frame height in pixels.
        width: Output frame width in pixels.
        limit: Optional cap on the number of processed segments (quick tests).

    Returns:
        The path of the written dataset JSON.
    """
    dataset: Dict[str, Dict[str, Any]] = {}
    processed = 0
    reached_limit = False
    movie_dirs = sorted((p for p in data_root.iterdir() if p.is_dir()), key=sort_key_numeric)
    for movie_dir in movie_dirs:
        meta_path = movie_dir / "meta"
        if not meta_path.exists():
            continue
        meta_entries = read_json_lines(meta_path)
        if not meta_entries:
            continue
        for segment_index, meta_entry in enumerate(meta_entries, start=1):
            record = process_segment(
                data_dir=movie_dir,
                segment_index=segment_index,
                meta_entry=meta_entry,
                output_dir=output_root / movie_dir.name,
                target_fps=target_fps,
                height=height,
                width=width,
            )
            if record is None:
                continue
            dataset[f"{movie_dir.name}/{segment_index}"] = record
            processed += 1
            if limit is not None and processed >= limit:
                reached_limit = True
                break
        if reached_limit:
            break
    output_json.parent.mkdir(parents=True, exist_ok=True)
    with output_json.open("w", encoding="utf-8") as f:
        json.dump(dataset, f, ensure_ascii=False, indent=2)
    return output_json
def main() -> int:
    """CLI entry point: parse arguments, build the dataset, report the output path."""
    parser = argparse.ArgumentParser(description="Preprocess multi-shot data into videodataset JSON.")
    # Path arguments share the same shape; register them table-driven.
    path_options = (
        (
            "--data_root",
            "/data/rczhang/PencilFolder/multi-shot/data",
            "Root folder that contains Data* directories.",
        ),
        (
            "--output_root",
            "/data/rczhang/PencilFolder/multi-shot/processed",
            "Where to save processed videos.",
        ),
        (
            "--output_json",
            "/data/rczhang/PencilFolder/multi-shot/processed/dataset.json",
            "Output dataset JSON path.",
        ),
    )
    for flag, default, help_text in path_options:
        parser.add_argument(flag, type=str, default=default, help=help_text)
    parser.add_argument("--target_fps", type=int, default=DEFAULT_FPS)
    parser.add_argument("--height", type=int, default=TARGET_HEIGHT)
    parser.add_argument("--width", type=int, default=TARGET_WIDTH)
    parser.add_argument("--limit", type=int, default=None, help="Limit processed segments for quick tests.")
    args = parser.parse_args()
    output_json = build_dataset(
        data_root=Path(args.data_root),
        output_root=Path(args.output_root),
        output_json=Path(args.output_json),
        target_fps=args.target_fps,
        height=args.height,
        width=args.width,
        limit=args.limit,
    )
    print(f"Saved dataset JSON to: {output_json}")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())