# luohoa97's picture
# Deploy BitNet-Transformer Trainer
# d5b7ee9 verified
"""Backtesting framework — simulates trades using historical OHLCV + sentiment."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any
import numpy as np
import pandas as pd
from trading_cli.sentiment.aggregator import aggregate_scores_weighted
from trading_cli.sentiment.news_classifier import classify_headlines, EventType
from trading_cli.strategy.signals import generate_signal, technical_score
from trading_cli.strategy.risk import calculate_position_size, check_stop_loss, check_max_drawdown
logger = logging.getLogger(__name__)
@dataclass
class BacktestTrade:
    """One simulated fill recorded while a backtest is running."""

    # Date/time label of the bar the fill happened on.
    timestamp: str
    # Ticker the fill belongs to.
    symbol: str
    # Side of the fill: BUY or SELL.
    action: str
    # Price the fill was executed at.
    price: float
    # Number of shares bought or sold.
    qty: int
    # Human-readable explanation of why the trade was taken.
    reason: str
    # Profit/loss attributed to this fill; 0.0 unless the caller supplies one.
    pnl: float = field(default=0.0)
@dataclass
class BacktestResult:
    """Aggregate outcome of one backtest run for a single symbol."""

    # Identification of the run.
    symbol: str
    start_date: str
    end_date: str
    # Capital at the start and equity at the end of the run.
    initial_capital: float
    final_equity: float
    # Performance figures; the *_pct fields and win_rate are rendered with a
    # trailing "%" by summary_dict().
    total_return_pct: float
    max_drawdown_pct: float
    sharpe_ratio: float
    win_rate: float
    # Trade counters.
    total_trades: int
    winning_trades: int
    losing_trades: int
    # Detailed records; each instance gets its own fresh list.
    trades: list[BacktestTrade] = field(default_factory=list)
    equity_curve: list[float] = field(default_factory=list)

    def summary_dict(self) -> dict:
        """Return the headline figures as display-ready values.

        Monetary amounts and ratios are rendered as formatted strings; the
        three trade counters are passed through unchanged. The per-trade
        list and the equity curve are not included.
        """

        def as_money(amount: float) -> str:
            # Dollar sign, thousands separators, two decimals.
            return f"${amount:,.2f}"

        report: dict = {"symbol": self.symbol}
        report["period"] = f"{self.start_date} to {self.end_date}"
        report["initial_capital"] = as_money(self.initial_capital)
        report["final_equity"] = as_money(self.final_equity)
        # The return always carries an explicit sign.
        report["total_return"] = f"{self.total_return_pct:+.2f}%"
        report["max_drawdown"] = f"{self.max_drawdown_pct:.2f}%"
        report["sharpe_ratio"] = f"{self.sharpe_ratio:.2f}"
        report["win_rate"] = f"{self.win_rate:.1f}%"
        report["total_trades"] = self.total_trades
        report["winning_trades"] = self.winning_trades
        report["losing_trades"] = self.losing_trades
        return report
class BacktestEngine:
    """Runs historical simulation using the same signal pipeline as live trading.

    For every bar after a warm-up window the engine derives a BUY / SELL /
    HOLD action (from a strategy adapter when one is supplied, otherwise
    from the legacy technical + sentiment blend), trades a single long
    position at that bar's close and records the resulting equity.
    """

    # Fewer bars than this and ``run`` returns an empty, zero-trade result.
    _MIN_BARS = 60

    class _MockPosition:
        """Minimal position record handed to strategy adapters."""

        def __init__(self, symbol, qty, avg_price):
            self.symbol = symbol
            self.qty = qty
            self.avg_entry_price = avg_price

    def __init__(
        self,
        config: dict,
        finbert=None,
        news_fetcher=None,
        use_sentiment: bool = True,
        strategy=None,
        progress_callback=None,
        debug: bool = False,
    ):
        """
        Args:
            config: Trading configuration dict.
            finbert: FinBERTAnalyzer instance (or None to skip sentiment).
            news_fetcher: Callable(symbol, days_ago) -> list[tuple[str, float]]
                Returns list of (headline, unix_timestamp) tuples.
            use_sentiment: If False, skip all sentiment scoring regardless of
                whether finbert/news_fetcher are provided.
            strategy: StrategyAdapter instance. If None, falls back to legacy
                hardcoded technical + sentiment pipeline.
            progress_callback: Optional callable(str) to report progress.
            debug: If True, log every bar's signal details at INFO level.
        """
        self.config = config
        self.finbert = finbert
        self.news_fetcher = news_fetcher
        self.use_sentiment = use_sentiment
        self.strategy = strategy
        self.progress_callback = progress_callback
        self.debug = debug
        # Force INFO level on this logger when debug is enabled
        if debug:
            logger.setLevel(logging.INFO)

    def run(
        self,
        symbol: str,
        ohlcv: pd.DataFrame,
        start_date: str | None = None,
        end_date: str | None = None,
        initial_capital: float = 100_000.0,
    ) -> BacktestResult:
        """
        Run backtest on historical OHLCV data.

        The signal for bar ``i`` is computed from the bars strictly before it
        (``df.iloc[:i]``) and any resulting order is filled at bar ``i``'s
        close. Only one long position is held at a time.

        Args:
            symbol: Ticker being simulated.
            ohlcv: Price frame with a ``close`` column (any casing) and dates
                either in a ``Date``/``date`` column or in the index.
            start_date: Optional inclusive lower bound for the bars used.
            end_date: Optional inclusive upper bound. A date-only value
                covers that whole calendar day.
            initial_capital: Starting cash.

        Returns:
            A BacktestResult. With fewer than 60 usable bars it is an empty
            result whose final equity equals ``initial_capital``.
        """
        df = self._prepare_frame(ohlcv, start_date, end_date)
        n_bars = len(df)
        logger.info("Backtest %s: %d bars, columns: %s", symbol, n_bars, list(df.columns))
        if n_bars < self._MIN_BARS:
            logger.warning("Backtest %s: not enough data (%d bars, need 60+)", symbol, n_bars)
            return self._empty_result(symbol, df, initial_capital)

        # Config params (read once; they do not change during the walk)
        params = self._legacy_params()
        risk_pct = self.config.get("risk_pct", 0.02)
        max_dd = self.config.get("max_drawdown", 0.15)
        stop_loss_pct = self.config.get("stop_loss_pct", 0.05)
        max_position_pct = self.config.get("max_position_pct", 0.10)

        # Warm-up: the longest indicator window plus a 30-bar margin.
        lookback = max(
            params["sma_long"], params["ema_slow"], params["bb_window"], params["vol_window"]
        ) + 30
        # Clamp so a short frame reports 0 simulated bars instead of a negative count.
        total_bars = max(n_bars - lookback, 0)
        logger.info("Backtest %s: lookback=%d, total_bars=%d", symbol, lookback, total_bars)

        sent_scores = self._prefetch_sentiment(symbol, n_bars, lookback)

        cash = initial_capital
        position_qty = 0
        position_avg_price = 0.0
        trades: list[BacktestTrade] = []
        equity_values = [initial_capital]
        halted = False
        # Price/date of the most recently visited bar, used for the final liquidation.
        last_price = None
        last_date = ""

        # ── Walk forward through data ─────────────────────────────────────
        self._report("Running simulation…")
        for idx, i in enumerate(range(lookback, n_bars)):
            if self.progress_callback and idx % 20 == 0:
                pct = int(idx / total_bars * 100) if total_bars else 0
                self.progress_callback(f"Running simulation… {pct}%")

            historical_ohlcv = df.iloc[:i]
            current_bar = df.iloc[i]
            current_price = float(current_bar["close"])
            current_date = str(current_bar.get("date", ""))
            last_price, last_date = current_price, current_date

            # Use pre-cached sentiment score (0.0 means "no sentiment")
            sent_score = sent_scores.get(i, 0.0)

            # Max drawdown check: stop the backtest once it is exceeded
            if check_max_drawdown(equity_values, max_dd):
                halted = True
                break

            if self.strategy is not None:
                backtest_positions = (
                    [self._MockPosition(symbol, position_qty, position_avg_price)]
                    if position_qty > 0
                    else []
                )
                signal_result = self.strategy.generate_signal(
                    symbol=symbol,
                    ohlcv=historical_ohlcv,
                    sentiment_score=sent_score,
                    positions=backtest_positions,
                    config=self.config,
                )
                action = signal_result.action
                score = signal_result.score
                reason = signal_result.reason
                if self.debug:
                    logger.info(
                        "Bar %d | %s | price=%.2f | score=%.3f | action=%s | reason=%s",
                        idx, current_date, current_price, score, action, reason,
                    )
            else:
                action, hybrid, reason, tech = self._legacy_signal(
                    historical_ohlcv, sent_score, params
                )
                if self.debug:
                    logger.info(
                        "Bar %d | %s | price=%.2f | tech=%.3f | sent=%.3f | hybrid=%.3f | action=%s",
                        idx, current_date, current_price, tech, sent_score, hybrid, action,
                    )

            if action == "BUY" and position_qty == 0:
                # No position is open here, so account equity is just cash.
                qty = calculate_position_size(
                    cash,
                    current_price,
                    risk_pct=risk_pct,
                    max_position_pct=max_position_pct,
                )
                cost = qty * current_price
                if qty > 0 and cash >= cost:
                    cash -= cost
                    position_qty = qty
                    position_avg_price = current_price
                    trades.append(BacktestTrade(
                        timestamp=current_date,
                        symbol=symbol,
                        action="BUY",
                        price=current_price,
                        qty=qty,
                        reason=reason,
                    ))
                    if self.debug:
                        logger.info(
                            "  >>> BUY %d @ %.2f (cost=%.2f, cash=%.2f, pos=%d)",
                            qty, current_price, cost, cash, position_qty,
                        )
                elif self.debug:
                    logger.info(
                        "  >>> BUY blocked: qty=%d, cash=%.2f, need=%.2f",
                        qty, cash, cost,
                    )
            elif action == "SELL" and position_qty > 0:
                # NOTE(review): the stop-loss check only relabels a SELL the
                # signal already asked for; it never forces an exit by itself.
                # Confirm whether that is intended.
                sell_reason = reason
                if check_stop_loss(position_avg_price, current_price, stop_loss_pct):
                    sell_reason = f"stop-loss ({reason})"
                proceeds = position_qty * current_price
                pnl = (current_price - position_avg_price) * position_qty
                cash += proceeds
                trades.append(BacktestTrade(
                    timestamp=current_date,
                    symbol=symbol,
                    action="SELL",
                    price=current_price,
                    qty=position_qty,
                    reason=sell_reason,
                    pnl=pnl,
                ))
                if self.debug:
                    logger.info(
                        "  >>> SELL %d @ %.2f (pnl=%.2f, proceeds=%.2f, cash=%.2f)",
                        position_qty, current_price, pnl, proceeds, cash,
                    )
                position_qty = 0
                position_avg_price = 0.0

            # Track equity (mark-to-market at this bar's close)
            equity_values.append(cash + position_qty * current_price)

        # Close any remaining position at the last bar that was actually
        # visited. After a drawdown halt that is the halt bar, not the final
        # bar of the dataset, so no future price leaks into the result.
        if position_qty > 0 and last_price is not None:
            pnl = (last_price - position_avg_price) * position_qty
            cash += position_qty * last_price
            trades.append(BacktestTrade(
                timestamp=last_date[:10],
                symbol=symbol,
                action="SELL",
                price=last_price,
                qty=position_qty,
                reason="max drawdown halt" if halted else "end of backtest",
                pnl=pnl,
            ))
            position_qty = 0
            if halted:
                # The halt bar never reached the equity update inside the
                # loop; record it so the curve ends at the final equity.
                equity_values.append(cash)

        final_equity = cash
        # Guard the percentage against a zero starting capital.
        total_return = (
            (final_equity - initial_capital) / initial_capital * 100
            if initial_capital
            else 0.0
        )
        logger.info("Backtest %s: %d trades, return=%.2f%%", symbol, len(trades), total_return)

        max_dd_pct, sharpe, win_rate, winning, losing = self._compute_metrics(
            trades, equity_values
        )

        return BacktestResult(
            symbol=symbol,
            start_date=str(df.iloc[0]["date"])[:10] if len(df) > 0 else "N/A",
            end_date=str(df.iloc[-1]["date"])[:10] if len(df) > 0 else "N/A",
            initial_capital=initial_capital,
            final_equity=final_equity,
            total_return_pct=total_return,
            max_drawdown_pct=max_dd_pct,
            sharpe_ratio=sharpe,
            win_rate=win_rate,
            total_trades=len(trades),
            winning_trades=winning,
            losing_trades=losing,
            trades=trades,
            equity_curve=list(equity_values),
        )

    def _report(self, message: str) -> None:
        """Forward a progress message to the callback, if one was given."""
        if self.progress_callback:
            self.progress_callback(message)

    @staticmethod
    def _prepare_frame(
        ohlcv: pd.DataFrame,
        start_date: str | None,
        end_date: str | None,
    ) -> pd.DataFrame:
        """Return a date-filtered copy with a ``date`` column and lowercase names."""
        df = ohlcv.copy()

        # Handle both column-based and index-based dates
        if "Date" in df.columns or "date" in df.columns:
            date_col = "Date" if "Date" in df.columns else "date"
            df[date_col] = pd.to_datetime(df[date_col])
            df = df.set_index(date_col)

        index_tz = getattr(df.index, "tz", None)

        def to_index_tz(value) -> pd.Timestamp:
            # Bring a bound into the index's timezone convention so the
            # comparison below never mixes naive and aware timestamps.
            ts = pd.Timestamp(value)
            if index_tz is not None:
                if ts.tzinfo is None:
                    return ts.tz_localize(index_tz)
                return ts.tz_convert(index_tz)
            if ts.tzinfo is not None:
                return ts.tz_localize(None)
            return ts

        if start_date:
            df = df[df.index >= to_index_tz(start_date)]
        if end_date:
            ed = to_index_tz(end_date)
            if ed == ed.normalize():
                # Date-only bound: keep every bar stamped on that calendar
                # day, not just one stamped exactly at midnight.
                df = df[df.index < ed + pd.DateOffset(days=1)]
            else:
                df = df[df.index <= ed]

        # Reset index to get the date back as a column named 'date'. An
        # unnamed index comes back as 'index'; a named one (e.g. 'timestamp')
        # comes back as the first column.
        df = df.reset_index()
        if "index" in df.columns:
            df = df.rename(columns={"index": "date"})
        elif df.columns[0] != "date":
            df = df.rename(columns={df.columns[0]: "date"})

        # yfinance can return MultiIndex columns (tuples), so flatten them
        # before lowercasing.
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = [c[0] for c in df.columns]
        df.columns = [str(c).lower() for c in df.columns]
        if "adj close" in df.columns:
            df = df.rename(columns={"adj close": "adj_close"})
        return df

    @staticmethod
    def _empty_result(symbol: str, df: pd.DataFrame, initial_capital: float) -> BacktestResult:
        """Build the zero-trade result returned when there is too little data."""
        has_dates = "date" in df.columns and len(df) > 0
        start_str = str(df.iloc[0]["date"])[:10] if has_dates else "N/A"
        end_str = str(df.iloc[-1]["date"])[:10] if has_dates else "N/A"
        return BacktestResult(
            symbol=symbol,
            start_date=start_str,
            end_date=end_str,
            initial_capital=initial_capital,
            final_equity=initial_capital,
            total_return_pct=0.0,
            max_drawdown_pct=0.0,
            sharpe_ratio=0.0,
            win_rate=0.0,
            total_trades=0,
            winning_trades=0,
            losing_trades=0,
        )

    def _legacy_params(self) -> dict:
        """Read the legacy-pipeline settings from config, applying defaults."""
        cfg = self.config
        return {
            "buy_threshold": cfg.get("signal_buy_threshold", 0.5),
            "sell_threshold": cfg.get("signal_sell_threshold", -0.3),
            "sma_short": cfg.get("sma_short", 20),
            "sma_long": cfg.get("sma_long", 50),
            "rsi_period": cfg.get("rsi_period", 14),
            "bb_window": cfg.get("bb_window", 20),
            "bb_std": cfg.get("bb_std", 2.0),
            "ema_fast": cfg.get("ema_fast", 12),
            "ema_slow": cfg.get("ema_slow", 26),
            "vol_window": cfg.get("volume_window", 20),
            "tech_weight": cfg.get("tech_weight", 0.6),
            "sent_weight": cfg.get("sent_weight", 0.4),
            "tech_weights": {
                "sma": cfg.get("weight_sma", 0.25),
                "rsi": cfg.get("weight_rsi", 0.25),
                "bb": cfg.get("weight_bb", 0.20),
                "ema": cfg.get("weight_ema", 0.15),
                "volume": cfg.get("weight_volume", 0.15),
            },
        }

    def _prefetch_sentiment(self, symbol: str, n_bars: int, lookback: int) -> dict:
        """Fetch news once and return a ``{bar_index: score}`` map.

        One aggregated score is computed for the whole period and assigned to
        every simulated bar. Any failure is logged and yields an empty map,
        so the simulation proceeds without sentiment.
        """
        if not (self.use_sentiment and self.finbert and self.news_fetcher):
            return {}
        try:
            # Fetch all news once (batch)
            self._report("Fetching historical news…")
            all_news = self.news_fetcher(symbol, days_ago=n_bars)
            if not all_news:
                return {}
            headlines = [item[0] for item in all_news]
            timestamps = [item[1] for item in all_news]
            classifications = classify_headlines(headlines)
            # Analyze all headlines at once
            self._report("Analyzing sentiment (batch)…")
            results = self.finbert.analyze_batch(headlines)
            cached_score = aggregate_scores_weighted(
                results, classifications, timestamps=timestamps
            )
            return dict.fromkeys(range(lookback, n_bars), cached_score)
        except Exception as exc:
            # Deliberate best-effort: sentiment is optional for the backtest.
            logger.warning("Sentiment pre-fetch failed: %s", exc)
            return {}

    @staticmethod
    def _legacy_signal(history: pd.DataFrame, sent_score: float, params: dict):
        """Score one bar with the legacy blend; return (action, hybrid, reason, tech)."""
        tech = technical_score(
            history,
            params["sma_short"], params["sma_long"], params["rsi_period"],
            params["bb_window"], params["bb_std"],
            params["ema_fast"], params["ema_slow"], params["vol_window"],
            params["tech_weights"],
        )
        # If sentiment is absent (0.0), use tech alone so the buy/sell
        # thresholds remain reachable.
        if sent_score == 0.0:
            hybrid = tech
        else:
            hybrid = params["tech_weight"] * tech + params["sent_weight"] * sent_score

        if hybrid >= params["buy_threshold"]:
            action = "BUY"
        elif hybrid <= params["sell_threshold"]:
            action = "SELL"
        else:
            action = "HOLD"
        return action, hybrid, f"hybrid={hybrid:.3f} tech={tech:.3f}", tech

    @staticmethod
    def _compute_metrics(trades, equity_values):
        """Return (max_drawdown_pct, sharpe, win_rate_pct, winning, losing)."""
        # Largest peak-to-trough decline, as a fraction of the running peak.
        peak = equity_values[0]
        max_dd_actual = 0.0
        for val in equity_values:
            if val > peak:
                peak = val
            dd = (peak - val) / peak if peak > 0 else 0
            max_dd_actual = max(max_dd_actual, dd)

        # Win rate is measured over SELL trades only.
        sell_trades = [t for t in trades if t.action == "SELL"]
        winning = sum(1 for t in sell_trades if t.pnl > 0)
        losing = sum(1 for t in sell_trades if t.pnl < 0)
        win_rate = (winning / len(sell_trades) * 100) if sell_trades else 0.0

        # Sharpe ratio from per-bar returns, scaled by sqrt(252).
        sharpe = 0.0
        values = np.asarray(equity_values, dtype=float)
        if values.size > 1:
            with np.errstate(divide="ignore", invalid="ignore"):
                returns = np.diff(values) / values[:-1]
            # A zero equity point gives inf/nan returns; leave those out.
            returns = returns[np.isfinite(returns)]
            if returns.size:
                spread = returns.std()
                if spread > 0:
                    sharpe = float(returns.mean() / spread * np.sqrt(252))

        return max_dd_actual * 100, sharpe, win_rate, winning, losing