| import glob |
| import json |
| import os |
| import time |
| from dataclasses import dataclass |
| from datetime import datetime |
|
|
| import pandas as pd |
| from huggingface_hub import hf_hub_download, snapshot_download |
| from loguru import logger |
|
|
| from competitions.enums import SubmissionStatus |
|
|
|
|
| @dataclass |
| class Leaderboard: |
| end_date: datetime |
| eval_higher_is_better: bool |
| max_selected_submissions: int |
| competition_id: str |
| token: str |
| scoring_metric: str |
|
|
| def __post_init__(self): |
| self.non_score_columns = ["id", "submission_datetime"] |
|
|
| def _process_public_lb(self): |
| start_time = time.time() |
| submissions_folder = snapshot_download( |
| repo_id=self.competition_id, |
| allow_patterns="submission_info/*.json", |
| use_auth_token=self.token, |
| repo_type="dataset", |
| ) |
| logger.info(f"Downloaded submissions in {time.time() - start_time} seconds") |
| start_time = time.time() |
| submissions = [] |
| for submission in glob.glob(os.path.join(submissions_folder, "submission_info", "*.json")): |
| with open(submission, "r", encoding="utf-8") as f: |
| submission_info = json.load(f) |
| |
| submission_info["submissions"] = [ |
| sub for sub in submission_info["submissions"] if sub["status"] == SubmissionStatus.SUCCESS.value |
| ] |
| submission_info["submissions"] = [ |
| sub |
| for sub in submission_info["submissions"] |
| if datetime.strptime(sub["datetime"], "%Y-%m-%d %H:%M:%S") < self.end_date |
| ] |
| if len(submission_info["submissions"]) == 0: |
| continue |
|
|
| user_id = submission_info["id"] |
| user_submissions = [] |
| for sub in submission_info["submissions"]: |
| _sub = { |
| "id": user_id, |
| |
| |
| |
| |
| } |
| for k, v in sub["public_score"].items(): |
| _sub[k] = v |
| _sub["submission_datetime"] = sub["datetime"] |
| user_submissions.append(_sub) |
|
|
| user_submissions.sort(key=lambda x: x[self.scoring_metric], reverse=self.eval_higher_is_better) |
| best_user_submission = user_submissions[0] |
| submissions.append(best_user_submission) |
| logger.info(f"Processed submissions in {time.time() - start_time} seconds") |
| return submissions |
|
|
| def _process_private_lb(self): |
| start_time = time.time() |
| submissions_folder = snapshot_download( |
| repo_id=self.competition_id, |
| allow_patterns="submission_info/*.json", |
| use_auth_token=self.token, |
| repo_type="dataset", |
| ) |
| logger.info(f"Downloaded submissions in {time.time() - start_time} seconds") |
| start_time = time.time() |
| submissions = [] |
| for submission in glob.glob(os.path.join(submissions_folder, "submission_info", "*.json")): |
| with open(submission, "r", encoding="utf-8") as f: |
| submission_info = json.load(f) |
| submission_info["submissions"] = [ |
| sub for sub in submission_info["submissions"] if sub["status"] == SubmissionStatus.SUCCESS.value |
| ] |
| if len(submission_info["submissions"]) == 0: |
| continue |
|
|
| user_id = submission_info["id"] |
| user_submissions = [] |
| for sub in submission_info["submissions"]: |
| _sub = { |
| "id": user_id, |
| |
| |
| |
| "selected": sub["selected"], |
| } |
| for k, v in sub["public_score"].items(): |
| _sub[f"public_{k}"] = v |
| for k, v in sub["private_score"].items(): |
| _sub[f"private_{k}"] = v |
| _sub["submission_datetime"] = sub["datetime"] |
| user_submissions.append(_sub) |
|
|
| |
| selected_submissions = 0 |
| for sub in user_submissions: |
| if sub["selected"]: |
| selected_submissions += 1 |
|
|
| if selected_submissions == 0: |
| |
| user_submissions.sort( |
| key=lambda x: x[f"public_{self.scoring_metric}"], reverse=self.eval_higher_is_better |
| ) |
| |
| best_user_submission = user_submissions[0] |
|
|
| elif selected_submissions <= self.max_selected_submissions: |
| |
| user_submissions = [sub for sub in user_submissions if sub["selected"]] |
| |
| user_submissions.sort( |
| key=lambda x: x[f"private_{self.scoring_metric}"], reverse=self.eval_higher_is_better |
| ) |
| |
| best_user_submission = user_submissions[0] |
| else: |
| logger.warning( |
| f"User {user_id} has more than {self.max_selected_submissions} selected submissions. Skipping user..." |
| ) |
| continue |
|
|
| |
| best_user_submission = {k: v for k, v in best_user_submission.items() if not k.startswith("public_")} |
|
|
| |
| best_user_submission = {k.replace("private_", ""): v for k, v in best_user_submission.items()} |
|
|
| |
| best_user_submission.pop("selected") |
| submissions.append(best_user_submission) |
| logger.info(f"Processed submissions in {time.time() - start_time} seconds") |
| return submissions |
|
|
| def fetch(self, private=False): |
| if private: |
| submissions = self._process_private_lb() |
| else: |
| submissions = self._process_public_lb() |
|
|
| if len(submissions) == 0: |
| return pd.DataFrame() |
|
|
| df = pd.DataFrame(submissions) |
|
|
| |
| df["submission_datetime"] = pd.to_datetime(df["submission_datetime"], format="%Y-%m-%d %H:%M:%S") |
|
|
| |
| df = df[df["submission_datetime"] < self.end_date].reset_index(drop=True) |
|
|
| |
| |
| if self.eval_higher_is_better: |
| if private: |
| df = df.sort_values( |
| by=[self.scoring_metric, "submission_datetime"], |
| ascending=[False, True], |
| ) |
| else: |
| df = df.sort_values( |
| by=[self.scoring_metric, "submission_datetime"], |
| ascending=[False, True], |
| ) |
| else: |
| if private: |
| df = df.sort_values( |
| by=[self.scoring_metric, "submission_datetime"], |
| ascending=[True, True], |
| ) |
| else: |
| df = df.sort_values( |
| by=[self.scoring_metric, "submission_datetime"], |
| ascending=[True, True], |
| ) |
|
|
| |
| for col in df.columns: |
| if col in self.non_score_columns: |
| continue |
| df[col] = df[col].round(4) |
|
|
| |
| df = df.reset_index(drop=True) |
| df["rank"] = df.index + 1 |
|
|
| |
| df["submission_datetime"] = df["submission_datetime"].dt.strftime("%Y-%m-%d %H:%M:%S") |
|
|
| |
| columns = df.columns.tolist() |
| columns.remove("submission_datetime") |
| columns.append("submission_datetime") |
| df = df[columns] |
|
|
| |
| columns = df.columns.tolist() |
| columns.remove("rank") |
| columns = ["rank"] + columns |
| df = df[columns] |
|
|
| team_metadata = hf_hub_download( |
| repo_id=self.competition_id, |
| filename="teams.json", |
| token=self.token, |
| repo_type="dataset", |
| ) |
| with open(team_metadata, "r", encoding="utf-8") as f: |
| team_metadata = json.load(f) |
|
|
| df["id"] = df["id"].apply(lambda x: team_metadata[x]["name"]) |
|
|
| return df |
|
|