| import os |
| import shutil |
| import tempfile |
| import logging |
| from pathlib import Path |
| from huggingface_hub import HfApi, snapshot_download, upload_folder, create_repo |
| from dotenv import load_dotenv |
|
|
| from app.config.hf_config import HF_TOKEN, HF_ORGANIZATION, HF_AGGREGATED |
|
|
| |
# Source and destination Hub accounts for the copy.
SOURCE_USERNAME = HF_AGGREGATED
DESTINATION_USERNAME = "tfrere"

# Directory layout resolved relative to this file:
# this script lives two levels below the backend dir's parent.
BACKEND_DIR = Path(__file__).parent.parent
ROOT_DIR = BACKEND_DIR.parent

# Load credentials/environment (e.g. HF token) from the repo-root .env file.
load_dotenv(ROOT_DIR / ".env")

# Bare-message logging so the script reads like plain progress output.
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s'
)
logger = logging.getLogger(__name__)

# Short names of the dataset repos to copy.
DATASET_NAMES = [
    "votes",
    "results",
    "requests",
    HF_AGGREGATED,
    "maintainers-highlight",
]

# Expand each short name into a (name, source repo id, destination repo id) triple.
DATASETS = [
    (name, f"{SOURCE_USERNAME}/{name}", f"{DESTINATION_USERNAME}/{name}")
    for name in DATASET_NAMES
]

# Shared Hugging Face API client used by all helpers below.
api = HfApi()
|
|
def ensure_repo_exists(repo_id, token):
    """Guarantee that a dataset repo exists on the Hub, creating it if needed.

    Args:
        repo_id: Full ``<owner>/<name>`` dataset identifier.
        token: Hugging Face token used for the creation call.
    """
    try:
        # Probe the repo; any failure (not found, auth, network) falls
        # through to the creation path, matching the original behavior.
        api.repo_info(repo_id=repo_id, repo_type="dataset")
    except Exception:
        logger.info(f"Creating repository {repo_id}...")
        create_repo(
            repo_id=repo_id,
            repo_type="dataset",
            token=token,
            private=True
        )
        logger.info(f"β Repository {repo_id} created")
    else:
        logger.info(f"β Repository {repo_id} already exists")
|
|
def process_dataset(dataset_info, token):
    """Copy one dataset repo from the source account to the destination.

    Args:
        dataset_info: ``(short name, source repo id, destination repo id)``
            triple as produced in ``DATASETS``.
        token: Hugging Face token used for all Hub calls.

    Returns:
        True when the copy succeeded, False on any error (logged, not raised).
    """
    name, source_dataset, destination_dataset = dataset_info
    try:
        logger.info(f"\nπ₯ Processing dataset: {name}")

        # Create the destination repo up front so the upload cannot 404.
        ensure_repo_exists(destination_dataset, token)

        # Throwaway working directory, removed even when the copy fails.
        with tempfile.TemporaryDirectory() as temp_dir:
            logger.info(f"Listing files in {source_dataset}...")
            files = api.list_repo_files(source_dataset, repo_type="dataset")
            logger.info(f"Detected structure: {len(files)} files")

            logger.info(f"Downloading from {source_dataset}...")
            local_dir = snapshot_download(
                repo_id=source_dataset,
                repo_type="dataset",
                local_dir=temp_dir,
                token=token
            )
            logger.info(f"β Download complete")

            logger.info(f"π€ Uploading to {destination_dataset}...")
            api.upload_folder(
                folder_path=local_dir,
                repo_id=destination_dataset,
                repo_type="dataset",
                token=token
            )
            # BUG FIX: this message was split across two physical lines
            # (a syntax error in the original); rejoined onto one line.
            logger.info(f"β {name} copied successfully!")
            return True

    # BUG FIX: the original had two nested try/except blocks that logged
    # every error twice ("Error processing" then "Error for"); a single
    # handler covers the whole operation.
    except Exception as e:
        logger.error(f"β Error processing {name}: {str(e)}")
        return False
|
|
def copy_datasets():
    """Copy every configured dataset and log a per-dataset success summary.

    Errors in individual datasets are recorded in the summary; any other
    failure is caught and logged as a global error.
    """
    try:
        logger.info("π Checking authentication...")

        # BUG FIX: `token` was referenced below but never defined
        # (NameError at runtime); bind it from the project configuration
        # imported at the top of the file.
        token = HF_TOKEN

        results = []
        for dataset_info in DATASETS:
            success = process_dataset(dataset_info, token)
            results.append((dataset_info[0], success))

        logger.info("\nπ Final summary:")
        for dataset, success in results:
            # BUG FIX: both status literals were split across two physical
            # lines (a syntax error in the original); rejoined here.
            status = "β Success" if success else "β Failure"
            logger.info(f"{dataset}: {status}")

    except Exception as e:
        logger.error(f"β Global error: {str(e)}")
|
|
# Script entry point: run the full copy when executed directly.
if __name__ == "__main__":
    copy_datasets()
|
|