import base64
import glob
import io
import json
import os
import shutil
import time
import uuid
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import pandas as pd
import tenacity
from huggingface_hub import HfApi, hf_hub_download, snapshot_download
from loguru import logger

from competitions.enums import SubmissionStatus, ErrorMessage
from competitions.info import CompetitionInfo
from competitions.utils import user_token_api, space_cleaner, dockerfile_modifier, server_manager
|
|
# Feature flag (default: enabled): when true, the job runner pauses all HUGSIM
# evaluation servers while no submission is queued/processing and restarts them
# before creating a new evaluation space. Any env value other than "true"
# (case-insensitive) disables auto-pausing.
IS_AUTO_PAUSE_HUGSIM_SERVER = os.getenv("IS_AUTO_PAUSE_HUGSIM_SERVER", "true").lower() == "true"
|
|
|
|
@dataclass
class JobRunner:
    """Background worker that drives the competition evaluation loop.

    Periodically scans the competition dataset repo for submissions, spins up
    a private evaluation Space per pending submission (one at a time per
    evaluation server), and writes status transitions (PENDING -> QUEUED /
    FAILED) back into the per-team ``submission_info/<team_id>.json`` files.

    Attributes:
        competition_id: HF dataset repo id holding the competition data.
        token: HF token with write access to the competition repo.
        output_path: Path the evaluation Space should write results to.
    """

    competition_id: str
    token: str
    output_path: str

    def __post_init__(self):
        """Load the competition configuration and cache frequently used fields."""
        self.competition_info = CompetitionInfo(competition_id=self.competition_id, autotrain_token=self.token)
        # CompetitionInfo may normalize the id; use its value from here on.
        self.competition_id = self.competition_info.competition_id
        self.competition_type = self.competition_info.competition_type
        self.metric = self.competition_info.metric
        self.submission_id_col = self.competition_info.submission_id_col
        self.submission_cols = self.competition_info.submission_cols
        self.submission_rows = self.competition_info.submission_rows
        self.time_limit = self.competition_info.time_limit
        self.dataset = self.competition_info.dataset
        self.submission_filenames = self.competition_info.submission_filenames

    def _get_all_submissions(self) -> List[Dict[str, Any]]:
        """Download every team's submission-info JSON and flatten to a list of dicts."""
        local_dir = snapshot_download(
            repo_id=self.competition_id,
            allow_patterns="submission_info/*.json",
            token=self.token,
            repo_type="dataset",
        )
        all_submissions = []
        for json_path in glob.glob(os.path.join(local_dir, "submission_info/*.json")):
            with open(json_path, "r", encoding="utf-8") as f:
                team_info = json.load(f)
            team_id = team_info["id"]
            for sub in team_info["submissions"]:
                all_submissions.append(
                    {
                        "team_id": team_id,
                        "submission_id": sub["submission_id"],
                        "datetime": sub["datetime"],
                        "status": sub["status"],
                        "submission_repo": sub["submission_repo"],
                        "space_id": sub["space_id"],
                        "server_url": sub["server_url"],
                        "hardware": sub["hardware"],
                    }
                )
        return all_submissions

    def _get_pending_subs(self, submissions: List[Dict[str, Any]]) -> Optional[pd.DataFrame]:
        """Return PENDING submissions sorted oldest-first, or None if there are none."""
        pending_submissions = [sub for sub in submissions if sub["status"] == SubmissionStatus.PENDING.value]
        if not pending_submissions:
            return None
        logger.info(f"Found {len(pending_submissions)} pending submissions.")
        pending_df = pd.DataFrame(pending_submissions)
        pending_df["datetime"] = pd.to_datetime(pending_df["datetime"])
        return pending_df.sort_values("datetime").reset_index(drop=True)

    def _get_server_active_count(self, submissions: List[Dict[str, Any]]) -> Dict[str, int]:
        """Count QUEUED/PROCESSING submissions per evaluation server URL."""
        server_active_count = defaultdict(int)
        for sub in submissions:
            if sub["status"] in {SubmissionStatus.PROCESSING.value, SubmissionStatus.QUEUED.value}:
                server_active_count[sub["server_url"]] += 1
        return server_active_count

    def _update_submission_status(self, team_id, submission_id, status, error_message=None):
        """Set ``status`` (and optionally an error message) for one submission.

        Downloads the team's submission-info JSON, patches the first entry
        matching ``submission_id``, and uploads the file back to the
        competition dataset repo.
        """
        team_fname = hf_hub_download(
            repo_id=self.competition_id,
            filename=f"submission_info/{team_id}.json",
            token=self.token,
            repo_type="dataset",
        )
        with open(team_fname, "r", encoding="utf-8") as f:
            team_submission_info = json.load(f)

        for submission in team_submission_info["submissions"]:
            if submission["submission_id"] == submission_id:
                submission["status"] = status
                if error_message is not None:
                    submission["error_message"] = error_message
                break

        payload = json.dumps(team_submission_info, indent=4).encode("utf-8")
        api = HfApi(token=self.token)
        api.upload_file(
            path_or_fileobj=io.BytesIO(payload),
            path_in_repo=f"submission_info/{team_id}.json",
            repo_id=self.competition_id,
            repo_type="dataset",
        )

    def _queue_submission(self, team_id, submission_id):
        """Mark a submission as QUEUED in the team's submission-info JSON."""
        self._update_submission_status(team_id, submission_id, SubmissionStatus.QUEUED.value)

    def mark_submission_failed(self, team_id: str, submission_id: str, error_message: str):
        """Mark a submission as FAILED and record a user-facing error message."""
        self._update_submission_status(
            team_id, submission_id, SubmissionStatus.FAILED.value, error_message=error_message
        )

    def _create_readme(self, project_name: str) -> str:
        """Build the README front matter the HF Hub requires for a docker Space."""
        lines = [
            "---",
            f"title: {project_name}",
            "emoji: 🚀",
            "colorFrom: green",
            "colorTo: indigo",
            "sdk: docker",
            "pinned: false",
            "---",
        ]
        return "\n".join(lines) + "\n"

    def create_space(self, team_id, submission_id, submission_repo, space_id, server_url, hardware):
        """Provision a private evaluation Space for one submission.

        Uploads a one-time client token payload to the competition dataset,
        creates the Space repo with the requested hardware, wires its
        variables/secrets, copies the user's Dockerfile (rewritten via
        ``dockerfile_modifier``) plus the network filter library, and finally
        marks the submission QUEUED.
        """
        user_token = user_token_api.get(team_id)

        api = HfApi(token=self.token)
        # Payload the evaluation Space fetches using its one-time client token.
        params = {
            "space_id": space_id,
            "client_space_id": space_id,
            "competition_id": self.competition_id,
            "team_id": team_id,
            "submission_id": submission_id,
            "output_path": self.output_path,
            "submission_repo": submission_repo,
            "time_limit": self.time_limit,
            "dataset": self.dataset,
            "submission_filenames": self.submission_filenames,
        }
        token_info_buffer = io.BytesIO(json.dumps(params, indent=4).encode("utf-8"))

        # Random, unguessable token identifying this evaluation run.
        client_token = uuid.uuid4().hex + uuid.uuid4().hex
        api.upload_file(
            path_or_fileobj=token_info_buffer,
            path_in_repo=f"token_data_info/{client_token}.json",
            repo_id=self.competition_id,
            repo_type="dataset",
        )
        api.create_repo(
            repo_id=space_id,
            repo_type="space",
            space_sdk="docker",
            space_hardware=hardware,
            private=True,
        )

        api.add_space_variable(repo_id=space_id, key="IN_HF_SPACE", value="true")
        api.add_space_secret(repo_id=space_id, key="HUGSIM_API_TOKEN", value=client_token)
        api.add_space_secret(repo_id=space_id, key="HUGSIM_SERVER_HOST", value=server_url)

        client_code_local_dir = f"/tmp/data/client_repo/{space_id}"
        # Pin the latest commit so the evaluated code cannot change mid-run.
        client_commits = api.list_repo_commits(submission_repo, repo_type="model", token=user_token)
        api.snapshot_download(
            repo_id=submission_repo,
            repo_type="model",
            revision=client_commits[0].commit_id,
            token=user_token,
            local_dir=client_code_local_dir,
            allow_patterns=["Dockerfile"],
        )
        with open(f"{client_code_local_dir}/README.md", "w", encoding="utf-8") as f:
            f.write(self._create_readme(space_id))
        shutil.copyfile("./other_files/network_filter.so", os.path.join(client_code_local_dir, "network_filter.so"))
        # Rewrite the user's Dockerfile so the build pulls the submission repo
        # using a base64-encoded copy of the user's read token.
        for filename in os.listdir(client_code_local_dir):
            if filename.lower() != "dockerfile":
                continue
            filepath = os.path.join(client_code_local_dir, filename)
            with open(filepath, "r", encoding="utf-8") as f:
                dockerfile_content = f.read()
            modified_content = dockerfile_modifier.modify_dockerfile_content(
                dockerfile_content, submission_repo, base64.b64encode(user_token.encode()).decode()
            )[0]
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(modified_content)
        try:
            api.upload_folder(
                repo_id=space_id,
                repo_type="space",
                folder_path=client_code_local_dir,
            )
        finally:
            # Always remove the temp checkout, even if the upload fails.
            shutil.rmtree(client_code_local_dir, ignore_errors=True)
        self._queue_submission(team_id, submission_id)

    @tenacity.retry(stop=tenacity.stop_never, wait=tenacity.wait_fixed(15))
    def run(self):
        """Main polling loop; restarted forever by tenacity if it raises."""
        cur = 0
        while True:
            time.sleep(5)
            # Wrap the tick counter so it never grows without bound.
            if cur == 10000:
                cur = 0
            cur += 1
            all_submissions = self._get_all_submissions()

            # Every 100 ticks: clean up spaces of active submissions and pause
            # the evaluation servers when nothing is queued or processing.
            if cur % 100 == 1:
                logger.info("Cleaning up spaces...")
                ready_submissions_count = 0
                for space in all_submissions:
                    if space["status"] not in {SubmissionStatus.QUEUED.value, SubmissionStatus.PROCESSING.value}:
                        continue
                    ready_submissions_count += 1
                    logger.info(f"Cleaning up space {space['space_id']} for submission {space['submission_id']}")
                    try:
                        space_cleaner.clean_space(
                            space["space_id"],
                            space["team_id"],
                            space["submission_id"],
                        )
                    except Exception as e:
                        logger.error(f"Cleaning up space {space['space_id']} failed: {str(e)}")
                if ready_submissions_count == 0 and IS_AUTO_PAUSE_HUGSIM_SERVER:
                    server_manager.pause_all_servers()

            pending_submissions = self._get_pending_subs(all_submissions)
            if pending_submissions is None:
                continue
            first_pending_sub = pending_submissions.iloc[0]
            server_active_count = self._get_server_active_count(all_submissions)

            # At most one active submission per evaluation server.
            if server_active_count[first_pending_sub["server_url"]] >= 1:
                continue
            try:
                if IS_AUTO_PAUSE_HUGSIM_SERVER:
                    server_manager.start_all_servers()
                self.create_space(
                    first_pending_sub["team_id"],
                    first_pending_sub["submission_id"],
                    first_pending_sub["submission_repo"],
                    first_pending_sub["space_id"],
                    first_pending_sub["server_url"],
                    first_pending_sub["hardware"],
                )
            except Exception as e:
                logger.error(
                    f"Failed to create space for {first_pending_sub['submission_id']}: {e}"
                )
                self.mark_submission_failed(
                    first_pending_sub["team_id"],
                    first_pending_sub["submission_id"],
                    ErrorMessage.FAILED_TO_CREATE_SPACE.value,
                )
                # Best-effort cleanup of the half-created space; do not shadow
                # the original exception variable.
                try:
                    space_cleaner.delete_space(first_pending_sub["space_id"])
                except Exception as cleanup_err:
                    logger.error(f"Failed to delete space {first_pending_sub['space_id']}: {cleanup_err}")
                logger.error(f"Marked submission {first_pending_sub['submission_id']} as failed.")
                continue
|
|