DFS_Portfolio_Manager / global_func /optimize_lineup.py
James McCool
Okay optimize params added (I think)
8c24a32
import pandas as pd
import numpy as np
from ortools.linear_solver import pywraplp
def check_nba_position_eligibility(column_name, player_positions):
    """Return whether a player may fill the given NBA roster slot.

    Slot tokens are matched as substrings of ``column_name``, two-letter
    slots first, so 'PG'/'SG'/'SF'/'PF' are resolved before the bare 'G'
    and 'F' combo slots are considered.
    """
    # Single-position slots: decided by a membership test on player_positions.
    for slot in ('PG', 'SG', 'SF', 'PF', 'C'):
        if slot in column_name:
            return slot in player_positions
    # Combo slots accept either of two positions.
    for slot, accepted in (('G', ('PG', 'SG')), ('F', ('SF', 'PF'))):
        if slot in column_name:
            return any(position in accepted for position in player_positions)
    # UTIL accepts any position; an unrecognised slot accepts none.
    return 'UTIL' in column_name
def check_lol_position_eligibility(column_name, player_positions):
    """Return whether a player may fill the given League of Legends slot.

    Role slots and the 'Team' slot require the matching entry in
    ``player_positions``; the 'CPT' slot accepts any of the five player roles.
    """
    roles = ('TOP', 'JNG', 'MID', 'ADC', 'SUP')
    for slot in roles + ('Team',):
        if slot in column_name:
            return slot in player_positions
    if 'CPT' in column_name:
        return any(position in roles for position in player_positions)
    return False
def check_mlb_position_eligibility(column_name, player_positions):
    """Return whether a player may fill the given MLB roster slot.

    Pitcher slots are tested first and accept any of 'P', 'SP' or 'RP';
    every other recognised slot requires the matching position.
    """
    pitcher_tags = ('P', 'SP', 'RP')
    if any(tag in column_name for tag in pitcher_tags):
        return any(position in pitcher_tags for position in player_positions)
    for slot in ('C', '1B', '2B', '3B', 'SS', 'OF'):
        if slot in column_name:
            return slot in player_positions
    return False
def check_nfl_position_eligibility(column_name, player_positions):
    """Return whether a player may fill the given NFL roster slot.

    Named slots require the matching position; 'FLEX' and 'UTIL' slots
    accept any RB, WR or TE.
    """
    for slot in ('QB', 'RB', 'WR', 'TE', 'DST'):
        if slot in column_name:
            return slot in player_positions
    if 'FLEX' in column_name or 'UTIL' in column_name:
        return any(position in ('RB', 'WR', 'TE') for position in player_positions)
    return False
def check_golf_position_eligibility(column_name, player_positions):
    """Return whether a player may fill the given golf slot.

    Only 'FLEX' slots are restricted (the player must list 'G'); any other
    slot name accepts every player.
    """
    if 'FLEX' not in column_name:
        return True
    return any(position in ('G',) for position in player_positions)
def check_tennis_position_eligibility(column_name, player_positions):
    """Return whether a player may fill the given tennis slot.

    Only 'FLEX' slots are restricted (the player must list 'T'); any other
    slot name accepts every player.
    """
    if 'FLEX' not in column_name:
        return True
    return any(position in ('T',) for position in player_positions)
def check_mma_position_eligibility(column_name, player_positions):
    """Return whether a player may fill the given MMA slot.

    Only 'FLEX' slots are restricted (the player must list 'F'); any other
    slot name accepts every player.
    """
    if 'FLEX' not in column_name:
        return True
    return any(position in ('F',) for position in player_positions)
def check_nascar_position_eligibility(column_name, player_positions):
    """Return whether a player may fill the given NASCAR slot.

    Only 'FLEX' slots are restricted (the player must list 'D'); any other
    slot name accepts every player.
    """
    if 'FLEX' not in column_name:
        return True
    return any(position in ('D',) for position in player_positions)
def check_ncaaf_position_eligibility(column_name, player_positions):
    """Return whether a player may fill the given college-football slot.

    Named slots (QB/RB/WR) require the matching position. 'FLEX' accepts
    RB or WR; 'SFLEX' (superflex) additionally accepts QB.
    """
    for slot in ('QB', 'RB', 'WR'):
        if slot in column_name:
            return slot in player_positions
    # 'SFLEX' contains the substring 'FLEX', so it has to be tested first;
    # otherwise the plain-FLEX rule would shadow it and QBs could never be
    # placed in a superflex slot.
    if 'SFLEX' in column_name:
        return any(position in ('RB', 'WR', 'QB') for position in player_positions)
    if 'FLEX' in column_name:
        return any(position in ('RB', 'WR') for position in player_positions)
    return False
def check_nhl_position_eligibility(column_name, player_positions):
    """Return whether a player may fill the given NHL roster slot.

    Named slots (C/W/D/G) require the matching position; 'FLEX' and 'UTIL'
    slots accept any C, W or D.
    """
    for slot in ('C', 'W', 'D', 'G'):
        if slot in column_name:
            return slot in player_positions
    if 'FLEX' in column_name or 'UTIL' in column_name:
        return any(position in ('C', 'W', 'D') for position in player_positions)
    return False
def check_position_eligibility(sport, column_name, player_positions):
    """Dispatch to the sport-specific slot-eligibility check.

    Args:
        sport: Sport identifier (e.g. 'NBA', 'NFL', 'NCAAF').
        column_name: Name of the lineup column (roster slot) being filled.
        player_positions: Positions the player is eligible for.

    Returns:
        True when the player may be placed in ``column_name``. For a sport
        with no dedicated checker the slot name itself must appear in
        ``player_positions``.
    """
    if sport == 'NBA':
        return check_nba_position_eligibility(column_name, player_positions)
    if sport == 'MLB':
        return check_mlb_position_eligibility(column_name, player_positions)
    if sport == 'NFL':
        return check_nfl_position_eligibility(column_name, player_positions)
    if sport == 'NCAAF':
        # Previously missing: without this branch college football fell
        # through to the exact-match fallback, which can never fill a
        # FLEX/SFLEX slot.
        return check_ncaaf_position_eligibility(column_name, player_positions)
    if sport == 'NHL':
        return check_nhl_position_eligibility(column_name, player_positions)
    if sport == 'MMA':
        return check_mma_position_eligibility(column_name, player_positions)
    if sport == 'GOLF':
        return check_golf_position_eligibility(column_name, player_positions)
    if sport == 'TENNIS':
        return check_tennis_position_eligibility(column_name, player_positions)
    if sport == 'LOL':
        return check_lol_position_eligibility(column_name, player_positions)
    if sport == 'NASCAR':
        return check_nascar_position_eligibility(column_name, player_positions)
    # Default fallback - assume exact position match
    return column_name in player_positions
def get_effective_salary(player_name: str, column_name: str, map_dict: dict, type_var: str) -> float:
    """Return the salary a player costs in a specific column.

    The base salary comes from ``map_dict['salary_map']`` (0 when the player
    is missing). In non-Classic contests the 'CPT' column costs 1.5x.
    """
    salary = map_dict['salary_map'].get(player_name, 0)
    is_captain_slot = type_var != 'Classic' and column_name == 'CPT'
    return salary * 1.5 if is_captain_slot else salary
def calculate_lineup_objective(
    row: pd.Series,
    player_columns: list,
    player_pool: pd.DataFrame,
    optimize_by: str,
) -> float:
    """Sum the ``optimize_by`` metric over every player in a lineup.

    Each player is looked up by name in ``player_pool`` and the first matching
    row is used. Players missing from the pool contribute nothing; when the
    pool has no ``optimize_by`` column the 'median' column (or 0) is used.
    """
    score = 0.0
    for slot in player_columns:
        matches = player_pool[player_pool['player_names'] == row[slot]]
        if matches.empty:
            continue
        first_match = matches.iloc[0]
        score += first_match.get(optimize_by, first_match.get('median', 0))
    return score
def _metric_column(optimize_by: str) -> str:
    """Translate a UI ``optimize_by`` label into the player_pool column name.

    'Own' and 'Ownership' both map to 'ownership'; anything else is returned
    unchanged.
    """
    return "ownership" if optimize_by in ("Own", "Ownership") else optimize_by
def _lineup_key(row: pd.Series, player_columns: list) -> frozenset:
    """Return an order-independent, hashable identity for a lineup.

    Two rows holding the same players in different columns get the same key.
    """
    return frozenset({row[slot] for slot in player_columns})
def _one_swap_below(
    row: pd.Series,
    previous_row: pd.Series,
    player_columns: list,
    player_pool: pd.DataFrame,
    map_dict: dict,
    lock_teams: list,
    type_var: str,
    sport_var: str,
    salary_max: int,
    optimize_by: str,
    used_lineup_keys: set,
) -> tuple[pd.Series, float]:
    """
    Greedy one-swap: replace exactly one player of ``row`` so the lineup scores
    just below its current value in the optimize metric.

    A replacement is valid when it is not already in the lineup, is not on a
    locked team, fits the column's position rules (Classic only), keeps the
    lineup within ``salary_max``, is not the opponent of a remaining player
    (per ``map_dict['opp_map']``) and has a strictly lower metric than the
    player it replaces. Players on ``lock_teams`` are never swapped, so they
    stay in their starting column.

    Every valid swap across all columns is ranked by metric drop (smallest
    first) and the first one whose lineup is not in ``used_lineup_keys`` is
    returned. Ranking all swaps, rather than only the best one per column,
    means a column is not abandoned just because its best replacement happens
    to recreate an already-used lineup.

    Args:
        row: Lineup to derive the new lineup from.
        previous_row: Unused; kept for call compatibility.
        player_columns: Lineup columns that hold player names.
        player_pool: Player table with 'player_names', 'team', 'position'
            and the metric column.
        map_dict: Needs 'salary_map' and 'team_map'; 'opp_map' is optional.
        lock_teams: Teams whose players must not be swapped (None allowed).
        type_var: 'Classic' enables position checks.
        sport_var: Sport identifier used for position checks.
        salary_max: Salary cap for the whole lineup.
        optimize_by: Metric name (UI label or column name).
        used_lineup_keys: Lineup keys that must not be produced again.

    Returns:
        ``(new_row, objective)``. When no valid unused swap exists the
        original lineup (copied) and its objective are returned.
    """
    lock_teams = lock_teams or []
    metric_col = _metric_column(optimize_by)
    opp_map = map_dict.get("opp_map") or {}
    team_map = map_dict["team_map"]

    current_players = {row[col] for col in player_columns}
    total_salary = sum(
        get_effective_salary(row[col], col, map_dict, type_var)
        for col in player_columns
    )
    # Locked players (by team): never swap these columns so they stay in their starting slot
    locked = {
        row[col]
        for col in player_columns
        if team_map.get(row[col], "") in lock_teams
    }
    pool_no_locked = player_pool[~player_pool["team"].isin(lock_teams)]
    pool_no_locked = pool_no_locked.drop_duplicates(subset=["player_names"], keep="first")

    swaps = []  # (metric drop, column, replacement name)
    for col in player_columns:
        p_cur = row[col]
        if p_cur in locked:
            continue
        cur_sal = get_effective_salary(p_cur, col, map_dict, type_var)
        cur_rec = player_pool[player_pool["player_names"] == p_cur]
        if cur_rec.empty:
            m_cur = 0
        else:
            m_cur = cur_rec.iloc[0].get(metric_col, cur_rec.iloc[0].get("median", 0))
        # Salary the replacement may cost once the current player is removed.
        max_replacement_salary = salary_max - total_salary + cur_sal
        others = current_players - {p_cur}
        # Opponents of everyone who stays in the lineup (locked players included).
        blocked_opponents = {opp_map[p] for p in others if opp_map.get(p)}

        names = pool_no_locked["player_names"]
        candidates = pool_no_locked[(~names.isin(others)) & (names != p_cur)]
        for cand in candidates.to_dict("records"):
            cname = cand["player_names"]
            if type_var == "Classic":
                pos = cand["position"]
                positions = pos.split("/") if isinstance(pos, str) else [pos]
                if not check_position_eligibility(sport_var, col, positions):
                    continue
            if get_effective_salary(cname, col, map_dict, type_var) > max_replacement_salary:
                continue
            if cname in blocked_opponents:
                continue
            cand_opp = opp_map.get(cname)
            if cand_opp and cand_opp in others:
                continue
            m_cand = cand.get(metric_col, cand.get("median", 0))
            # Strictly-lower only; written this way so a NaN metric is
            # rejected instead of producing a NaN drop.
            if not m_cand < m_cur:
                continue
            swaps.append((m_cur - m_cand, col, cname))

    # Stable sort: ties keep column order, then pool order.
    swaps.sort(key=lambda swap: swap[0])
    for _drop, col, new_player in swaps:
        new_row = row.copy()
        new_row[col] = new_player
        if _lineup_key(new_row, player_columns) in used_lineup_keys:
            continue
        return new_row, calculate_lineup_objective(new_row, player_columns, player_pool, metric_col)

    # No valid, unused swap: hand the lineup back unchanged.
    return row.copy(), calculate_lineup_objective(row, player_columns, player_pool, metric_col)
def _optimize_open_slots(
    row,
    player_columns,
    player_pool,
    map_dict,
    lock_teams,
    type_var,
    sport_var,
    salary_max,
    metric_col,
    objective_cap=None,
):
    """Shared solver core: re-fill every unlocked column of ``row``.

    Players whose team is in ``lock_teams`` stay in the column they started
    in; all other columns are re-assigned by a CBC mixed-integer program that
    maximises the summed ``metric_col`` subject to the salary cap, position
    eligibility (Classic only), opponent exclusion (when
    ``map_dict['opp_map']`` is present) and, if ``objective_cap`` is given,
    a cap on the lineup's total objective (locked players included).

    Returns:
        ``(optimized_row, achieved_objective)``, or ``None`` when there is
        nothing to optimize (no open column, empty pool), the solver backend
        is unavailable, or no feasible assignment was found.
    """
    lock_teams = lock_teams or []
    team_map = map_dict['team_map']

    def metric_of(record):
        # Works for both pool rows (Series) and record dicts.
        return record.get(metric_col, record.get('median', 0))

    # Split the lineup into locked players and columns open for optimization.
    locked_names = set()
    open_columns = []
    locked_salary = 0
    locked_objective = 0
    for col in player_columns:
        name = row[col]
        if team_map.get(name, '') not in lock_teams:
            open_columns.append(col)
            continue
        locked_names.add(name)
        locked_salary += get_effective_salary(name, col, map_dict, type_var)
        pool_rows = player_pool[player_pool['player_names'] == name]
        if not pool_rows.empty:
            locked_objective += metric_of(pool_rows.iloc[0])
    if not open_columns:
        return None

    # Candidates: nobody from a locked team, nobody already locked, one row per player.
    available = player_pool[
        ~player_pool['team'].isin(lock_teams)
        & ~player_pool['player_names'].isin(locked_names)
    ].drop_duplicates(subset=['player_names'], keep='first')
    if available.empty:
        return None

    solver = pywraplp.Solver.CreateSolver('CBC')
    if not solver:
        return None

    players = available.to_dict('records')
    player_ids = range(len(players))
    slot_ids = range(len(open_columns))

    # x[i, j] = 1 if player i is assigned to open column j
    x = {(i, j): solver.BoolVar(f'x_{i}_{j}') for i in player_ids for j in slot_ids}
    times_used = [sum(x[i, j] for j in slot_ids) for i in player_ids]

    # Each open column gets exactly one player.
    for j in slot_ids:
        solver.Add(sum(x[i, j] for i in player_ids) == 1)
    # Each player is used at most once.
    for i in player_ids:
        solver.Add(times_used[i] <= 1)

    # Opponent exclusion. (Locked players themselves are already absent from
    # ``available``, so no separate "do not re-select" constraint is needed.)
    opp_map = map_dict.get('opp_map') or {}
    if opp_map:
        locked_opponents = {
            opp for opp in (opp_map.get(name) for name in locked_names) if opp
        }
        index_of = {player['player_names']: i for i, player in enumerate(players)}
        seen_pairs = set()
        for i, player in enumerate(players):
            name = player['player_names']
            if name in locked_opponents:
                solver.Add(times_used[i] == 0)
            opponent = opp_map.get(name)
            if opponent and opponent in index_of:
                k = index_of[opponent]
                pair = (min(i, k), max(i, k))
                if pair not in seen_pairs:  # a symmetric map would add it twice
                    seen_pairs.add(pair)
                    solver.Add(times_used[i] + times_used[k] <= 1)

    # Position eligibility; Showdown columns (CPT/FLEX) take any player.
    if type_var == 'Classic':
        for i, player in enumerate(players):
            pos = player['position']
            # Tolerate non-string positions (e.g. NaN) instead of crashing on .split
            positions = pos.split('/') if isinstance(pos, str) else [pos]
            for j, col in enumerate(open_columns):
                if not check_position_eligibility(sport_var, col, positions):
                    solver.Add(x[i, j] == 0)

    # Salary of the open columns must fit in what the locked players left over.
    solver.Add(
        sum(
            x[i, j] * get_effective_salary(player['player_names'], col, map_dict, type_var)
            for i, player in enumerate(players)
            for j, col in enumerate(open_columns)
        )
        <= salary_max - locked_salary
    )

    open_objective = sum(
        x[i, j] * metric_of(player)
        for i, player in enumerate(players)
        for j in slot_ids
    )
    if objective_cap is not None:
        # The cap applies to the whole lineup, so the locked players'
        # contribution is taken out of the budget for the open columns.
        solver.Add(open_objective <= objective_cap - locked_objective)
    solver.Maximize(open_objective)

    status = solver.Solve()
    if status not in (pywraplp.Solver.OPTIMAL, pywraplp.Solver.FEASIBLE):
        return None

    # Only open columns are written; locked columns keep their original player.
    optimized_row = row.copy()
    achieved = locked_objective
    for j, col in enumerate(open_columns):
        for i, player in enumerate(players):
            if x[i, j].solution_value() > 0.5:
                optimized_row[col] = player['player_names']
                achieved += metric_of(player)
                break
    return optimized_row, achieved


def optimize_single_lineup(
    row: pd.Series,
    player_columns: list,
    player_pool: pd.DataFrame,
    map_dict: dict,
    lock_teams: list,
    type_var: str,
    sport_var: str,
    salary_max: int,
    optimize_by: str,
) -> tuple[pd.Series, float]:
    """
    Optimize a single lineup row using linear programming.

    Players from lock_teams are kept (locked) in the column they started in;
    they are never moved to other eligible positions (e.g. locked PF stays in
    PF, not F or UTIL). All other columns are cleared and re-optimized with
    the OR-Tools linear solver. The original lineup is returned whenever
    optimization is impossible, fails, or scores lower than the original.

    Args:
        row: A single lineup row from the DataFrame
        player_columns: List of column names containing player positions
        player_pool: DataFrame of available players (projections_df)
        map_dict: Dictionary containing player mappings
        lock_teams: Teams whose players stay locked in their column (None allowed)
        type_var: 'Classic' or 'Showdown'
        sport_var: Sport identifier (NFL, NBA, MLB, etc.)
        salary_max: Maximum salary cap for the lineup
        optimize_by: Metric to maximise; UI labels ('Own', 'Ownership') are
            mapped to the pool column name

    Returns:
        Tuple of (optimized_row, achieved_objective_value)
    """
    metric_col = _metric_column(optimize_by)
    original_objective = calculate_lineup_objective(row, player_columns, player_pool, metric_col)
    solved = _optimize_open_slots(
        row, player_columns, player_pool, map_dict, lock_teams,
        type_var, sport_var, salary_max, metric_col,
    )
    # Never hand back something worse than what we started with.
    if solved is None or solved[1] < original_objective:
        return row.copy(), original_objective
    return solved


def optimize_single_lineup_cascading(
    row: pd.Series,
    player_columns: list,
    player_pool: pd.DataFrame,
    map_dict: dict,
    lock_teams: list,
    type_var: str,
    sport_var: str,
    salary_max: int,
    optimize_by: str,
    max_objective_value: "float | None" = None,
) -> tuple[pd.Series, float]:
    """
    Optimize a single lineup row using linear programming (cascading budget).

    Same locking rules as ``optimize_single_lineup``, but the lineup's total
    objective (locked players included) is additionally capped.

    Args:
        row: A single lineup row from the DataFrame
        player_columns: List of column names containing player positions
        player_pool: DataFrame of available players (projections_df)
        map_dict: Dictionary containing player mappings
        lock_teams: Teams whose players stay locked in their column (None allowed)
        type_var: 'Classic' or 'Showdown'
        sport_var: Sport identifier (NFL, NBA, MLB, etc.)
        salary_max: Maximum salary cap for the lineup
        optimize_by: Metric to maximise; UI labels ('Own', 'Ownership') are
            mapped to the pool column name
        max_objective_value: Maximum allowed total objective. Defaults to the
            original lineup's own objective.

    Returns:
        Tuple of (optimized_row, achieved_objective_value). When nothing can
        be optimized or the solver finds no solution, the original lineup and
        its objective are returned.
    """
    metric_col = _metric_column(optimize_by)
    original_objective = calculate_lineup_objective(row, player_columns, player_pool, metric_col)
    cap = original_objective if max_objective_value is None else max_objective_value
    solved = _optimize_open_slots(
        row, player_columns, player_pool, map_dict, lock_teams,
        type_var, sport_var, salary_max, metric_col, objective_cap=cap,
    )
    if solved is None:
        return row.copy(), original_objective
    return solved
def optimize_lineups(
    working_frame: pd.DataFrame,
    projections_df: pd.DataFrame,
    player_columns: list,
    map_dict: dict,
    lock_teams: list,
    optimize_if_player_exists: list,
    skip_if_player_exists: list,
    type_var: str,
    sport_var: str,
    salary_max: int,
    optimize_by: str,
) -> pd.DataFrame:
    """Optimize the selected lineups of ``working_frame`` and return a new frame.

    Steps:
      1. Select rows: all rows by default; only rows containing any of
         ``optimize_if_player_exists`` when given; never rows containing any
         of ``skip_if_player_exists``.
      2. Optimize each selected row independently with the solver.
      3. Rank rows by achieved objective and, for every row that duplicates a
         lineup ranked above it, try a one-player swap to make it distinct.
      4. Revert any row that ended up scoring below its original lineup.
      5. Write the rows back at their original index labels.

    Args:
        working_frame: Lineups, one per row. Not modified.
        projections_df: Player pool used for metrics and candidates.
        player_columns: Columns of ``working_frame`` that hold player names.
        map_dict: Player mappings ('salary_map', 'team_map', optional 'opp_map').
        lock_teams: Teams whose players stay in place (None allowed).
        optimize_if_player_exists: Optional player names; see step 1.
        skip_if_player_exists: Optional player names; see step 1.
        type_var: 'Classic' or 'Showdown'.
        sport_var: Sport identifier.
        salary_max: Salary cap per lineup.
        optimize_by: Metric name (UI label or pool column).

    Returns:
        A copy of ``working_frame`` with the selected rows optimized. When no
        row is selected the copy is returned unchanged.
    """
    lock_teams = lock_teams or []
    optimized_frame = working_frame.copy()

    # Which rows to run optimization on (default: all)
    to_optimize_mask = pd.Series(True, index=working_frame.index)
    if optimize_if_player_exists:
        has_wanted = working_frame.isin(list(optimize_if_player_exists)).any(axis=1)
        to_optimize_mask = to_optimize_mask & has_wanted
    if skip_if_player_exists:
        has_skipped = working_frame.isin(list(skip_if_player_exists)).any(axis=1)
        to_optimize_mask = to_optimize_mask & ~has_skipped
    # Nothing selected (or empty input): return early, otherwise the
    # duplicate pass below would index row 0 of an empty frame.
    if not to_optimize_mask.any():
        return optimized_frame

    player_pool = projections_df
    metric_col = _metric_column(optimize_by)

    # Explicit copies: rows are assigned into subset_frame below, which must
    # not be a slice of working_frame.
    original_subset_frame = working_frame[to_optimize_mask].copy()
    subset_frame = original_subset_frame.copy()
    row_count = len(subset_frame)
    original_objectives = [
        calculate_lineup_objective(original_subset_frame.iloc[i], player_columns, player_pool, metric_col)
        for i in range(row_count)
    ]

    # First pass: optimize every row with no cap; collect achieved objectives.
    # metric_col (not the raw UI label) is passed so the solver and the
    # original-objective comparison use the same pool column.
    objectives = []
    for i in range(row_count):
        opt_row, achieved = optimize_single_lineup(
            row=original_subset_frame.iloc[i],
            player_columns=player_columns,
            player_pool=player_pool,
            map_dict=map_dict,
            lock_teams=lock_teams,
            type_var=type_var,
            sport_var=sport_var,
            salary_max=salary_max,
            optimize_by=metric_col,
        )
        subset_frame.iloc[i] = opt_row
        objectives.append(achieved)

    # Sort so row 0 is best (by objective); index labels travel with the rows
    # so the original order can be restored later.
    order = sorted(range(row_count), key=objectives.__getitem__, reverse=True)
    subset_frame = subset_frame.iloc[order].copy()
    objectives = [objectives[i] for i in order]

    # Second pass: greedy one-swap for any row whose lineup we've already used (duplicate)
    used_lineup_keys = {_lineup_key(subset_frame.iloc[0], player_columns)}
    for i in range(1, row_count):
        curr_key = _lineup_key(subset_frame.iloc[i], player_columns)
        if curr_key not in used_lineup_keys:
            used_lineup_keys.add(curr_key)
            continue
        new_row, new_obj = _one_swap_below(
            subset_frame.iloc[i],
            subset_frame.iloc[i - 1],
            player_columns,
            player_pool,
            map_dict,
            lock_teams,
            type_var,
            sport_var,
            salary_max,
            metric_col,
            used_lineup_keys,
        )
        subset_frame.iloc[i] = new_row
        objectives[i] = new_obj
        used_lineup_keys.add(_lineup_key(new_row, player_columns))

    # Revert any row that is worse than the original lineup it came from.
    # Sorted position i holds the row that was at position order[i] originally.
    for i, source_pos in enumerate(order):
        if objectives[i] < original_objectives[source_pos]:
            subset_frame.iloc[i] = original_subset_frame.iloc[source_pos]

    # Restore original row order (by index) so export/display matches pre-optimization sort
    subset_frame = subset_frame.sort_index()
    optimized_frame.loc[to_optimize_mask] = subset_frame
    return optimized_frame