Spaces:
Build error
Build error
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from io import StringIO | |
| from typing import Any | |
| import backtrader as bt | |
| import pandas as pd | |
| import yfinance as yf | |
| from backtesting import Backtest, Strategy | |
@dataclass
class BacktestResult:
    """Engine-independent container for the outcome of one backtest run.

    The runners in this module build instances with keyword arguments, so
    the class must be a dataclass to get the generated ``__init__``.
    """

    # Label of the engine that produced the result; the runners in this
    # module use "backtesting.py" and "backtrader".
    engine: str
    # Summary statistics keyed by display label (e.g. "Return [%]").
    metrics: dict[str, Any]
    # Account equity over time as reported by the engine.
    equity_curve: pd.DataFrame
    # Per-trade records; may be an empty frame when the engine provides none.
    trades: pd.DataFrame
    # Number of rows in the price data that was fed to the engine.
    input_rows: int
def _normalize_ohlcv_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a cleaned copy of *df* with canonical OHLCV columns.

    The input frame is never modified. On the copy:

    * MultiIndex columns are flattened to their first level.
    * Columns named open/high/low/close/volume (any letter case) are renamed
      to ``Open``/``High``/``Low``/``Close``/``Volume``.
    * A missing ``Volume`` column is filled with zeros.
    * A non-datetime index is parsed as UTC; any timezone-aware index is
      converted to UTC and made timezone-naive.
    * Rows with an unparseable timestamp or a missing OHLC value are dropped,
      and the result is sorted by index.

    Raises:
        ValueError: if any of Open, High, Low or Close is absent after
            renaming.
    """
    canonical_names = {
        "open": "Open",
        "high": "High",
        "low": "Low",
        "close": "Close",
        "volume": "Volume",
    }

    # Work on a copy so flattening the columns does not leak back to the caller.
    normalized = df.copy()
    if isinstance(normalized.columns, pd.MultiIndex):
        normalized.columns = normalized.columns.get_level_values(0)

    rename_map = {
        col: canonical_names[str(col).lower()]
        for col in normalized.columns
        if str(col).lower() in canonical_names
    }
    normalized = normalized.rename(columns=rename_map)

    for required in ["Open", "High", "Low", "Close"]:
        if required not in normalized.columns:
            raise ValueError(f"Missing required OHLC column: {required}")
    if "Volume" not in normalized.columns:
        normalized["Volume"] = 0

    if not isinstance(normalized.index, pd.DatetimeIndex):
        normalized.index = pd.to_datetime(normalized.index, utc=True, errors="coerce")
    if normalized.index.tz is not None:
        normalized.index = normalized.index.tz_convert("UTC").tz_localize(None)

    # errors="coerce" turns bad timestamps into NaT; such rows have no usable
    # position on the time axis, so drop them along with incomplete bars.
    normalized = normalized.loc[normalized.index.notna()]
    normalized = normalized.dropna(subset=["Open", "High", "Low", "Close"])
    return normalized.sort_index()
def load_price_data_from_yfinance(
    symbol: str,
    start: str,
    end: str,
    interval: str = "1d",
) -> pd.DataFrame:
    """Download price history for *symbol* via yfinance and normalize it.

    The download is requested with ``auto_adjust=False`` and the result is
    passed through ``_normalize_ohlcv_columns``.

    Raises:
        ValueError: if the download returns ``None`` or an empty frame.
    """
    downloaded = yf.download(
        symbol,
        start=start,
        end=end,
        interval=interval,
        auto_adjust=False,
    )
    has_rows = downloaded is not None and not downloaded.empty
    if has_rows:
        return _normalize_ohlcv_columns(downloaded)
    raise ValueError(f"No market data returned for symbol: {symbol}")
def load_price_data_from_csv_text(csv_text: str) -> pd.DataFrame:
    """Parse CSV text into a normalized OHLCV frame.

    If the CSV has a column named ``date`` or ``datetime`` (any letter case,
    ``date`` taking precedence), that column is parsed as UTC timestamps and
    becomes the index. The frame is then passed through
    ``_normalize_ohlcv_columns``.
    """
    frame = pd.read_csv(StringIO(csv_text))

    # Map lower-cased header -> header as spelled in the file.
    headers_by_lower = {str(name).lower(): name for name in frame.columns}
    time_keys = [key for key in ("date", "datetime") if key in headers_by_lower]

    if time_keys:
        time_col = headers_by_lower[time_keys[0]]
        frame[time_col] = pd.to_datetime(frame[time_col], utc=True, errors="coerce")
        frame = frame.set_index(time_col)

    return _normalize_ohlcv_columns(frame)
class SmaCrossBacktestingPy(Strategy):
    """Moving-average strategy for the backtesting.py engine.

    Opens a position while flat and the fast average is above the slow one;
    closes the position while the fast average is below the slow one.
    """

    # Rolling-window lengths; overridden per run by a configured subclass.
    fast_period = 10
    slow_period = 30

    def init(self) -> None:
        """Register the fast and slow rolling means of Close as indicators."""
        prices = pd.Series(self.data.Close)

        def rolling_mean(window):
            # Kept as a lambda so the callable handed to self.I is unchanged
            # in kind from a plain inline lambda.
            return lambda values: pd.Series(values).rolling(window).mean()

        self.fast = self.I(rolling_mean(self.fast_period), prices)
        self.slow = self.I(rolling_mean(self.slow_period), prices)

    def next(self) -> None:
        """Act on the latest fast/slow values for the current bar."""
        fast_now = self.fast[-1]
        slow_now = self.slow[-1]
        if not self.position:
            if fast_now > slow_now:
                self.buy()
        elif fast_now < slow_now:
            self.position.close()
class SmaCrossBacktrader(bt.Strategy):
    """Moving-average crossover strategy for the backtrader engine.

    Buys when flat and the crossover indicator is positive, closes when in
    the market and it is negative, and records the broker value on every
    call to ``next``.
    """

    params = (("fast_period", 10), ("slow_period", 30))

    def __init__(self) -> None:
        sma = bt.indicators.SimpleMovingAverage
        closes = self.data.close
        self.fast = sma(closes, period=self.params.fast_period)
        self.slow = sma(closes, period=self.params.slow_period)
        self.crossover = bt.indicators.CrossOver(self.fast, self.slow)
        # (timestamp, portfolio value) samples collected while the run advances.
        self.equity_points: list[tuple[pd.Timestamp, float]] = []

    def next(self) -> None:
        """Trade on the crossover signal, then sample the account value."""
        if not self.position:
            if self.crossover > 0:
                self.buy()
        elif self.crossover < 0:
            self.close()

        bar_time = pd.Timestamp(self.data.datetime.datetime(0), tz="UTC")
        self.equity_points.append((bar_time, self.broker.getvalue()))
def _nan_to_zero(value: Any) -> float:
    """Convert *value* to float, mapping ``None`` and NaN to 0.0."""
    return 0.0 if pd.isna(value) else float(value)


def run_backtesting_py(
    data: pd.DataFrame,
    fast_period: int,
    slow_period: int,
    initial_cash: float,
    commission: float,
) -> BacktestResult:
    """Run the SMA-cross strategy with the backtesting.py engine.

    Args:
        data: OHLCV price frame handed to ``Backtest``.
        fast_period: window of the fast moving average.
        slow_period: window of the slow moving average.
        initial_cash: starting cash passed to ``Backtest``.
        commission: commission passed to ``Backtest``.

    Returns:
        A ``BacktestResult`` tagged ``"backtesting.py"`` carrying the summary
        metrics, the engine's equity curve and its trade table.
    """
    # Bind the requested periods by subclassing, so the shared base class
    # keeps its defaults.
    strategy_cls = type(
        "ConfiguredSmaCrossBacktestingPy",
        (SmaCrossBacktestingPy,),
        {"fast_period": fast_period, "slow_period": slow_period},
    )
    bt_obj = Backtest(data, strategy_cls, cash=initial_cash, commission=commission)
    stats = bt_obj.run()

    equity_curve = stats.get("_equity_curve", pd.DataFrame())
    trades = stats.get("_trades", pd.DataFrame())

    metrics = {
        "Return [%]": float(stats.get("Return [%]", 0.0)),
        "Buy & Hold Return [%]": float(stats.get("Buy & Hold Return [%]", 0.0)),
        # The engine reports NaN (which is truthy, so `or 0.0` would not catch
        # it) for these two when there is nothing to compute them from, e.g.
        # a run without trades; report 0.0 in that case.
        "Sharpe Ratio": _nan_to_zero(stats.get("Sharpe Ratio", 0.0)),
        "Max Drawdown [%]": float(stats.get("Max. Drawdown [%]", 0.0)),
        "# Trades": int(stats.get("# Trades", 0)),
        "Win Rate [%]": _nan_to_zero(stats.get("Win Rate [%]", 0.0)),
    }
    return BacktestResult(
        engine="backtesting.py",
        metrics=metrics,
        equity_curve=equity_curve,
        trades=trades,
        input_rows=len(data),
    )
def run_backtrader(
    data: pd.DataFrame,
    fast_period: int,
    slow_period: int,
    initial_cash: float,
    commission: float,
) -> BacktestResult:
    """Run the SMA-cross strategy with the backtrader engine.

    Args:
        data: OHLCV price frame wrapped in a ``PandasData`` feed.
        fast_period: window of the fast moving average.
        slow_period: window of the slow moving average.
        initial_cash: starting cash set on the broker.
        commission: commission set on the broker.

    Returns:
        A ``BacktestResult`` tagged ``"backtrader"``. ``trades`` is always an
        empty frame; the equity curve comes from the values the strategy
        sampled during the run.
    """
    cerebro = bt.Cerebro()
    # Bind the requested periods by subclassing with overridden params.
    configured = type(
        "ConfiguredSmaCrossBacktrader",
        (SmaCrossBacktrader,),
        {"params": (("fast_period", fast_period), ("slow_period", slow_period))},
    )
    cerebro.addstrategy(configured)
    feed = bt.feeds.PandasData(dataname=data)
    cerebro.adddata(feed)
    cerebro.broker.setcash(initial_cash)
    cerebro.broker.setcommission(commission=commission)
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name="drawdown")
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name="sharpe", timeframe=bt.TimeFrame.Days)
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name="trades")

    starting_value = cerebro.broker.getvalue()
    run_result = cerebro.run()
    ending_value = cerebro.broker.getvalue()

    strategy = run_result[0]
    dd = strategy.analyzers.drawdown.get_analysis()
    sharpe = strategy.analyzers.sharpe.get_analysis()
    ta = strategy.analyzers.trades.get_analysis()

    # Analyzer results are dict subclasses that are closed after the run;
    # attribute access on a key that was never written then raises KeyError,
    # which hasattr()/getattr(default) do not swallow. Entries such as
    # total.closed and won only exist once a trade has closed, so read them
    # with dict-style .get() to survive runs without trades.
    total_closed = int(ta.get("total", {}).get("closed", 0))
    won_total = int(ta.get("won", {}).get("total", 0))
    max_drawdown = dd.get("max", {}).get("drawdown", 0.0)

    win_rate = (won_total / total_closed * 100) if total_closed else 0.0
    ret_pct = ((ending_value - starting_value) / starting_value * 100) if starting_value else 0.0

    equity_curve = pd.DataFrame(strategy.equity_points, columns=["Time", "Equity"])
    if not equity_curve.empty:
        equity_curve = equity_curve.set_index("Time")

    metrics = {
        "Return [%]": float(ret_pct),
        # The Sharpe analyzer may report None; treat that as 0.0.
        "Sharpe Ratio": float(sharpe.get("sharperatio", 0.0) or 0.0),
        "Max Drawdown [%]": float(max_drawdown),
        "# Trades": total_closed,
        "Win Rate [%]": float(win_rate),
        "Final Equity": float(ending_value),
    }
    return BacktestResult(
        engine="backtrader",
        metrics=metrics,
        equity_curve=equity_curve,
        trades=pd.DataFrame(),
        input_rows=len(data),
    )
def run_backtest(
    engine: str,
    data: pd.DataFrame,
    fast_period: int,
    slow_period: int,
    initial_cash: float,
    commission: float,
) -> BacktestResult:
    """Validate the request and dispatch it to the selected engine.

    Args:
        engine: ``"backtesting.py"`` or ``"backtrader"``.
        data: OHLCV price frame forwarded to the engine runner.
        fast_period: window of the fast moving average.
        slow_period: window of the slow moving average.
        initial_cash: starting cash forwarded to the engine runner.
        commission: commission forwarded to the engine runner.

    Raises:
        ValueError: if ``fast_period`` is not smaller than ``slow_period``,
            or if ``engine`` is not one of the supported names. The period
            check runs first.
    """
    if slow_period <= fast_period:
        raise ValueError("fast_period must be smaller than slow_period.")
    if engine not in ("backtesting.py", "backtrader"):
        raise ValueError(f"Unsupported engine: {engine}")

    runner = run_backtesting_py if engine == "backtesting.py" else run_backtrader
    return runner(data, fast_period, slow_period, initial_cash, commission)