| from flask_sqlalchemy import SQLAlchemy |
| from flask_login import UserMixin |
| from datetime import datetime |
| import math |
| from sqlalchemy import func |
|
|
# Flask-SQLAlchemy extension instance, created here without an app; the models
# and query helpers below all use it for columns, relationships and sessions.
db = SQLAlchemy()
|
|
|
|
class User(db.Model, UserMixin):
    """Voter account; an ORM model that also mixes in Flask-Login's ``UserMixin``."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(100), unique=True, nullable=False)
    # NOTE(review): presumably the Hugging Face account identifier -- confirm.
    hf_id = db.Column(db.String(100), unique=True, nullable=False)
    # Filled with a naive UTC timestamp at insert time when no value is given.
    join_date = db.Column(db.DateTime, default=datetime.utcnow)
    # Votes cast by this user; the backref exposes ``Vote.user``.
    votes = db.relationship("Vote", backref="user", lazy=True)
    # get_top_voters() only lists users for whom this flag is true.
    show_in_leaderboard = db.Column(db.Boolean, default=True)

    def __repr__(self):
        return f"<User {self.username}>"
|
|
|
|
class ModelType:
    """String constants for the two model categories stored in ``model_type`` columns."""

    TTS = "tts"
    CONVERSATIONAL = "conversational"
|
|
|
|
class Model(db.Model):
    """A rated model with its running Elo score and win/match counters."""

    id = db.Column(db.String(100), primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    model_type = db.Column(db.String(20), nullable=False)

    # Read-only collection of every vote this model took part in, whether it
    # was the chosen or the rejected side.
    votes = db.relationship(
        "Vote",
        primaryjoin="or_(Model.id==Vote.model_chosen, Model.id==Vote.model_rejected)",
        viewonly=True,
    )
    # Rating and counters; record_vote() updates them on every vote.
    current_elo = db.Column(db.Float, default=1500.0)
    win_count = db.Column(db.Integer, default=0)
    match_count = db.Column(db.Integer, default=0)
    is_open = db.Column(db.Boolean, default=False)
    is_active = db.Column(
        db.Boolean, default=True
    )
    model_url = db.Column(db.String(255), nullable=True)

    @property
    def win_rate(self):
        """Return the share of matches won, as a percentage (0 to 100).

        Returns 0 when no matches are recorded. The column defaults are only
        applied at INSERT and both counters are nullable, so a counter that is
        still ``None`` (unflushed instance or NULL row) is treated as zero
        instead of raising ``TypeError``.
        """
        if not self.match_count:
            return 0
        return ((self.win_count or 0) / self.match_count) * 100

    def __repr__(self):
        return f"<Model {self.name} ({self.model_type})>"
|
|
|
|
class Vote(db.Model):
    """One pairwise preference: ``model_chosen`` was preferred over ``model_rejected``."""

    id = db.Column(db.Integer, primary_key=True)
    # Nullable, so a vote can be stored without an associated user.
    user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)
    # NOTE(review): presumably the prompt text the two models were compared on -- confirm.
    text = db.Column(db.String(1000), nullable=False)
    # Filled with a naive UTC timestamp at insert time when no value is given.
    vote_date = db.Column(db.DateTime, default=datetime.utcnow)
    model_chosen = db.Column(db.String(100), db.ForeignKey("model.id"), nullable=False)
    model_rejected = db.Column(
        db.String(100), db.ForeignKey("model.id"), nullable=False
    )
    model_type = db.Column(db.String(20), nullable=False)

    # Two foreign keys point at Model, so each relationship names the column
    # it follows; the backrefs add ``Model.chosen_votes`` / ``Model.rejected_votes``.
    chosen = db.relationship(
        "Model",
        foreign_keys=[model_chosen],
        backref=db.backref("chosen_votes", lazy=True),
    )
    rejected = db.relationship(
        "Model",
        foreign_keys=[model_rejected],
        backref=db.backref("rejected_votes", lazy=True),
    )

    def __repr__(self):
        return f"<Vote {self.id}: {self.model_chosen} over {self.model_rejected} ({self.model_type})>"
|
|
|
|
class EloHistory(db.Model):
    """Snapshot of one model's Elo score; record_vote() writes one row per model per vote."""

    id = db.Column(db.Integer, primary_key=True)
    model_id = db.Column(db.String(100), db.ForeignKey("model.id"), nullable=False)
    # Filled with a naive UTC timestamp at insert time when no value is given.
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    # The model's rating after the change was applied.
    elo_score = db.Column(db.Float, nullable=False)
    # Nullable, so a history row may exist without a triggering vote.
    vote_id = db.Column(db.Integer, db.ForeignKey("vote.id"), nullable=True)
    model_type = db.Column(db.String(20), nullable=False)

    # Backrefs add ``Model.elo_history`` and ``Vote.elo_changes``.
    model = db.relationship("Model", backref=db.backref("elo_history", lazy=True))
    vote = db.relationship("Vote", backref=db.backref("elo_changes", lazy=True))

    def __repr__(self):
        return f"<EloHistory {self.model_id}: {self.elo_score} at {self.timestamp} ({self.model_type})>"
|
|
|
|
def calculate_elo_change(winner_elo, loser_elo, k_factor=32):
    """Return the post-match ratings as ``(winner_new_elo, loser_new_elo)``.

    Args:
        winner_elo: Rating of the winning side before the match.
        loser_elo: Rating of the losing side before the match.
        k_factor: Maximum number of points a single match can move a rating.

    Returns:
        tuple: The updated winner rating followed by the updated loser rating.
    """

    def _expected_score(own_rating, opponent_rating):
        # Logistic expectation on the 400-point scale.
        return 1 / (1 + math.pow(10, (opponent_rating - own_rating) / 400))

    # The winner scored 1 and the loser 0; each moves by K * (actual - expected).
    winner_shift = k_factor * (1 - _expected_score(winner_elo, loser_elo))
    loser_shift = k_factor * (0 - _expected_score(loser_elo, winner_elo))

    return winner_elo + winner_shift, loser_elo + loser_shift
|
|
|
|
def record_vote(user_id, text, chosen_model_id, rejected_model_id, model_type):
    """Record a vote and update the Elo ratings of both models.

    Args:
        user_id: ID of the voting user (the column is nullable).
        text: Text stored with the vote.
        chosen_model_id: ID of the preferred model.
        rejected_model_id: ID of the other model.
        model_type: Model type both models must belong to.

    Returns:
        tuple: ``(vote, None)`` on success, or ``(None, error_message)`` when
        either model does not exist for ``model_type``.
    """
    # Resolve both models BEFORE inserting the vote. Flushing a vote that
    # references an unknown model id raises IntegrityError on backends that
    # enforce foreign keys, so the friendly error below would never be reached.
    chosen_model = Model.query.filter_by(
        id=chosen_model_id, model_type=model_type
    ).first()
    rejected_model = Model.query.filter_by(
        id=rejected_model_id, model_type=model_type
    ).first()

    if not chosen_model or not rejected_model:
        # Kept from the previous behaviour: the failure path leaves the
        # session rolled back.
        db.session.rollback()
        return None, "One or both models not found for the specified model type"

    vote = Vote(
        user_id=user_id,
        text=text,
        model_chosen=chosen_model_id,
        model_rejected=rejected_model_id,
        model_type=model_type,
    )
    db.session.add(vote)
    # Flush so vote.id is assigned and can be referenced by the history rows.
    db.session.flush()

    new_chosen_elo, new_rejected_elo = calculate_elo_change(
        chosen_model.current_elo, rejected_model.current_elo
    )

    # Winner: new rating, one more win, one more match.
    chosen_model.current_elo = new_chosen_elo
    chosen_model.win_count += 1
    chosen_model.match_count += 1

    # Loser: new rating, one more match.
    rejected_model.current_elo = new_rejected_elo
    rejected_model.match_count += 1

    # One history row per model, both linked to this vote.
    chosen_history = EloHistory(
        model_id=chosen_model_id,
        elo_score=new_chosen_elo,
        vote_id=vote.id,
        model_type=model_type,
    )
    rejected_history = EloHistory(
        model_id=rejected_model_id,
        elo_score=new_rejected_elo,
        vote_id=vote.id,
        model_type=model_type,
    )

    db.session.add_all([chosen_history, rejected_history])
    db.session.commit()

    return vote, None
|
|
|
|
def get_leaderboard_data(model_type):
    """
    Build the current leaderboard for one model type, best Elo first.

    Args:
        model_type (str): The model type ('tts' or 'conversational')

    Returns:
        list: One dictionary per model with rank, id, name, model_url,
        win_rate (formatted percentage), total_votes, elo, tier and is_open
    """
    # (last rank that still belongs to the tier, tier label), checked in order.
    tier_cutoffs = ((2, "tier-s"), (4, "tier-a"), (7, "tier-b"))

    ranked_models = (
        Model.query.filter_by(model_type=model_type)
        .order_by(Model.current_elo.desc())
        .all()
    )

    rows = []
    for position, entry in enumerate(ranked_models, start=1):
        # Ranks beyond the last cutoff get an empty tier label.
        tier_label = next(
            (label for ceiling, label in tier_cutoffs if position <= ceiling), ""
        )
        rows.append(
            {
                "rank": position,
                "id": entry.id,
                "name": entry.name,
                "model_url": entry.model_url,
                "win_rate": f"{entry.win_rate:.0f}%",
                "total_votes": entry.match_count,
                "elo": int(entry.current_elo),
                "tier": tier_label,
                "is_open": entry.is_open,
            }
        )

    return rows
|
|
|
|
def get_user_leaderboard(user_id, model_type):
    """
    Build a leaderboard from a single user's votes only.

    Args:
        user_id (int): The user ID
        model_type (str): The model type ('tts' or 'conversational')

    Returns:
        list: Dictionaries for every model the user has voted on, ordered by
        that user's win rate (highest first) and numbered with "rank"
    """

    def _displayed_rate(row):
        # Ordering uses the rounded percentage exactly as it is displayed.
        return float(row["win_rate"].rstrip("%"))

    candidates = Model.query.filter_by(model_type=model_type).all()
    ballots = Vote.query.filter_by(user_id=user_id, model_type=model_type).all()

    # Start every model of this type at zero, then tally the user's votes.
    tallies = {candidate.id: {"wins": 0, "matches": 0} for candidate in candidates}
    for ballot in ballots:
        winner_tally = tallies[ballot.model_chosen]
        winner_tally["wins"] += 1
        winner_tally["matches"] += 1
        tallies[ballot.model_rejected]["matches"] += 1

    rows = []
    for candidate in candidates:
        tally = tallies[candidate.id]
        played = tally["matches"]
        if not played > 0:
            # Models the user never voted on are left out entirely.
            continue
        percentage = tally["wins"] / played * 100
        rows.append(
            {
                "id": candidate.id,
                "name": candidate.name,
                "model_url": candidate.model_url,
                "win_rate": f"{percentage:.0f}%",
                "total_votes": played,
                "wins": tally["wins"],
                "is_open": candidate.is_open,
            }
        )

    ordered = sorted(rows, key=_displayed_rate, reverse=True)
    for position, row in enumerate(ordered, start=1):
        row["rank"] = position

    return ordered
|
|
|
|
def get_historical_leaderboard_data(model_type, target_date=None):
    """
    Reconstruct the leaderboard as it stood at a given moment.

    Args:
        model_type (str): The model type ('tts' or 'conversational')
        target_date (datetime): Cut-off moment; the current UTC time is used when omitted

    Returns:
        list: Dictionaries for every model that has an Elo history entry at or
        before the cut-off, ordered by Elo (highest first) with "rank" and "tier"
    """
    cutoff = target_date or datetime.utcnow()

    # (last rank that still belongs to the tier, tier label), checked in order.
    tier_cutoffs = ((2, "tier-s"), (4, "tier-a"), (7, "tier-b"))

    # Conditions shared by both vote counts below.
    votes_in_scope = (Vote.model_type == model_type, Vote.vote_date <= cutoff)

    rows = []
    for entry in Model.query.filter_by(model_type=model_type).all():
        # Most recent rating recorded at or before the cut-off.
        latest_rating = (
            EloHistory.query.filter(
                EloHistory.model_id == entry.id,
                EloHistory.model_type == model_type,
                EloHistory.timestamp <= cutoff,
            )
            .order_by(EloHistory.timestamp.desc())
            .first()
        )
        if latest_rating is None:
            # No history entry by the cut-off: the model is left out.
            continue

        played = Vote.query.filter(
            db.or_(Vote.model_chosen == entry.id, Vote.model_rejected == entry.id),
            *votes_in_scope,
        ).count()
        won = Vote.query.filter(Vote.model_chosen == entry.id, *votes_in_scope).count()

        if played > 0:
            percentage = won / played * 100
        else:
            percentage = 0

        rows.append(
            {
                "id": entry.id,
                "name": entry.name,
                "model_url": entry.model_url,
                "win_rate": f"{percentage:.0f}%",
                "total_votes": played,
                "elo": int(latest_rating.elo_score),
                "is_open": entry.is_open,
            }
        )

    ordered = sorted(rows, key=lambda row: row["elo"], reverse=True)

    for position, row in enumerate(ordered, start=1):
        row["rank"] = position
        # Ranks beyond the last cutoff get an empty tier label.
        row["tier"] = next(
            (label for ceiling, label in tier_cutoffs if position <= ceiling), ""
        )

    return ordered
|
|
|
|
def get_key_historical_dates(model_type):
    """
    Get a list of key dates in the leaderboard history.

    The list holds the first day of every month from the month of the first
    vote up to the last vote. Each month start keeps the first vote's time of
    day, so the final month start can fall after the last vote and be left
    out; in that case the last vote's own timestamp is appended.

    Args:
        model_type (str): The model type ('tts' or 'conversational')

    Returns:
        list: List of datetime objects representing key dates; empty when
        there are no votes for ``model_type``
    """
    first_vote = (
        Vote.query.filter_by(model_type=model_type)
        .order_by(Vote.vote_date.asc())
        .first()
    )
    last_vote = (
        Vote.query.filter_by(model_type=model_type)
        .order_by(Vote.vote_date.desc())
        .first()
    )

    if not first_vote or not last_vote:
        return []

    dates = []
    current_date = first_vote.vote_date.replace(day=1)
    end_date = last_vote.vote_date

    while current_date <= end_date:
        dates.append(current_date)
        # Step to the first day of the next month, rolling over the year
        # after December.
        if current_date.month == 12:
            current_date = current_date.replace(year=current_date.year + 1, month=1)
        else:
            current_date = current_date.replace(month=current_date.month + 1)

    # Append the last vote only when its month is not already covered. The
    # (year, month) comparison sits under the emptiness guard as one unit:
    # the previous `dates and A or B` parsed as `(dates and A) or B`, which
    # could index an empty list.
    if dates and (dates[-1].year, dates[-1].month) != (end_date.year, end_date.month):
        dates.append(end_date)

    return dates
|
|
|
|
def insert_initial_models():
    """Seed the built-in models: insert missing ones and refresh existing rows.

    For a model that already exists (same id and model type), only ``name``,
    ``is_open`` and, when the seed sets it explicitly, ``is_active`` are
    updated. Everything is committed once at the end.
    """
    # Keyword arguments for each seed Model. ``is_active`` appears only where
    # the seed overrides it.
    tts_seeds = [
        {
            "id": "index-tts",
            "name": "Index TTS",
            "model_type": ModelType.TTS,
            "is_open": True,
            "model_url": "https://github.com/Index-Research/index-tts",
        },
        {
            "id": "spark-tts",
            "name": "Spark TTS",
            "model_type": ModelType.TTS,
            "is_open": False,
            "is_active": False,
            "model_url": "https://github.com/SparkAudio/Spark-TTS",
        },
        {
            "id": "maskgct",
            "name": "maskgct",
            "model_type": ModelType.TTS,
            "is_open": False,
            "is_active": True,
            "model_url": "https://github.com/open-mmlab/Amphion/tree/main/models/tts/maskgct",
        },
        {
            "id": "cosyvoice-2.0",
            "name": "CosyVoice 2.0",
            "model_type": ModelType.TTS,
            "is_open": True,
            "model_url": "https://github.com/FunAudioLLM/CosyVoice",
        },
        {
            "id": "gpt-sovits-v2",
            "name": "GPT-SoVITS v2",
            "model_type": ModelType.TTS,
            "is_open": True,
            "model_url": "https://huggingface.co/spaces/lj1995/GPT-SoVITS-v2",
        },
    ]
    conversational_seeds = [
        {
            "id": "csm-1b",
            "name": "CSM 1B",
            "model_type": ModelType.CONVERSATIONAL,
            "is_open": True,
            "model_url": "https://huggingface.co/sesame/csm-1b",
        },
        {
            "id": "playdialog-1.0",
            "name": "PlayDialog 1.0",
            "model_type": ModelType.CONVERSATIONAL,
            "is_open": False,
            "model_url": "https://play.ht/",
        },
        {
            "id": "dia-1.6b",
            "name": "Dia 1.6B",
            "model_type": ModelType.CONVERSATIONAL,
            "is_open": True,
            "model_url": "https://huggingface.co/nari-labs/Dia-1.6B",
        },
    ]

    for seed_kwargs in tts_seeds + conversational_seeds:
        seed = Model(**seed_kwargs)
        stored = Model.query.filter_by(id=seed.id, model_type=seed.model_type).first()

        if stored is None:
            db.session.add(seed)
            continue

        # Existing row: refresh the display fields from the seed.
        stored.name = seed.name
        stored.is_open = seed.is_open
        # is_active is None on a seed that did not set it; leave the stored
        # value alone in that case.
        if seed.is_active is not None:
            stored.is_active = seed.is_active

    db.session.commit()
|
|
|
|
def get_top_voters(limit=10):
    """
    List the users with the most votes among those who opted in to the leaderboard.

    Args:
        limit (int): Number of users to return

    Returns:
        list: Dictionaries with rank, username, vote_count and a formatted join_date
    """
    ranked_voters = (
        db.session.query(User, func.count(Vote.id).label('vote_count'))
        .join(Vote)
        # `== True` builds a SQL expression here, not a Python comparison.
        .filter(User.show_in_leaderboard == True)
        .group_by(User.id)
        .order_by(func.count(Vote.id).desc())
        .limit(limit)
        .all()
    )

    return [
        {
            "rank": position,
            "username": voter.username,
            "vote_count": total,
            "join_date": voter.join_date.strftime("%b %d, %Y"),
        }
        for position, (voter, total) in enumerate(ranked_voters, start=1)
    ]
|
|
|
|
def toggle_user_leaderboard_visibility(user_id):
    """
    Flip a user's ``show_in_leaderboard`` flag and commit the change.

    Args:
        user_id (int): The user ID

    Returns:
        bool: New visibility state, or None when no user has that ID
    """
    account = User.query.get(user_id)
    if account is None:
        return None

    flipped = not account.show_in_leaderboard
    account.show_in_leaderboard = flipped
    db.session.commit()

    # Read the value back from the instance after the commit.
    return account.show_in_leaderboard
|
|