""" Automatic backup of Argilla annotations to HF Dataset. Runs as a honcho-managed process, syncs every 2 minutes by default. """ import os import sys import json import time from pathlib import Path from datetime import datetime # Configuration - use writable directory under /home/argilla EXPORT_DIR = Path("/home/argilla/exports") BACKUP_REPO = os.environ.get("BACKUP_DATASET_REPO") if not BACKUP_REPO: print("ERROR: BACKUP_DATASET_REPO not set, backup script exiting", flush=True) sys.exit(1) SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL_MINUTES", "2")) ARGILLA_API_URL = "http://localhost:6900" ARGILLA_API_KEY = os.environ.get("ARGILLA_API_KEY") if not ARGILLA_API_KEY: print("ERROR: ARGILLA_API_KEY not set, backup script exiting", flush=True) sys.exit(1) # Check for HF_TOKEN before importing heavy dependencies hf_token = os.environ.get("HF_TOKEN") if not hf_token: print("ERROR: HF_TOKEN not set, backup script exiting", flush=True) sys.exit(1) from huggingface_hub import CommitScheduler, login # Setup EXPORT_DIR.mkdir(parents=True, exist_ok=True) login(token=hf_token) # Setup CommitScheduler scheduler = CommitScheduler( repo_id=BACKUP_REPO, repo_type="dataset", folder_path=EXPORT_DIR, every=SYNC_INTERVAL, token=hf_token, ) # Track annotation counts to avoid redundant exports last_annotation_counts = {} def wait_for_argilla(): """Wait for Argilla server to be ready with extended retries.""" import urllib.request max_retries = 60 # Extended retries since we're starting with Argilla for i in range(max_retries): try: urllib.request.urlopen(f"{ARGILLA_API_URL}/api/v1/status", timeout=5) print("Argilla server is ready", flush=True) return True except Exception as e: print(f"Waiting for Argilla... ({i+1}/{max_retries}) - {e}", flush=True) time.sleep(5) print("ERROR: Argilla server not responding after maximum retries", flush=True) return False def get_user_map(headers): """Build a user_id -> username map from all users.""" import urllib.request user_map = {} try: req = urllib.request.Request(f"{ARGILLA_API_URL}/api/v1/users", headers=headers) with urllib.request.urlopen(req, timeout=30) as resp: users_data = json.loads(resp.read().decode()) # Handle both list and dict (with "items" key) response formats if isinstance(users_data, dict): users_list = users_data.get("items", []) else: users_list = users_data for user in users_list: user_map[user["id"]] = user["username"] except Exception as e: print(f"Warning: Could not fetch users: {e}", flush=True) return user_map def export_annotations(): """Export all annotations from Argilla using REST API.""" import urllib.request import urllib.error try: headers = {"X-Argilla-Api-Key": ARGILLA_API_KEY} # Build user_id -> username map for attribution user_map = get_user_map(headers) print(f"Loaded {len(user_map)} users for attribution", flush=True) # Get list of datasets req = urllib.request.Request(f"{ARGILLA_API_URL}/api/v1/me/datasets", headers=headers) with urllib.request.urlopen(req, timeout=30) as resp: datasets_data = json.loads(resp.read().decode()) datasets = datasets_data.get("items", []) print(f"Found {len(datasets)} datasets", flush=True) for dataset in datasets: dataset_id = dataset["id"] dataset_name = dataset["name"] # Get records with responses for this dataset req = urllib.request.Request( f"{ARGILLA_API_URL}/api/v1/datasets/{dataset_id}/records?include=responses&limit=1000", headers=headers ) with urllib.request.urlopen(req, timeout=60) as resp: records_data = json.loads(resp.read().decode()) records = records_data.get("items", []) # 
            # Filter to records with responses (annotations)
            annotated = [r for r in records if r.get("responses")]

            # Add username to each response to preserve attribution
            for record in annotated:
                for response in record.get("responses", []):
                    user_id = response.get("user_id")
                    if user_id and user_id in user_map:
                        response["username"] = user_map[user_id]

            if annotated:
                count = len(annotated)
                # Only export if the annotation count changed
                if count != last_annotation_counts.get(dataset_name):
                    export_file = EXPORT_DIR / f"{dataset_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
                    # Hold the scheduler lock so a scheduled push never
                    # uploads a half-written file
                    with scheduler.lock:
                        with open(export_file, "w") as f:
                            json.dump({
                                "dataset": dataset_name,
                                "exported_at": datetime.now().isoformat(),
                                "num_annotations": count,
                                "records": annotated,
                            }, f, indent=2, default=str)
                    last_annotation_counts[dataset_name] = count
                    print(f"Exported {count} annotations from {dataset_name}", flush=True)
                else:
                    print(f"No new annotations in {dataset_name} (still {count}), skipping export", flush=True)
            else:
                print(f"No annotations yet in {dataset_name}", flush=True)

    except Exception as e:
        print(f"Export error: {e}", flush=True)
        import traceback
        traceback.print_exc()


def main():
    print("Starting Argilla backup service...", flush=True)
    print(f"  Backup repo: {BACKUP_REPO}", flush=True)
    print(f"  Export dir: {EXPORT_DIR}", flush=True)
    print(f"  Sync interval: {SYNC_INTERVAL} minutes", flush=True)

    if not wait_for_argilla():
        print("Warning: Argilla not responding, will retry exports anyway", flush=True)

    print("Starting backup loop...", flush=True)
    while True:
        export_annotations()
        print(f"Sleeping for {SYNC_INTERVAL} minutes until next export...", flush=True)
        time.sleep(SYNC_INTERVAL * 60)  # Convert minutes to seconds


if __name__ == "__main__":
    main()
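
# ---------------------------------------------------------------------------
# Usage sketch. The process name and filename below are assumptions for
# illustration, not defined by this script:
#
#   # Procfile (honcho), assuming this file is saved as backup_to_hf.py
#   backup: python backup_to_hf.py
#
# Environment variables read above:
#   BACKUP_DATASET_REPO     target HF dataset repo, e.g. "my-org/argilla-backups"
#   ARGILLA_API_KEY         API key for the local Argilla server
#   HF_TOKEN                Hugging Face token with write access to the repo
#   SYNC_INTERVAL_MINUTES   export/push cadence in minutes (optional, default 2)
# ---------------------------------------------------------------------------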