| |
| """Create and optionally publish a resized HF dataset for the HyperView Space.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| from datetime import datetime, timezone |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
| from PIL import Image |
| from datasets import Dataset, Image as HFImage |
|
|
# Project root: resolved three directory levels above this file
# (assumes the script lives two folders deep under the repo root) — TODO confirm layout.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# Source Kaggle dataset directory (expects train.csv plus a train/ image folder).
DEFAULT_DATASET_ROOT = PROJECT_ROOT / "kaggle_jaguar_dataset_v2"
# CSV listing filenames that belong to the validation coreset.
DEFAULT_CORESET_CSV = PROJECT_ROOT / "data/validation_coreset.csv"
# Where resized images, the saved HF dataset, and the metadata JSON are written.
DEFAULT_OUTPUT_DIR = PROJECT_ROOT / "HyperViewDemoHuggingFaceSpace/dataset_build"
# Target Hub dataset repo; overridable via the HF_DATASET_REPO environment variable.
DEFAULT_REPO_ID = os.environ.get("HF_DATASET_REPO", "hyper3labs/jaguar-hyperview-demo")
|
|
|
|
def utc_now() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return f"{datetime.now(timezone.utc):%Y-%m-%dT%H:%M:%SZ}"
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Build and evaluate the command-line interface for the dataset pipeline."""
    arg_parser = argparse.ArgumentParser(
        description="Build resized train+validation demo dataset with split tags for HyperView."
    )
    # Plain value options: (flag, type, default).
    for flag, value_type, default in (
        ("--dataset_root", Path, DEFAULT_DATASET_ROOT),
        ("--coreset_csv", Path, DEFAULT_CORESET_CSV),
        ("--output_dir", Path, DEFAULT_OUTPUT_DIR),
        ("--repo_id", str, DEFAULT_REPO_ID),
        ("--config_name", str, "default"),
        ("--image_size", int, 384),
        ("--jpeg_quality", int, 90),
        ("--max_samples", int, None),
        ("--hf_token_env", str, "HF_TOKEN"),
    ):
        arg_parser.add_argument(flag, type=value_type, default=default)
    # Restricted-choice option controlling how source pixels are treated.
    arg_parser.add_argument(
        "--image_variant",
        type=str,
        default="foreground_only",
        choices=["foreground_only", "full_rgb"],
    )
    # Boolean flags.
    arg_parser.add_argument("--private", action="store_true")
    arg_parser.add_argument("--no_push", action="store_true")
    return arg_parser.parse_args()
|
|
|
|
def load_rows(dataset_root: Path, coreset_csv: Path, max_samples: int | None) -> pd.DataFrame:
    """Read train.csv and return filename/label/split_tag/sample_id rows.

    Rows whose filename appears in the coreset CSV are tagged ``validation``;
    all others are tagged ``train``. Raises FileNotFoundError when train.csv
    is missing under *dataset_root*.
    """
    train_csv = dataset_root / "train.csv"
    if not train_csv.exists():
        raise FileNotFoundError(f"Missing train.csv at {train_csv}")

    frame = pd.read_csv(train_csv).copy()
    coreset = pd.read_csv(coreset_csv)
    validation_files = set(coreset["filename"].astype(str))

    frame["filename"] = frame["filename"].astype(str)
    frame["label"] = frame["ground_truth"].astype(str)
    is_validation = frame["filename"].isin(validation_files)
    frame["split_tag"] = np.where(is_validation, "validation", "train")
    # sample_id mirrors filename so downstream tooling has a stable key.
    frame["sample_id"] = frame["filename"]

    if max_samples is not None:
        frame = frame.head(int(max_samples)).copy()

    return frame[["filename", "label", "split_tag", "sample_id"]]
|
|
|
|
def load_variant_image(image_path: Path, image_variant: str) -> Image.Image:
    """Open *image_path* as RGB, blacking out transparent pixels for ``foreground_only``."""
    if image_variant != "foreground_only":
        return Image.open(image_path).convert("RGB")
    rgba = np.asarray(Image.open(image_path).convert("RGBA"), dtype=np.uint8)
    # Keep RGB wherever alpha is nonzero; fully transparent pixels become black.
    keep = rgba[..., 3:4] > 0
    foreground = np.where(keep, rgba[..., :3], 0).astype(np.uint8)
    return Image.fromarray(foreground, mode="RGB")
|
|
|
|
def build_resized_images(
    rows_df: pd.DataFrame,
    dataset_root: Path,
    output_images_dir: Path,
    image_size: int,
    jpeg_quality: int,
    image_variant: str,
) -> pd.DataFrame:
    """Resize every row's image to a square JPEG and return a records table.

    Each output record carries the JPEG path plus the row's label, filename,
    split_tag and sample_id. Raises FileNotFoundError when the source image
    directory or any individual source image is missing.
    """
    src_dir = dataset_root / "train"
    if not src_dir.exists():
        raise FileNotFoundError(f"Missing image directory: {src_dir}")

    output_images_dir.mkdir(parents=True, exist_ok=True)

    # Hoist per-call constants out of the loop.
    target_size = (int(image_size), int(image_size))
    quality = int(jpeg_quality)

    out_rows: list[dict[str, str]] = []
    for _, row in rows_df.iterrows():
        name = str(row["filename"])
        source_path = src_dir / name
        if not source_path.exists():
            raise FileNotFoundError(f"Missing source image: {source_path}")

        resized = load_variant_image(source_path, image_variant=image_variant)
        resized = resized.resize(target_size, Image.Resampling.BICUBIC)

        # All outputs are re-encoded as JPEG regardless of source extension.
        target = output_images_dir / f"{Path(name).stem}.jpg"
        resized.save(target, format="JPEG", quality=quality, optimize=True)

        out_rows.append(
            {
                "image": str(target),
                "label": str(row["label"]),
                "filename": name,
                "split_tag": str(row["split_tag"]),
                "sample_id": str(row["sample_id"]),
            }
        )

    return pd.DataFrame(out_rows)
|
|
|
|
def build_hf_dataset(records_df: pd.DataFrame) -> Dataset:
    """Convert the records table into an HF Dataset with a decoded image column."""
    columns = ["image", "label", "filename", "split_tag", "sample_id"]
    payload = {name: records_df[name].tolist() for name in columns}
    # Cast the path strings to the Image feature so the Hub renders previews.
    return Dataset.from_dict(payload).cast_column("image", HFImage())
|
|
|
|
def maybe_push_to_hub(
    dataset: Dataset,
    repo_id: str,
    config_name: str,
    private: bool,
    hf_token_env: str,
    no_push: bool,
) -> str:
    """Push *dataset* to the Hub unless disabled; return a human-readable status.

    Raises RuntimeError when pushing is requested but the token environment
    variable named by *hf_token_env* is unset or empty.
    """
    if no_push:
        return "skipped (--no_push)"

    token = os.environ.get(hf_token_env)
    if token is None or token == "":
        raise RuntimeError(
            f"Missing Hugging Face token in environment variable {hf_token_env}."
        )

    dataset.push_to_hub(
        repo_id=repo_id,
        config_name=config_name,
        token=token,
        private=bool(private),
    )
    return f"pushed:{repo_id}:{config_name}"
|
|
|
|
def main() -> int:
    """Run the full pipeline: load rows, resize images, save the dataset, optionally push.

    Returns 0 on success; raises RuntimeError when no rows are available.
    """
    args = parse_args()

    out_dir = args.output_dir.resolve()
    images_dir = out_dir / "images"
    hf_dir = out_dir / "hf_dataset"
    out_dir.mkdir(parents=True, exist_ok=True)

    rows = load_rows(
        dataset_root=args.dataset_root.resolve(),
        coreset_csv=args.coreset_csv.resolve(),
        max_samples=args.max_samples,
    )
    if rows.empty:
        raise RuntimeError("No dataset rows found for publish pipeline.")

    records = build_resized_images(
        rows_df=rows,
        dataset_root=args.dataset_root.resolve(),
        output_images_dir=images_dir,
        image_size=int(args.image_size),
        jpeg_quality=int(args.jpeg_quality),
        image_variant=args.image_variant,
    )

    demo_dataset = build_hf_dataset(records)
    demo_dataset.save_to_disk(str(hf_dir))

    status = maybe_push_to_hub(
        dataset=demo_dataset,
        repo_id=args.repo_id,
        config_name=args.config_name,
        private=args.private,
        hf_token_env=args.hf_token_env,
        no_push=args.no_push,
    )

    # Persist the full build configuration next to the artifacts for reproducibility.
    metadata = {
        "generated_at_utc": utc_now(),
        "dataset_root": str(args.dataset_root.resolve()),
        "coreset_csv": str(args.coreset_csv.resolve()),
        "output_dir": str(out_dir),
        "repo_id": args.repo_id,
        "config_name": args.config_name,
        "image_size": int(args.image_size),
        "jpeg_quality": int(args.jpeg_quality),
        "image_variant": args.image_variant,
        "num_rows": int(len(records)),
        "split_counts": records["split_tag"].value_counts().to_dict(),
        "push_status": status,
    }

    metadata_path = out_dir / "publish_metadata.json"
    metadata_path.write_text(json.dumps(metadata, indent=2), encoding="utf-8")

    print("=== HyperView demo dataset pipeline complete ===")
    print(f"Rows: {len(records)}")
    print(f"HF dataset saved to: {hf_dir}")
    print(f"Push status: {status}")
    print(f"Metadata: {metadata_path}")
    return 0
|
|
|
|
# Script entry point: propagate main()'s integer return value
# as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|