alpha-predict / src/processor.py
DevKX's picture
Upload 2 files
2d5897e verified
import os
import pandas as pd
import numpy as np
import torch
import joblib
import time
from datetime import datetime
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
class Processor:
    """Feature pipeline: FinBERT news sentiment + market technicals -> scaled GRU input.

    Loads a pre-fitted RobustScaler and the ProsusAI/finbert model, then turns
    raw OHLCV+VIX market data and news headlines into a (1, <=30, 14) float32
    window for the downstream GRU, plus display metrics for the Streamlit UI.
    """

    def __init__(self, scaler_path="models/robust_scaler.pkl"):
        print("βš™οΈ Initializing AlphaProcessor...")
        # HF pipeline convention: device 0 = first CUDA GPU, -1 = CPU.
        self.device = 0 if torch.cuda.is_available() else -1
        self.model_name = "ProsusAI/finbert"
        # 1. Load Scaler. Default to None so a failed load surfaces as a clear
        # RuntimeError in process() instead of an AttributeError (the previous
        # bare `except:` left self.scaler undefined and swallowed everything,
        # including KeyboardInterrupt).
        self.scaler = None
        try:
            self.scaler = joblib.load(scaler_path)
            print(f"βœ… Scaler loaded from {scaler_path}")
        except Exception:
            print("⚠️ Scaler not found in models/ folder.")
        # 2. Initialize FinBERT (use_safetensors avoids the torch.load pickle path).
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.model_name, use_safetensors=True
        )
        self.sentiment_pipe = pipeline(
            "sentiment-analysis",
            model=self.model,
            tokenizer=self.tokenizer,
            device=self.device,
        )

    def fetch_market_data(self, days=60):
        """Load the last `days` rows of market data from the local CSV backup.

        Bypasses Finnhub to avoid 403 errors during presentation. Returns an
        empty DataFrame when the backup file is missing.
        """
        print("πŸ“ System: Bypassing API. Loading local market data...")
        backup_path = "data/market_data_backup.csv"
        if not os.path.exists(backup_path):
            print(f"🚨 FATAL: {backup_path} not found!")
            return pd.DataFrame()
        df = pd.read_csv(backup_path, index_col=0, parse_dates=True)
        # Optional: Sync dates to today for presentation realism
        # last_date = df.index[-1]
        # offset = pd.Timestamp(datetime.now().date()) - last_date
        # df.index = df.index + offset
        return df.tail(days)

    def process(self, df_market, df_news):
        """Main pipeline: News Sentiment -> Feature Engineering -> GRU Input.

        Returns (input_tensor, latest_metrics, df_features, df_news_scored).
        Raises ValueError when feature engineering yields no rows and
        RuntimeError when the scaler failed to load in __init__.
        """
        # 1. Process Sentiment from headlines
        df_sent, df_news_scored = self._generate_sentiment_profile(df_news)
        # 2. Merge and engineer all 14 features
        df_features = self._engineer_14_features(df_market, df_sent)
        # Guard rails: iloc[-1] / scaler.transform below would otherwise fail
        # with opaque IndexError / AttributeError.
        if df_features.empty:
            raise ValueError("Feature engineering produced no rows; provide more market history.")
        if self.scaler is None:
            raise RuntimeError("Scaler was not loaded; cannot transform the GRU input window.")
        # 3. Extract metadata for Streamlit UI
        latest = df_features.iloc[-1]
        latest_metrics = {
            "Sent_Mean": latest['Sent_Mean'],
            # News_Volume is stored as log1p(count); invert it for display.
            "News_Volume": int(np.exp(latest['News_Volume']) - 1),
            "Panic_Interaction": latest['Sent_x_VIX'],
            "RSI": latest['RSI'] * 100,  # back to the conventional 0-100 scale
        }
        # 4. Prepare 30-day window for GRU: shape (1, <=30, 14), float32.
        final_window = df_features.tail(30).values
        scaled_window = self.scaler.transform(final_window)
        input_tensor = np.expand_dims(scaled_window, axis=0).astype('float32')
        return input_tensor, latest_metrics, df_features, df_news_scored

    def _generate_sentiment_profile(self, df_news):
        """Score each headline with FinBERT and aggregate to daily features.

        Returns (daily_profile, scored_news). Operates on a copy so the
        caller's DataFrame is not mutated (the previous version wrote the
        'Score' and 'Date' columns in place).
        """
        print("🧠 Running FinBERT Batch Analysis...")
        df_news = df_news.copy()
        titles = df_news['Title'].astype(str).tolist()
        # Batch processing to handle 1700+ headlines efficiently
        results = self.sentiment_pipe(titles, batch_size=32, truncation=True)
        # Signed score: +p for positive, -p for negative, 0.0 for neutral.
        scores = []
        for res in results:
            label, score = res['label'].lower(), res['score']
            scores.append(score if label == 'positive' else -score if label == 'negative' else 0.0)
        df_news['Score'] = scores
        df_news['Date'] = pd.to_datetime(df_news['Date']).dt.date
        grouped = df_news.groupby('Date')['Score']
        daily = pd.DataFrame({
            'Sent_Mean': grouped.mean(),
            'Sent_Intensity': grouped.apply(lambda x: x.abs().mean()),
            # log1p keeps heavy news days from dominating the feature scale.
            'News_Volume': np.log1p(grouped.count()),
            # +1 denominator is Laplace-style smoothing for thin news days.
            'Net_Bull': grouped.apply(lambda x: x.sum() / (len(x) + 1))
        }).fillna(0.0)
        daily.index = pd.to_datetime(daily.index)
        return daily, df_news

    def _engineer_14_features(self, df, df_sent):
        """Build the 7 quant + 7 sentiment features expected by the GRU.

        Expects OHLCV + VIX columns under any capitalisation; df_sent is the
        daily profile produced by _generate_sentiment_profile.
        """
        data = df.copy()
        data.columns = [c.capitalize() for c in data.columns]
        if 'Vix' in data.columns:
            data = data.rename(columns={'Vix': 'VIX'})
        # --- QUANT BRANCH (7 Features) ---
        # Log-distance of close from the 20-day volume-weighted average price.
        tp = (data['High'] + data['Low'] + data['Close']) / 3
        vwap = (tp * data['Volume']).rolling(20).sum() / (data['Volume'].rolling(20).sum() + 1e-9)
        data['VWAP_Dist'] = np.log(data['Close'] / vwap)
        # 14-day RSI rescaled to [0, 1]; 1e-9 avoids division by zero loss.
        delta = data['Close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        data['RSI'] = (100 - (100 / (1 + (gain/(loss + 1e-9))))) / 100.0
        # MACD histogram normalised by price so it is scale-free across symbols.
        ema_12 = data['Close'].ewm(span=12).mean()
        ema_26 = data['Close'].ewm(span=26).mean()
        macd = ema_12 - ema_26
        data['MACD_Hist'] = (macd - macd.ewm(span=9).mean()) / data['Close']
        data['VIX_Norm'] = data['VIX'] / 100.0
        data['VIX_Change'] = data['VIX'].pct_change()
        # True range -> tanh-squashed distance from the 22-day mean in ATR units.
        tr = pd.concat([data['High']-data['Low'],
                        abs(data['High']-data['Close'].shift()),
                        abs(data['Low']-data['Close'].shift())], axis=1).max(axis=1)
        data['ATR_Dist'] = np.tanh((data['Close'] - data['Close'].rolling(22).mean()) / (tr.rolling(14).mean() + 1e-9))
        data['Realized_Vol'] = data['Close'].pct_change().rolling(10).std() * 10
        # --- SENTIMENT BRANCH (7 Features) ---
        data.index = pd.to_datetime(data.index)
        # Days with no headlines get neutral (0.0) sentiment.
        # NOTE(review): this fillna(0.0) also zero-fills the rolling-window
        # NaNs in the quant columns, so the trailing dropna() drops nothing.
        # Preserved as-is since the scaler/model were presumably trained with
        # this behavior — confirm against training code before changing.
        data = data.join(df_sent, how='left').fillna(0.0)
        data['Sent_Mean_Delta'] = data['Sent_Mean'].diff().fillna(0.0)
        data['Sent_Mean_EMA'] = data['Sent_Mean'].ewm(span=3).mean()
        data['Sent_x_VIX'] = data['Sent_Mean'] * data['VIX_Norm']
        feature_cols = [
            'VWAP_Dist', 'RSI', 'MACD_Hist', 'VIX_Norm', 'VIX_Change', 'ATR_Dist', 'Realized_Vol',
            'Sent_Mean', 'Sent_Intensity', 'News_Volume', 'Net_Bull', 'Sent_Mean_Delta', 'Sent_Mean_EMA', 'Sent_x_VIX'
        ]
        return data[feature_cols].dropna()