from __future__ import annotations

from dataclasses import dataclass
from io import StringIO
from typing import Any

import backtrader as bt
import pandas as pd
import yfinance as yf
from backtesting import Backtest, Strategy


@dataclass
class BacktestResult:
    engine: str
    metrics: dict[str, Any]
    equity_curve: pd.DataFrame
    trades: pd.DataFrame
    input_rows: int


def _normalize_ohlcv_columns(df: pd.DataFrame) -> pd.DataFrame:
    # Work on a copy so the caller's frame is never mutated.
    normalized = df.copy()
    if isinstance(normalized.columns, pd.MultiIndex):
        # Keep only the first column level (typically the field names).
        normalized.columns = normalized.columns.get_level_values(0)

    # Map case-insensitive column names onto the canonical OHLCV spelling.
    canonical = {
        "open": "Open",
        "high": "High",
        "low": "Low",
        "close": "Close",
        "volume": "Volume",
    }
    rename_map = {
        col: canonical[str(col).lower()]
        for col in normalized.columns
        if str(col).lower() in canonical
    }
    normalized = normalized.rename(columns=rename_map)

    for required in ["Open", "High", "Low", "Close"]:
        if required not in normalized.columns:
            raise ValueError(f"Missing required OHLC column: {required}")
    if "Volume" not in normalized.columns:
        normalized["Volume"] = 0

    if not isinstance(normalized.index, pd.DatetimeIndex):
        normalized.index = pd.to_datetime(normalized.index, utc=True, errors="coerce")
    if normalized.index.tz is not None:
        # Convert to UTC, then drop the timezone so the index is naive.
        normalized.index = normalized.index.tz_convert("UTC").tz_localize(None)

    # Unparseable timestamps become NaT under errors="coerce"; drop those rows.
    normalized = normalized[normalized.index.notna()]
    normalized = normalized.dropna(subset=["Open", "High", "Low", "Close"])
    return normalized.sort_index()


def load_price_data_from_yfinance(
    symbol: str,
    start: str,
    end: str,
    interval: str = "1d",
) -> pd.DataFrame:
    df = yf.download(symbol, start=start, end=end, interval=interval, auto_adjust=False)
    if df is None or df.empty:
        raise ValueError(f"No market data returned for symbol: {symbol}")
    return _normalize_ohlcv_columns(df)


def load_price_data_from_csv_text(csv_text: str) -> pd.DataFrame:
    df = pd.read_csv(StringIO(csv_text))
    lowered = {str(c).lower(): c for c in df.columns}
    # Prefer a "date" column, then "datetime" (both matched case-insensitively).
    time_col = lowered.get("date", lowered.get("datetime"))
    if time_col is not None:
        df[time_col] = pd.to_datetime(df[time_col], utc=True, errors="coerce")
        df = df.set_index(time_col)
    # Without either column, the existing index is parsed as timestamps
    # during normalization.
    return _normalize_ohlcv_columns(df)


class SmaCrossBacktestingPy(Strategy):
    fast_period = 10
    slow_period = 30

    def init(self) -> None:
        close = pd.Series(self.data.Close)
        self.fast = self.I(lambda x: pd.Series(x).rolling(self.fast_period).mean(), close)
        self.slow = self.I(lambda x: pd.Series(x).rolling(self.slow_period).mean(), close)

    def next(self) -> None:
        # Enter long while the fast SMA is above the slow SMA; exit when it is below.
        if self.fast[-1] > self.slow[-1] and not self.position:
            self.buy()
        elif self.fast[-1] < self.slow[-1] and self.position:
            self.position.close()


class SmaCrossBacktrader(bt.Strategy):
    params = (("fast_period", 10), ("slow_period", 30))

    def __init__(self) -> None:
        self.fast = bt.indicators.SimpleMovingAverage(
            self.data.close, period=self.params.fast_period
        )
        self.slow = bt.indicators.SimpleMovingAverage(
            self.data.close, period=self.params.slow_period
        )
        self.crossover = bt.indicators.CrossOver(self.fast, self.slow)
        self.equity_points: list[tuple[pd.Timestamp, float]] = []

    def next(self) -> None:
        if self.crossover > 0 and not self.position:
            self.buy()
        elif self.crossover < 0 and self.position:
            self.close()
        # Record the portfolio value on every bar to build the equity curve.
        dt = self.data.datetime.datetime(0)
        self.equity_points.append((pd.Timestamp(dt, tz="UTC"), self.broker.getvalue()))


def run_backtesting_py(
    data: pd.DataFrame,
    fast_period: int,
    slow_period: int,
    initial_cash: float,
    commission: float,
) -> BacktestResult:
    # Subclass the strategy so the requested periods become its class attributes.
    strategy_cls = type(
        "ConfiguredSmaCrossBacktestingPy",
        (SmaCrossBacktestingPy,),
        {"fast_period": fast_period, "slow_period": slow_period},
    )
    bt_obj = Backtest(data, strategy_cls, cash=initial_cash, commission=commission)
    stats = bt_obj.run()

    equity_curve = stats.get("_equity_curve", pd.DataFrame())
    trades = stats.get("_trades", pd.DataFrame())
    metrics = {
        "Return [%]": float(stats.get("Return [%]", 0.0)),
        "Buy & Hold Return [%]": float(stats.get("Buy & Hold Return [%]", 0.0)),
        "Sharpe Ratio": float(stats.get("Sharpe Ratio", 0.0) or 0.0),
        "Max Drawdown [%]": float(stats.get("Max. Drawdown [%]", 0.0)),
        "# Trades": int(stats.get("# Trades", 0)),
        "Win Rate [%]": float(stats.get("Win Rate [%]", 0.0)),
    }
    return BacktestResult(
        engine="backtesting.py",
        metrics=metrics,
        equity_curve=equity_curve,
        trades=trades,
        input_rows=len(data),
    )


def run_backtrader(
    data: pd.DataFrame,
    fast_period: int,
    slow_period: int,
    initial_cash: float,
    commission: float,
) -> BacktestResult:
    cerebro = bt.Cerebro()
    # Subclass the strategy so the requested periods override the default params.
    configured = type(
        "ConfiguredSmaCrossBacktrader",
        (SmaCrossBacktrader,),
        {"params": (("fast_period", fast_period), ("slow_period", slow_period))},
    )
    cerebro.addstrategy(configured)
    feed = bt.feeds.PandasData(dataname=data)
    cerebro.adddata(feed)
    cerebro.broker.setcash(initial_cash)
    cerebro.broker.setcommission(commission=commission)
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name="drawdown")
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name="sharpe", timeframe=bt.TimeFrame.Days)
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name="trades")

    starting_value = cerebro.broker.getvalue()
    run_result = cerebro.run()
    ending_value = cerebro.broker.getvalue()

    strategy = run_result[0]
    dd = strategy.analyzers.drawdown.get_analysis()
    sharpe = strategy.analyzers.sharpe.get_analysis()
    ta = strategy.analyzers.trades.get_analysis()

    # Analyzer results are dict-like. Use .get() here because attribute access
    # on a missing key (e.g. when no trade was closed) can raise KeyError,
    # which hasattr() and getattr() with a default do not catch.
    total_closed = int(ta.get("total", {}).get("closed", 0))
    won_total = int(ta.get("won", {}).get("total", 0))
    win_rate = (won_total / total_closed * 100) if total_closed else 0.0
    ret_pct = ((ending_value - starting_value) / starting_value * 100) if starting_value else 0.0

    equity_curve = pd.DataFrame(strategy.equity_points, columns=["Time", "Equity"])
    if not equity_curve.empty:
        equity_curve = equity_curve.set_index("Time")

    metrics = {
        "Return [%]": float(ret_pct),
        "Sharpe Ratio": float(sharpe.get("sharperatio", 0.0) or 0.0),
        "Max Drawdown [%]": float(dd.get("max", {}).get("drawdown", 0.0)),
        "# Trades": total_closed,
        "Win Rate [%]": float(win_rate),
        "Final Equity": float(ending_value),
    }
    return BacktestResult(
        engine="backtrader",
        metrics=metrics,
        equity_curve=equity_curve,
        trades=pd.DataFrame(),
        input_rows=len(data),
    )


def run_backtest(
    engine: str,
    data: pd.DataFrame,
    fast_period: int,
    slow_period: int,
    initial_cash: float,
    commission: float,
) -> BacktestResult:
    if fast_period >= slow_period:
        raise ValueError("fast_period must be smaller than slow_period.")
    if engine == "backtesting.py":
        return run_backtesting_py(data, fast_period, slow_period, initial_cash, commission)
    if engine == "backtrader":
        return run_backtrader(data, fast_period, slow_period, initial_cash, commission)
    raise ValueError(f"Unsupported engine: {engine}")