# AGI_Assistant/tools/backtesting_runner.py
# Dmitry Beresnev
# add backtesting
# ad9d2ff
from __future__ import annotations
from dataclasses import dataclass
from io import StringIO
from typing import Any
import backtrader as bt
import pandas as pd
import yfinance as yf
from backtesting import Backtest, Strategy
@dataclass
class BacktestResult:
    """Outcome of a single backtest run, in one shape for every engine."""

    # Label of the engine that produced the result
    # ("backtesting.py" or "backtrader" in this module).
    engine: str
    # Summary statistics keyed by display label, e.g. "Return [%]".
    metrics: dict[str, Any]
    # Account equity over the course of the run.
    equity_curve: pd.DataFrame
    # Per-trade records; the backtrader runner always supplies an empty frame.
    trades: pd.DataFrame
    # Number of rows in the price data handed to the engine.
    input_rows: int
def _normalize_ohlcv_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a cleaned copy of ``df`` with canonical OHLCV columns.

    The result has ``Open``/``High``/``Low``/``Close``/``Volume`` columns
    (matched case-insensitively in the input), a timezone-naive
    ``DatetimeIndex`` expressed in UTC, and rows sorted by time. The input
    frame is never modified.

    Args:
        df: Price data. MultiIndex columns are flattened to their first level.

    Returns:
        A new DataFrame. ``Volume`` is filled with 0 when the input has no
        volume column. Rows with a missing OHLC value or an unparseable
        timestamp are dropped.

    Raises:
        ValueError: If any of the Open/High/Low/Close columns is missing.
    """
    # Work on a copy so flattening the column index below cannot leak into
    # the caller's frame.
    normalized = df.copy()
    if isinstance(normalized.columns, pd.MultiIndex):
        normalized.columns = normalized.columns.get_level_values(0)

    canonical = {
        "open": "Open",
        "high": "High",
        "low": "Low",
        "close": "Close",
        "volume": "Volume",
    }
    rename_map = {
        col: canonical[str(col).lower()]
        for col in normalized.columns
        if str(col).lower() in canonical
    }
    normalized = normalized.rename(columns=rename_map)

    for required in ("Open", "High", "Low", "Close"):
        if required not in normalized.columns:
            raise ValueError(f"Missing required OHLC column: {required}")
    if "Volume" not in normalized.columns:
        normalized["Volume"] = 0

    if not isinstance(normalized.index, pd.DatetimeIndex):
        normalized.index = pd.to_datetime(normalized.index, utc=True, errors="coerce")
    if normalized.index.tz is not None:
        normalized.index = normalized.index.tz_convert("UTC").tz_localize(None)

    # errors="coerce" turns unparseable timestamps into NaT; such rows have
    # no usable position on the time axis, so drop them with the bad prices.
    normalized = normalized[normalized.index.notna()]
    normalized = normalized.dropna(subset=["Open", "High", "Low", "Close"])
    return normalized.sort_index()
def load_price_data_from_yfinance(
    symbol: str,
    start: str,
    end: str,
    interval: str = "1d",
) -> pd.DataFrame:
    """Download price history for ``symbol`` with yfinance and normalize it.

    Args:
        symbol: Ticker passed to ``yf.download``.
        start: Start of the requested range, forwarded unchanged.
        end: End of the requested range, forwarded unchanged.
        interval: Bar interval, forwarded unchanged.

    Returns:
        The downloaded frame after ``_normalize_ohlcv_columns``.

    Raises:
        ValueError: If the download yields ``None`` or an empty frame.
    """
    request = {
        "start": start,
        "end": end,
        "interval": interval,
        "auto_adjust": False,
    }
    raw = yf.download(symbol, **request)
    has_rows = raw is not None and not raw.empty
    if not has_rows:
        raise ValueError(f"No market data returned for symbol: {symbol}")
    return _normalize_ohlcv_columns(raw)
def load_price_data_from_csv_text(csv_text: str) -> pd.DataFrame:
    """Parse CSV text into a normalized OHLCV frame.

    A column named ``date`` or ``datetime`` (any letter case) is parsed as
    UTC timestamps and used as the index. When neither is present the frame
    keeps the index ``pd.read_csv`` gave it.

    Args:
        csv_text: The full CSV document, header row included.

    Returns:
        The parsed frame after ``_normalize_ohlcv_columns``.
    """
    frame = pd.read_csv(StringIO(csv_text))
    by_lower_name = {str(name).lower(): name for name in frame.columns}

    # "date" wins over "datetime" when both headers are present.
    time_col = next(
        (by_lower_name[key] for key in ("date", "datetime") if key in by_lower_name),
        None,
    )
    if time_col is not None:
        frame[time_col] = pd.to_datetime(frame[time_col], utc=True, errors="coerce")
        frame = frame.set_index(time_col)

    return _normalize_ohlcv_columns(frame)
class SmaCrossBacktestingPy(Strategy):
    """Two-SMA long-only strategy for the backtesting.py engine.

    Buys when flat and the fast average is above the slow one; closes the
    open position when the fast average is below the slow one.
    """

    # Rolling-window lengths; overridden on a subclass to reconfigure.
    fast_period = 10
    slow_period = 30

    def init(self) -> None:
        """Register the fast and slow rolling means of Close as indicators."""
        closes = pd.Series(self.data.Close)
        # The lambdas read the periods from ``self`` when they are invoked.
        self.fast = self.I(
            lambda x: pd.Series(x).rolling(self.fast_period).mean(),
            closes,
        )
        self.slow = self.I(
            lambda x: pd.Series(x).rolling(self.slow_period).mean(),
            closes,
        )

    def next(self) -> None:
        """Enter or exit based on the latest fast/slow values."""
        fast_now = self.fast[-1]
        slow_now = self.slow[-1]
        if not self.position:
            if fast_now > slow_now:
                self.buy()
        elif fast_now < slow_now:
            self.position.close()
class SmaCrossBacktrader(bt.Strategy):
    """Two-SMA crossover strategy for backtrader that also logs equity.

    Buys when flat and the crossover indicator is positive; closes the
    position when it is negative. Every ``next`` call appends a
    ``(timestamp, broker value)`` pair to ``equity_points``.
    """

    params = (("fast_period", 10), ("slow_period", 30))

    def __init__(self) -> None:
        """Create the two moving averages, their crossover, and the log."""
        closes = self.data.close
        self.fast = bt.indicators.SimpleMovingAverage(
            closes,
            period=self.params.fast_period,
        )
        self.slow = bt.indicators.SimpleMovingAverage(
            closes,
            period=self.params.slow_period,
        )
        self.crossover = bt.indicators.CrossOver(self.fast, self.slow)
        self.equity_points: list[tuple[pd.Timestamp, float]] = []

    def next(self) -> None:
        """Act on the crossover signal, then record the account value."""
        holding = bool(self.position)
        if not holding and self.crossover > 0:
            self.buy()
        elif holding and self.crossover < 0:
            self.close()

        # Timestamps are stored as timezone-aware UTC values.
        stamp = pd.Timestamp(self.data.datetime.datetime(0), tz="UTC")
        self.equity_points.append((stamp, self.broker.getvalue()))
def run_backtesting_py(
    data: pd.DataFrame,
    fast_period: int,
    slow_period: int,
    initial_cash: float,
    commission: float,
) -> BacktestResult:
    """Run the SMA strategy with backtesting.py and package the statistics.

    Args:
        data: Price data handed to ``Backtest``.
        fast_period: Window of the fast moving average.
        slow_period: Window of the slow moving average.
        initial_cash: Starting cash, passed as ``cash``.
        commission: Commission value, passed through unchanged.

    Returns:
        A ``BacktestResult`` tagged ``"backtesting.py"`` with the selected
        metrics, the engine's equity curve and its trade table.
    """
    # Subclass on the fly so the periods become class attributes, which is
    # where SmaCrossBacktestingPy reads them from.
    configured_strategy = type(
        "ConfiguredSmaCrossBacktestingPy",
        (SmaCrossBacktestingPy,),
        dict(fast_period=fast_period, slow_period=slow_period),
    )
    engine_run = Backtest(
        data,
        configured_strategy,
        cash=initial_cash,
        commission=commission,
    )
    stats = engine_run.run()

    def stat_as_float(label: str) -> float:
        # Missing statistics fall back to 0.0.
        return float(stats.get(label, 0.0))

    metrics = {
        "Return [%]": stat_as_float("Return [%]"),
        "Buy & Hold Return [%]": stat_as_float("Buy & Hold Return [%]"),
        # ``or 0.0`` replaces only falsy values such as None; NaN is truthy
        # and passes through unchanged.
        "Sharpe Ratio": float(stats.get("Sharpe Ratio", 0.0) or 0.0),
        # The engine's key has a dot ("Max."); the reported label does not.
        "Max Drawdown [%]": stat_as_float("Max. Drawdown [%]"),
        "# Trades": int(stats.get("# Trades", 0)),
        "Win Rate [%]": stat_as_float("Win Rate [%]"),
    }

    return BacktestResult(
        engine="backtesting.py",
        metrics=metrics,
        equity_curve=stats.get("_equity_curve", pd.DataFrame()),
        trades=stats.get("_trades", pd.DataFrame()),
        input_rows=len(data),
    )
def run_backtrader(
    data: pd.DataFrame,
    fast_period: int,
    slow_period: int,
    initial_cash: float,
    commission: float,
) -> BacktestResult:
    """Run the SMA-crossover strategy with backtrader and summarise it.

    Args:
        data: Price data wrapped in a ``bt.feeds.PandasData`` feed.
        fast_period: Window of the fast moving average.
        slow_period: Window of the slow moving average.
        initial_cash: Starting broker cash.
        commission: Commission passed to ``broker.setcommission``.

    Returns:
        A ``BacktestResult`` tagged ``"backtrader"``. ``equity_curve`` is
        built from the strategy's recorded ``equity_points`` and ``trades``
        is always an empty frame.
    """
    cerebro = bt.Cerebro()
    # Subclass on the fly so the requested periods replace the defaults.
    configured = type(
        "ConfiguredSmaCrossBacktrader",
        (SmaCrossBacktrader,),
        {"params": (("fast_period", fast_period), ("slow_period", slow_period))},
    )
    cerebro.addstrategy(configured)
    cerebro.adddata(bt.feeds.PandasData(dataname=data))
    cerebro.broker.setcash(initial_cash)
    cerebro.broker.setcommission(commission=commission)
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name="drawdown")
    cerebro.addanalyzer(
        bt.analyzers.SharpeRatio,
        _name="sharpe",
        timeframe=bt.TimeFrame.Days,
    )
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name="trades")

    starting_value = cerebro.broker.getvalue()
    run_result = cerebro.run()
    ending_value = cerebro.broker.getvalue()

    strategy = run_result[0]
    dd = strategy.analyzers.drawdown.get_analysis()
    sharpe = strategy.analyzers.sharpe.get_analysis()
    ta = strategy.analyzers.trades.get_analysis()

    # Analyzer results are dict subclasses. Attribute access on a missing
    # key can raise KeyError, which hasattr()/getattr(default) do not catch,
    # so a run with no closed trades would crash. Plain dict lookups with
    # defaults are safe whether or not the keys exist.
    total_closed = int(ta.get("total", {}).get("closed", 0))
    won_total = int(ta.get("won", {}).get("total", 0))
    max_drawdown = float(dd.get("max", {}).get("drawdown", 0.0))

    win_rate = (won_total / total_closed * 100) if total_closed else 0.0
    ret_pct = (
        (ending_value - starting_value) / starting_value * 100
        if starting_value
        else 0.0
    )

    equity_curve = pd.DataFrame(strategy.equity_points, columns=["Time", "Equity"])
    if not equity_curve.empty:
        equity_curve = equity_curve.set_index("Time")

    metrics = {
        "Return [%]": float(ret_pct),
        # The ratio may be reported as None; fall back to 0.0 in that case.
        "Sharpe Ratio": float(sharpe.get("sharperatio", 0.0) or 0.0),
        "Max Drawdown [%]": max_drawdown,
        "# Trades": total_closed,
        "Win Rate [%]": float(win_rate),
        "Final Equity": float(ending_value),
    }
    return BacktestResult(
        engine="backtrader",
        metrics=metrics,
        equity_curve=equity_curve,
        trades=pd.DataFrame(),
        input_rows=len(data),
    )
def run_backtest(
    engine: str,
    data: pd.DataFrame,
    fast_period: int,
    slow_period: int,
    initial_cash: float,
    commission: float,
) -> BacktestResult:
    """Validate the parameters and dispatch to the requested engine.

    Args:
        engine: ``"backtesting.py"`` or ``"backtrader"``.
        data: Price data forwarded to the engine runner.
        fast_period: Window of the fast moving average; must be at least 1.
        slow_period: Window of the slow moving average; must exceed
            ``fast_period``.
        initial_cash: Starting cash forwarded to the engine runner.
        commission: Commission forwarded to the engine runner.

    Returns:
        The ``BacktestResult`` produced by the selected runner.

    Raises:
        ValueError: If ``fast_period`` is not positive, if it is not smaller
            than ``slow_period``, or if ``engine`` is not supported.
    """
    # A zero or negative window has no meaning for a moving average; reject
    # it here rather than letting it fail somewhere inside an engine.
    if fast_period < 1:
        raise ValueError("fast_period must be a positive integer.")
    # With fast_period >= 1 this also guarantees slow_period is positive.
    if fast_period >= slow_period:
        raise ValueError("fast_period must be smaller than slow_period.")

    if engine == "backtesting.py":
        return run_backtesting_py(data, fast_period, slow_period, initial_cash, commission)
    if engine == "backtrader":
        return run_backtrader(data, fast_period, slow_period, initial_cash, commission)
    raise ValueError(f"Unsupported engine: {engine}")