Spaces:
Build error
Build error
| import pandas as pd | |
| import numpy as np | |
| from ortools.linear_solver import pywraplp | |
| def check_nba_position_eligibility(column_name, player_positions): | |
| if 'PG' in column_name: | |
| return 'PG' in player_positions | |
| elif 'SG' in column_name: | |
| return 'SG' in player_positions | |
| elif 'SF' in column_name: | |
| return 'SF' in player_positions | |
| elif 'PF' in column_name: | |
| return 'PF' in player_positions | |
| elif 'C' in column_name: | |
| return 'C' in player_positions | |
| elif 'G' in column_name: | |
| return any(pos in ['PG', 'SG'] for pos in player_positions) | |
| elif 'F' in column_name: | |
| return any(pos in ['SF', 'PF'] for pos in player_positions) | |
| elif 'UTIL' in column_name: | |
| return True # UTIL can be any position | |
| return False | |
| def check_lol_position_eligibility(column_name, player_positions): | |
| if 'TOP' in column_name: | |
| return 'TOP' in player_positions | |
| elif 'JNG' in column_name: | |
| return 'JNG' in player_positions | |
| elif 'MID' in column_name: | |
| return 'MID' in player_positions | |
| elif 'ADC' in column_name: | |
| return 'ADC' in player_positions | |
| elif 'SUP' in column_name: | |
| return 'SUP' in player_positions | |
| elif 'Team' in column_name: | |
| return 'Team' in player_positions | |
| elif 'CPT' in column_name: | |
| return any(pos in ['TOP', 'JNG', 'MID', 'ADC', 'SUP'] for pos in player_positions) | |
| return False | |
| def check_mlb_position_eligibility(column_name, player_positions): | |
| if any(pos in column_name for pos in ['P', 'SP', 'RP']): | |
| return any(pos in ['P', 'SP', 'RP'] for pos in player_positions) | |
| elif 'C' in column_name: | |
| return 'C' in player_positions | |
| elif '1B' in column_name: | |
| return '1B' in player_positions | |
| elif '2B' in column_name: | |
| return '2B' in player_positions | |
| elif '3B' in column_name: | |
| return '3B' in player_positions | |
| elif 'SS' in column_name: | |
| return 'SS' in player_positions | |
| elif 'OF' in column_name: | |
| return 'OF' in player_positions | |
| return False | |
| def check_nfl_position_eligibility(column_name, player_positions): | |
| if 'QB' in column_name: | |
| return 'QB' in player_positions | |
| elif 'RB' in column_name: | |
| return 'RB' in player_positions | |
| elif 'WR' in column_name: | |
| return 'WR' in player_positions | |
| elif 'TE' in column_name: | |
| return 'TE' in player_positions | |
| elif 'DST' in column_name: | |
| return 'DST' in player_positions | |
| elif 'FLEX' in column_name: | |
| return any(pos in ['RB', 'WR', 'TE'] for pos in player_positions) | |
| elif 'UTIL' in column_name: | |
| return any(pos in ['RB', 'WR', 'TE'] for pos in player_positions) | |
| return False | |
| def check_golf_position_eligibility(column_name, player_positions): | |
| if 'FLEX' in column_name: | |
| return any(pos in ['G'] for pos in player_positions) | |
| return True | |
| def check_tennis_position_eligibility(column_name, player_positions): | |
| if 'FLEX' in column_name: | |
| return any(pos in ['T'] for pos in player_positions) | |
| return True | |
| def check_mma_position_eligibility(column_name, player_positions): | |
| if 'FLEX' in column_name: | |
| return any(pos in ['F'] for pos in player_positions) | |
| return True | |
| def check_nascar_position_eligibility(column_name, player_positions): | |
| if 'FLEX' in column_name: | |
| return any(pos in ['D'] for pos in player_positions) | |
| return True | |
| def check_ncaaf_position_eligibility(column_name, player_positions): | |
| if 'QB' in column_name: | |
| return 'QB' in player_positions | |
| elif 'RB' in column_name: | |
| return 'RB' in player_positions | |
| elif 'WR' in column_name: | |
| return 'WR' in player_positions | |
| elif 'FLEX' in column_name: | |
| return any(pos in ['RB', 'WR'] for pos in player_positions) | |
| elif 'SFLEX' in column_name: | |
| return any(pos in ['RB', 'WR', 'QB'] for pos in player_positions) | |
| return False | |
| def check_nhl_position_eligibility(column_name, player_positions): | |
| if 'C' in column_name: | |
| return 'C' in player_positions | |
| elif 'W' in column_name: | |
| return 'W' in player_positions | |
| elif 'D' in column_name: | |
| return 'D' in player_positions | |
| elif 'G' in column_name: | |
| return 'G' in player_positions | |
| elif 'FLEX' in column_name: | |
| return any(pos in ['C', 'W', 'D'] for pos in player_positions) | |
| elif 'UTIL' in column_name: | |
| return any(pos in ['C', 'W', 'D'] for pos in player_positions) | |
| return False | |
| def check_position_eligibility(sport, column_name, player_positions): | |
| if sport == 'NBA': | |
| return check_nba_position_eligibility(column_name, player_positions) | |
| elif sport == 'MLB': | |
| return check_mlb_position_eligibility(column_name, player_positions) | |
| elif sport == 'NFL': | |
| return check_nfl_position_eligibility(column_name, player_positions) | |
| elif sport == 'NHL': | |
| return check_nhl_position_eligibility(column_name, player_positions) | |
| elif sport == 'MMA': | |
| return check_mma_position_eligibility(column_name, player_positions) | |
| elif sport == 'GOLF': | |
| return check_golf_position_eligibility(column_name, player_positions) | |
| elif sport == 'TENNIS': | |
| return check_tennis_position_eligibility(column_name, player_positions) | |
| elif sport == 'LOL': | |
| return check_lol_position_eligibility(column_name, player_positions) | |
| elif sport == 'NASCAR': | |
| return check_nascar_position_eligibility(column_name, player_positions) | |
| else: | |
| # Default fallback - assume exact position match | |
| return column_name in player_positions | |
| def get_effective_salary(player_name: str, column_name: str, map_dict: dict, type_var: str) -> float: | |
| """Calculate the effective salary for a player in a specific column (handles CPT multiplier)""" | |
| base_salary = map_dict['salary_map'].get(player_name, 0) | |
| if type_var != 'Classic' and column_name == 'CPT': | |
| return base_salary * 1.5 | |
| return base_salary | |
| def calculate_lineup_objective( | |
| row: pd.Series, | |
| player_columns: list, | |
| player_pool: pd.DataFrame, | |
| optimize_by: str, | |
| ) -> float: | |
| """Calculate the total objective value of a lineup.""" | |
| total = 0.0 | |
| for col in player_columns: | |
| player_name = row[col] | |
| player_data = player_pool[player_pool['player_names'] == player_name] | |
| if not player_data.empty: | |
| total += player_data.iloc[0].get(optimize_by, player_data.iloc[0].get('median', 0)) | |
| return total | |
| def _metric_column(optimize_by: str) -> str: | |
| """Map UI optimize_by to player_pool column name.""" | |
| if optimize_by in ("Own", "Ownership"): | |
| return "ownership" | |
| return optimize_by | |
| def _lineup_key(row: pd.Series, player_columns: list) -> frozenset: | |
| return frozenset(row[col] for col in player_columns) | |
| def _one_swap_below( | |
| row: pd.Series, | |
| previous_row: pd.Series, | |
| player_columns: list, | |
| player_pool: pd.DataFrame, | |
| map_dict: dict, | |
| lock_teams: list, | |
| type_var: str, | |
| sport_var: str, | |
| salary_max: int, | |
| optimize_by: str, | |
| used_lineup_keys: set, | |
| ) -> tuple[pd.Series, float]: | |
| """ | |
| Greedy one-swap: from previous_row, find the single swap (one column) that yields | |
| a lineup just below in the optimize metric, respects salary cap, position, locks, opponents. | |
| Locked players stay in their starting column (we never swap them), so e.g. a locked PF | |
| cannot move to F or UTIL. Prefer the swap with smallest drop. Skip lineups already in used_lineup_keys. | |
| """ | |
| lock_teams = lock_teams or [] | |
| metric_col = _metric_column(optimize_by) | |
| opp_map = map_dict.get("opp_map") or {} | |
| current_players = {row[col] for col in player_columns} | |
| total_salary = sum( | |
| get_effective_salary(row[col], col, map_dict, type_var) | |
| for col in player_columns | |
| ) | |
| # Locked players (by team): never swap these columns so they stay in their starting slot | |
| locked = { | |
| row[col] | |
| for col in player_columns | |
| if map_dict["team_map"].get(row[col], "") in lock_teams | |
| } | |
| pool_no_locked = player_pool[~player_pool["team"].isin(lock_teams)].copy() | |
| pool_no_locked = pool_no_locked.drop_duplicates(subset=["player_names"], keep="first") | |
| options = [] | |
| for col in player_columns: | |
| p_cur = row[col] | |
| if p_cur in locked: | |
| continue | |
| cur_sal = get_effective_salary(p_cur, col, map_dict, type_var) | |
| cur_rec = player_pool[player_pool["player_names"] == p_cur] | |
| m_cur = cur_rec.iloc[0].get(metric_col, cur_rec.iloc[0].get("median", 0)) if not cur_rec.empty else 0 | |
| max_replacement_salary = salary_max - total_salary + cur_sal | |
| others = current_players - {p_cur} | |
| locked_opponents = {opp_map.get(p) for p in locked if opp_map.get(p)} | |
| for p in others: | |
| if opp_map.get(p): | |
| locked_opponents.add(opp_map.get(p)) | |
| candidates = pool_no_locked[ | |
| (~pool_no_locked["player_names"].isin(others)) | |
| & (pool_no_locked["player_names"] != p_cur) | |
| ].copy() | |
| if type_var == "Classic": | |
| mask = candidates["position"].apply( | |
| lambda p: check_position_eligibility(sport_var, col, p.split("/") if isinstance(p, str) else [p]) | |
| ) | |
| candidates = candidates[mask] | |
| best_cand = None | |
| best_drop = None | |
| for _, cand in candidates.iterrows(): | |
| cname = cand["player_names"] | |
| eff_sal = get_effective_salary(cname, col, map_dict, type_var) | |
| if eff_sal > max_replacement_salary: | |
| continue | |
| if cname in locked_opponents: | |
| continue | |
| if opp_map.get(cname) and opp_map.get(cname) in others: | |
| continue | |
| m_cand = cand.get(metric_col, cand.get("median", 0)) | |
| if m_cand >= m_cur: | |
| continue | |
| drop = m_cur - m_cand | |
| if best_drop is None or drop < best_drop: | |
| best_drop = drop | |
| best_cand = (col, cname) | |
| if best_cand is not None: | |
| options.append((best_drop, best_cand[0], best_cand[1])) | |
| options.sort(key=lambda x: x[0]) | |
| for _drop, col, new_player in options: | |
| new_row = row.copy() | |
| new_row[col] = new_player | |
| key = _lineup_key(new_row, player_columns) | |
| if key in used_lineup_keys: | |
| continue | |
| obj = calculate_lineup_objective(new_row, player_columns, player_pool, metric_col) | |
| return new_row, obj | |
| obj = calculate_lineup_objective(row, player_columns, player_pool, metric_col) | |
| return row.copy(), obj | |
| def optimize_single_lineup( | |
| row: pd.Series, | |
| player_columns: list, | |
| player_pool: pd.DataFrame, | |
| map_dict: dict, | |
| lock_teams: list, | |
| type_var: str, | |
| sport_var: str, | |
| salary_max: int, | |
| optimize_by: str, | |
| ) -> tuple[pd.Series, float]: # NOW RETURNS (row, objective_value) | |
| """ | |
| Optimize a single lineup row using linear programming. | |
| Players from lock_teams are kept (locked) in the column they started in; | |
| they are never moved to other eligible positions (e.g. locked PF stays in PF, not F or UTIL; | |
| PF/C stays in PF, not C, F, or UTIL). All other positions are cleared and re-optimized | |
| using OR-Tools linear solver. | |
| Args: | |
| row: A single lineup row from the DataFrame | |
| player_columns: List of column names containing player positions | |
| player_pool: DataFrame of available players (projections_df) | |
| map_dict: Dictionary containing player mappings | |
| lock_teams: List of team names whose players should be KEPT (locked) in their starting column | |
| type_var: 'Classic' or 'Showdown' | |
| sport_var: Sport identifier (NFL, NBA, MLB, etc.) | |
| salary_max: Maximum salary cap for the lineup | |
| optimize_by: 'median' or 'ownership' - which metric to optimize for | |
| Returns: | |
| Tuple of (optimized_row, achieved_objective_value) | |
| """ | |
| # Calculate the original lineup's objective value BEFORE any changes | |
| original_objective = calculate_lineup_objective(row, player_columns, player_pool, optimize_by) | |
| # Create a copy of the row to modify | |
| optimized_row = row.copy() | |
| # Identify locked players (from lock_teams) and open positions. | |
| # Locked players must stay in the column they started in; they cannot be moved to | |
| # other eligible positions (e.g. a locked PF stays in PF, not F or UTIL; PF/C stays in PF, not C/F/UTIL). | |
| locked_players = {} # {column: player_name} | |
| open_columns = [] | |
| locked_salary = 0 | |
| locked_player_names = set() | |
| locked_objective_value = 0 # Track locked player contribution to objective | |
| for col in player_columns: | |
| player_name = row[col] | |
| player_team = map_dict['team_map'].get(player_name, '') | |
| if player_team in lock_teams: | |
| # Keep this player locked in this column only (do not move to F, UTIL, or other eligible slots) | |
| locked_players[col] = player_name | |
| locked_salary += get_effective_salary(player_name, col, map_dict, type_var) | |
| locked_player_names.add(player_name) | |
| # Add locked player's contribution to objective | |
| player_data = player_pool[player_pool['player_names'] == player_name] | |
| if not player_data.empty: | |
| locked_objective_value += player_data.iloc[0].get(optimize_by, player_data.iloc[0].get('median', 0)) | |
| else: | |
| # This position is open for optimization | |
| open_columns.append(col) | |
| # If no open columns, return with original objective value (keep original intact) | |
| if not open_columns: | |
| return row.copy(), original_objective | |
| # Calculate remaining salary budget | |
| remaining_salary = salary_max - locked_salary | |
| # Calculate remaining objective budget (if max_objective_value is set) | |
| remaining_objective_budget = None | |
| # Filter player pool: exclude locked teams and already-locked players | |
| available_players = player_pool[ | |
| (~player_pool['team'].isin(lock_teams)) & | |
| (~player_pool['player_names'].isin(locked_player_names)) | |
| ].copy() | |
| # CRITICAL: Remove duplicate players from available pool | |
| available_players = available_players.drop_duplicates(subset=['player_names'], keep='first') | |
| if available_players.empty: | |
| # No available players - keep original lineup | |
| return row.copy(), original_objective | |
| # Build the optimization model | |
| solver = pywraplp.Solver.CreateSolver('CBC') | |
| if not solver: | |
| # Fallback if solver not available - keep original lineup | |
| return row.copy(), original_objective | |
| # Create decision variables: x[player_idx, col_idx] = 1 if player is assigned to column | |
| player_list = available_players.to_dict('records') | |
| num_players = len(player_list) | |
| num_open_cols = len(open_columns) | |
| # x[i][j] = 1 if player i is assigned to open column j | |
| x = {} | |
| for i in range(num_players): | |
| for j in range(num_open_cols): | |
| x[i, j] = solver.BoolVar(f'x_{i}_{j}') | |
| # Constraint 1: Each open column gets exactly one player | |
| for j in range(num_open_cols): | |
| solver.Add(sum(x[i, j] for i in range(num_players)) == 1) | |
| # Constraint 2: Each player can only be used AT MOST once across all open columns | |
| for i in range(num_players): | |
| solver.Add(sum(x[i, j] for j in range(num_open_cols)) <= 1) | |
| # Constraint 3 & Opponent Exclusion: Handle locked players and opponents | |
| opp_map = map_dict.get('opp_map', {}) | |
| if opp_map: | |
| # When opp_map exists, use opponent-based constraints | |
| # Build a mapping of locked opponents that cannot be selected | |
| locked_opponents = set() | |
| for locked_player in locked_player_names: | |
| opponent = opp_map.get(locked_player) | |
| if opponent: | |
| locked_opponents.add(opponent) | |
| # Prevent locked opponents from being selected | |
| for i, player in enumerate(player_list): | |
| player_name = player['player_names'] | |
| if player_name in locked_opponents: | |
| for j in range(num_open_cols): | |
| solver.Add(x[i, j] == 0) | |
| # Prevent opponents from being selected together in open positions | |
| player_name_to_idx = {player['player_names']: i for i, player in enumerate(player_list)} | |
| for i, player in enumerate(player_list): | |
| player_name = player['player_names'] | |
| opponent_name = opp_map.get(player_name) | |
| if opponent_name and opponent_name in player_name_to_idx: | |
| opponent_idx = player_name_to_idx[opponent_name] | |
| # If player i is selected, opponent cannot be selected | |
| solver.Add( | |
| sum(x[i, j] for j in range(num_open_cols)) + | |
| sum(x[opponent_idx, j] for j in range(num_open_cols)) <= 1 | |
| ) | |
| else: | |
| # When opp_map doesn't exist, use standard locked player constraint | |
| # Constraint 3: Players already LOCKED in the row cannot be selected again | |
| for i, player in enumerate(player_list): | |
| player_name = player['player_names'] | |
| if player_name in locked_player_names: | |
| for j in range(num_open_cols): | |
| solver.Add(x[i, j] == 0) | |
| # Constraint 4: Position eligibility | |
| for i, player in enumerate(player_list): | |
| player_positions = player['position'].split('/') | |
| for j, col in enumerate(open_columns): | |
| if type_var == 'Classic': | |
| if not check_position_eligibility(sport_var, col, player_positions): | |
| solver.Add(x[i, j] == 0) | |
| else: | |
| # For Showdown, CPT and FLEX can take any player | |
| pass | |
| # Constraint 5: Total salary of selected players <= remaining_salary | |
| salary_constraint = [] | |
| for i, player in enumerate(player_list): | |
| for j, col in enumerate(open_columns): | |
| effective_salary = get_effective_salary(player['player_names'], col, map_dict, type_var) | |
| salary_constraint.append(x[i, j] * effective_salary) | |
| solver.Add(sum(salary_constraint) <= remaining_salary) | |
| # NEW Constraint 6: Total objective value <= max_objective_value (if specified) | |
| objective_terms = [] | |
| for i, player in enumerate(player_list): | |
| metric_value = player.get(optimize_by, player.get('median', 0)) | |
| for j in range(num_open_cols): | |
| objective_terms.append(x[i, j] * metric_value) | |
| if remaining_objective_budget is not None: | |
| solver.Add(sum(objective_terms) <= remaining_objective_budget) | |
| # Objective: Maximize the sum of the optimization metric | |
| solver.Maximize(sum(objective_terms)) | |
| # Solve | |
| status = solver.Solve() | |
| achieved_objective = locked_objective_value # Start with locked contribution | |
| if status == pywraplp.Solver.OPTIMAL or status == pywraplp.Solver.FEASIBLE: | |
| # Extract solution: only open_columns are filled; locked columns are left untouched | |
| for j, col in enumerate(open_columns): | |
| for i, player in enumerate(player_list): | |
| if x[i, j].solution_value() > 0.5: | |
| optimized_row[col] = player['player_names'] | |
| achieved_objective += player.get(optimize_by, player.get('median', 0)) | |
| break | |
| # Enforce locked players stay in their starting column (no move to F/UTIL/etc.) | |
| for col, player_name in locked_players.items(): | |
| optimized_row[col] = player_name | |
| # CRITICAL: Only return optimized lineup if it's actually better than original | |
| # If optimization resulted in a worse lineup, keep the original | |
| if achieved_objective < original_objective: | |
| # Optimization made things worse - keep the original lineup | |
| return row.copy(), original_objective | |
| return optimized_row, achieved_objective | |
| def optimize_single_lineup_cascading( | |
| row: pd.Series, | |
| player_columns: list, | |
| player_pool: pd.DataFrame, | |
| map_dict: dict, | |
| lock_teams: list, | |
| type_var: str, | |
| sport_var: str, | |
| salary_max: int, | |
| optimize_by: str, | |
| ) -> tuple[pd.Series, float]: # NOW RETURNS (row, objective_value) | |
| """ | |
| Optimize a single lineup row using linear programming (cascading budget). | |
| Players from lock_teams are kept (locked) in the column they started in; | |
| they are never moved to other eligible positions (e.g. locked PF stays in PF, not F or UTIL; | |
| PF/C stays in PF, not C, F, or UTIL). All other positions are cleared and re-optimized. | |
| Args: | |
| row: A single lineup row from the DataFrame | |
| player_columns: List of column names containing player positions | |
| player_pool: DataFrame of available players (projections_df) | |
| map_dict: Dictionary containing player mappings | |
| lock_teams: List of team names whose players should be KEPT (locked) in their starting column | |
| type_var: 'Classic' or 'Showdown' | |
| sport_var: Sport identifier (NFL, NBA, MLB, etc.) | |
| salary_max: Maximum salary cap for the lineup | |
| optimize_by: 'median' or 'ownership' - which metric to optimize for | |
| max_objective_value: Maximum allowed objective value (for cascading optimization) | |
| Returns: | |
| Tuple of (optimized_row, achieved_objective_value) | |
| """ | |
| # Calculate the original lineup's objective value BEFORE any changes | |
| max_threshold = calculate_lineup_objective(row, player_columns, player_pool, optimize_by) | |
| # Create a copy of the row to modify | |
| optimized_row = row.copy() | |
| # Identify locked players (from lock_teams) and open positions. | |
| # Locked players must stay in the column they started in (e.g. locked PF stays PF, not F/UTIL; PF/C stays PF, not C/F/UTIL). | |
| locked_players = {} # {column: player_name} | |
| open_columns = [] | |
| locked_salary = 0 | |
| locked_player_names = set() | |
| locked_objective_value = 0 # Track locked player contribution to objective | |
| for col in player_columns: | |
| player_name = row[col] | |
| player_team = map_dict['team_map'].get(player_name, '') | |
| if player_team in lock_teams: | |
| # Keep this player locked in this column only (do not move to other eligible slots) | |
| locked_players[col] = player_name | |
| locked_salary += get_effective_salary(player_name, col, map_dict, type_var) | |
| locked_player_names.add(player_name) | |
| # Add locked player's contribution to objective | |
| player_data = player_pool[player_pool['player_names'] == player_name] | |
| if not player_data.empty: | |
| locked_objective_value += player_data.iloc[0].get(optimize_by, player_data.iloc[0].get('median', 0)) | |
| else: | |
| # This position is open for optimization | |
| open_columns.append(col) | |
| # If no open columns, return with original objective value (keep original intact) | |
| if not open_columns: | |
| return row.copy(), max_threshold | |
| # Calculate remaining salary budget | |
| remaining_salary = salary_max - locked_salary | |
| # Calculate remaining objective budget (if max_objective_value is set) | |
| remaining_objective_budget = max_threshold | |
| # Filter player pool: exclude locked teams and already-locked players | |
| available_players = player_pool[ | |
| (~player_pool['team'].isin(lock_teams)) & | |
| (~player_pool['player_names'].isin(locked_player_names)) | |
| ].copy() | |
| # CRITICAL: Remove duplicate players from available pool | |
| available_players = available_players.drop_duplicates(subset=['player_names'], keep='first') | |
| if available_players.empty: | |
| # No available players - keep original lineup | |
| return row.copy(), max_threshold | |
| # Build the optimization model | |
| solver = pywraplp.Solver.CreateSolver('CBC') | |
| if not solver: | |
| # Fallback if solver not available - keep original lineup | |
| return row.copy(), max_threshold | |
| # Create decision variables: x[player_idx, col_idx] = 1 if player is assigned to column | |
| player_list = available_players.to_dict('records') | |
| num_players = len(player_list) | |
| num_open_cols = len(open_columns) | |
| # x[i][j] = 1 if player i is assigned to open column j | |
| x = {} | |
| for i in range(num_players): | |
| for j in range(num_open_cols): | |
| x[i, j] = solver.BoolVar(f'x_{i}_{j}') | |
| # Constraint 1: Each open column gets exactly one player | |
| for j in range(num_open_cols): | |
| solver.Add(sum(x[i, j] for i in range(num_players)) == 1) | |
| # Constraint 2: Each player can only be used AT MOST once across all open columns | |
| for i in range(num_players): | |
| solver.Add(sum(x[i, j] for j in range(num_open_cols)) <= 1) | |
| # Constraint 3 & Opponent Exclusion: Handle locked players and opponents | |
| opp_map = map_dict.get('opp_map', {}) | |
| if opp_map: | |
| # When opp_map exists, use opponent-based constraints | |
| # Build a mapping of locked opponents that cannot be selected | |
| locked_opponents = set() | |
| for locked_player in locked_player_names: | |
| opponent = opp_map.get(locked_player) | |
| if opponent: | |
| locked_opponents.add(opponent) | |
| # Prevent locked opponents from being selected | |
| for i, player in enumerate(player_list): | |
| player_name = player['player_names'] | |
| if player_name in locked_opponents: | |
| for j in range(num_open_cols): | |
| solver.Add(x[i, j] == 0) | |
| # Prevent opponents from being selected together in open positions | |
| player_name_to_idx = {player['player_names']: i for i, player in enumerate(player_list)} | |
| for i, player in enumerate(player_list): | |
| player_name = player['player_names'] | |
| opponent_name = opp_map.get(player_name) | |
| if opponent_name and opponent_name in player_name_to_idx: | |
| opponent_idx = player_name_to_idx[opponent_name] | |
| # If player i is selected, opponent cannot be selected | |
| solver.Add( | |
| sum(x[i, j] for j in range(num_open_cols)) + | |
| sum(x[opponent_idx, j] for j in range(num_open_cols)) <= 1 | |
| ) | |
| else: | |
| # When opp_map doesn't exist, use standard locked player constraint | |
| # Constraint 3: Players already LOCKED in the row cannot be selected again | |
| for i, player in enumerate(player_list): | |
| player_name = player['player_names'] | |
| if player_name in locked_player_names: | |
| for j in range(num_open_cols): | |
| solver.Add(x[i, j] == 0) | |
| # Constraint 4: Position eligibility | |
| for i, player in enumerate(player_list): | |
| player_positions = player['position'].split('/') | |
| for j, col in enumerate(open_columns): | |
| if type_var == 'Classic': | |
| if not check_position_eligibility(sport_var, col, player_positions): | |
| solver.Add(x[i, j] == 0) | |
| else: | |
| # For Showdown, CPT and FLEX can take any player | |
| pass | |
| # Constraint 5: Total salary of selected players <= remaining_salary | |
| salary_constraint = [] | |
| for i, player in enumerate(player_list): | |
| for j, col in enumerate(open_columns): | |
| effective_salary = get_effective_salary(player['player_names'], col, map_dict, type_var) | |
| salary_constraint.append(x[i, j] * effective_salary) | |
| solver.Add(sum(salary_constraint) <= remaining_salary) | |
| # Constraint 6: Total objective value <= max_objective_value | |
| objective_terms = [] | |
| for i, player in enumerate(player_list): | |
| metric_value = player.get(optimize_by, player.get('median', 0)) | |
| for j in range(num_open_cols): | |
| objective_terms.append(x[i, j] * metric_value) | |
| solver.Add(sum(objective_terms) <= remaining_objective_budget) | |
| # Objective: Maximize the sum of the optimization metric | |
| solver.Maximize(sum(objective_terms)) | |
| # Solve | |
| status = solver.Solve() | |
| achieved_objective = locked_objective_value # Start with locked contribution | |
| if status == pywraplp.Solver.OPTIMAL or status == pywraplp.Solver.FEASIBLE: | |
| # Extract solution: only open_columns are filled; locked columns stay fixed | |
| for j, col in enumerate(open_columns): | |
| for i, player in enumerate(player_list): | |
| if x[i, j].solution_value() > 0.5: | |
| optimized_row[col] = player['player_names'] | |
| achieved_objective += player.get(optimize_by, player.get('median', 0)) | |
| break | |
| # Enforce locked players stay in their starting column (no move to F/UTIL/etc.) | |
| for col, player_name in locked_players.items(): | |
| optimized_row[col] = player_name | |
| return optimized_row | |
| def optimize_lineups( | |
| working_frame: pd.DataFrame, | |
| projections_df: pd.DataFrame, | |
| player_columns: list, | |
| map_dict: dict, | |
| lock_teams: list, | |
| optimize_if_player_exists: list, | |
| skip_if_player_exists: list, | |
| type_var: str, | |
| sport_var: str, | |
| salary_max: int, | |
| optimize_by: str, | |
| ) -> pd.DataFrame: | |
| lock_teams = lock_teams or [] | |
| optimized_frame = working_frame.copy() | |
| player_pool = projections_df | |
| metric_col = _metric_column(optimize_by) | |
| original_frame = working_frame.copy() | |
| # Which rows to run optimization on (default: all) | |
| if optimize_if_player_exists or skip_if_player_exists: | |
| def row_has_any(row, players): | |
| return any(p in list(row) for p in players) | |
| optimize_mask = ( | |
| working_frame.apply(lambda row: row_has_any(row, optimize_if_player_exists), axis=1) | |
| if optimize_if_player_exists | |
| else pd.Series(True, index=working_frame.index) | |
| ) | |
| skip_mask = ( | |
| ~working_frame.apply(lambda row: row_has_any(row, skip_if_player_exists), axis=1) | |
| if skip_if_player_exists | |
| else pd.Series(True, index=working_frame.index) | |
| ) | |
| to_optimize_mask = optimize_mask & skip_mask | |
| else: | |
| to_optimize_mask = pd.Series(True, index=working_frame.index) | |
| subset_frame = working_frame[to_optimize_mask] | |
| original_subset_frame = original_frame[to_optimize_mask] | |
| original_objectives = [ | |
| calculate_lineup_objective(original_subset_frame.iloc[i], player_columns, player_pool, metric_col) | |
| for i in range(len(original_subset_frame)) | |
| ] | |
| # First pass: optimize every row with no cap; collect achieved objectives | |
| objectives = [] | |
| for i in range(len(subset_frame)): | |
| opt_row, achieved = optimize_single_lineup( | |
| row=subset_frame.iloc[i], | |
| player_columns=player_columns, | |
| player_pool=player_pool, | |
| map_dict=map_dict, | |
| lock_teams=lock_teams, | |
| type_var=type_var, | |
| sport_var=sport_var, | |
| salary_max=salary_max, | |
| optimize_by=optimize_by, | |
| ) | |
| subset_frame.iloc[i] = opt_row | |
| objectives.append(achieved) | |
| # Sort so row 0 is best (by objective); keep index so we can restore original order later | |
| order = sorted(range(len(subset_frame)), key=lambda i: objectives[i], reverse=True) | |
| subset_frame = subset_frame.iloc[order] # index travels with rows (no reset_index) | |
| objectives = [objectives[i] for i in order] | |
| # Second pass: greedy one-swap for any row whose lineup we've already used (duplicate) | |
| used_lineup_keys = {_lineup_key(subset_frame.iloc[0], player_columns)} | |
| for i in range(1, len(subset_frame)): | |
| prev_row = subset_frame.iloc[i - 1] | |
| curr_key = _lineup_key(subset_frame.iloc[i], player_columns) | |
| if curr_key in used_lineup_keys: | |
| new_row, new_obj = _one_swap_below( | |
| subset_frame.iloc[i], | |
| prev_row, | |
| player_columns, | |
| player_pool, | |
| map_dict, | |
| lock_teams, | |
| type_var, | |
| sport_var, | |
| salary_max, | |
| optimize_by, | |
| used_lineup_keys, | |
| ) | |
| subset_frame.iloc[i] = new_row | |
| objectives[i] = new_obj | |
| used_lineup_keys.add(_lineup_key(new_row, player_columns)) | |
| else: | |
| used_lineup_keys.add(curr_key) | |
| # Revert any row that is worse than the original lineup that ended up in this slot | |
| for i in range(len(subset_frame)): | |
| orig_obj = original_objectives[order[i]] | |
| if objectives[i] < orig_obj: | |
| subset_frame.iloc[i] = original_subset_frame.iloc[order[i]] | |
| # Restore original row order (by index) so export/display matches pre-optimization sort | |
| subset_frame = subset_frame.sort_index() | |
| optimized_frame.loc[to_optimize_mask] = subset_frame | |
| return optimized_frame |