# alpha-predict / src/data_fetcher.py
# Uploaded by DevKX ("Upload data_fetcher.py", commit 6711b85, verified)
import os
import time
from datetime import datetime, timedelta
from urllib.parse import unquote

import finnhub
import pandas as pd
import requests
import streamlit as st
import yfinance as yf
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
class DataFetcher:
def __init__(self, ticker="^GSPC", vix_ticker="%5EVIX"):
self.ticker = ticker
self.vix_ticker = vix_ticker
# Load API Keys
self.finnhub_key = os.getenv("FINNHUB_API_KEY")
self.fmp_key = os.getenv("FMP_API_KEY")
if not self.finnhub_key or not self.fmp_key:
print("⚠️ Warning: API Keys missing! Check your .env file or HF Secrets.")
# Initialize Finnhub Client for News
self.finnhub_client = finnhub.Client(api_key=self.finnhub_key)
def fetch_market_data(self, days=60):
"""Fetches live SPY data from the NEW FMP Stable API and merges VIX."""
if not self.fmp_key:
return self._load_backup(days)
try:
print(f"πŸ“‘ Fetching live data for {self.ticker} from FMP Stable API...")
spy_url = f"https://financialmodelingprep.com/stable/historical-price-eod/full?symbol={self.ticker}&apikey={self.fmp_key}"
spy_res = requests.get(spy_url, timeout=10).json()
if isinstance(spy_res, dict) and "Error Message" in spy_res:
print(f"🚨 FMP Error: {spy_res['Error Message']}")
return self._load_backup(days)
if not isinstance(spy_res, list) or len(spy_res) == 0:
return self._load_backup(days)
# Format main DataFrame
df = pd.DataFrame(spy_res)
# πŸ›‘οΈ THE FIX: Convert to datetime, strip timezones, and set to midnight
df['date'] = pd.to_datetime(df['date'])
if df['date'].dt.tz is not None:
df['date'] = df['date'].dt.tz_localize(None)
df['date'] = df['date'].dt.normalize()
df.set_index('date', inplace=True)
df = df.sort_index()[['open', 'high', 'low', 'close', 'volume']]
df.columns = [c.capitalize() for c in df.columns]
# Add VIX
df['VIX'] = self._get_vix_data()
df['VIX'] = df['VIX'].ffill().bfill()
print("βœ… Live market data fetched and merged successfully!")
return df.tail(days)
except Exception as e:
print(f"🚨 Major Fetch Error: {e}")
return self._load_backup(days)
def _get_vix_data(self):
"""Attempts to fetch VIX from Stable API, falls back to CSV if blocked."""
print("πŸ“‘ Attempting to fetch VIX from FMP Stable API...")
try:
vix_url = f"https://financialmodelingprep.com/stable/historical-price-eod/full?symbol={self.vix_ticker}&apikey={self.fmp_key}"
vix_res = requests.get(vix_url, timeout=5).json()
if isinstance(vix_res, list) and len(vix_res) > 0:
vix_df = pd.DataFrame(vix_res)
# πŸ›‘οΈ THE FIX: Strip timezones for VIX so it perfectly matches SPY
vix_df['date'] = pd.to_datetime(vix_df['date'])
if vix_df['date'].dt.tz is not None:
vix_df['date'] = vix_df['date'].dt.tz_localize(None)
vix_df['date'] = vix_df['date'].dt.normalize()
vix_df.set_index('date', inplace=True)
vix_df = vix_df.sort_index()
print("βœ… VIX fetched successfully from FMP!")
return vix_df['close']
except Exception as e:
print(f"⚠️ VIX API request failed: {e}")
print("⚠️ Pulling VIX from local backup...")
backup_path = "data/market_data_backup.csv"
if os.path.exists(backup_path):
backup_df = pd.read_csv(backup_path, index_col=0, parse_dates=True)
# Strip timezones from the backup CSV index as well!
if backup_df.index.tz is not None:
backup_df.index = backup_df.index.tz_localize(None)
backup_df.index = backup_df.index.normalize()
if 'VIX' in backup_df.columns:
return backup_df['VIX']
return 18.0
def _load_backup(self, days):
"""Failsafe method to load local CSV if API entirely blocks the request."""
print(f"πŸ“ System: Loading localized market data backup...")
backup_path = "data/market_data_backup.csv"
if not os.path.exists(backup_path):
print("🚨 Market backup CSV not found!")
return pd.DataFrame()
df = pd.read_csv(backup_path, index_col=0, parse_dates=True)
return df.tail(days)
    # NOTE: Legacy Finnhub-based implementation, kept below for reference only.
    # It is superseded by the FMP-based fetch_market_data above.
    # def fetch_market_data(self, days=50):
# """
# Fetches market data using Finnhub (SPY as proxy) with a CSV fallback.
# """
# print(f"πŸ“‘ Attempting to fetch last {days} days from Finnhub (using SPY proxy)...")
# try:
# # 1. Setup Timestamps (Finnhub needs Unix seconds)
# end_ts = int(time.time())
# start_ts = int((datetime.now() - timedelta(days=days+10)).timestamp())
# # 2. Fetch SPY (S&P 500 Proxy)
# # '1' means daily candles
# res = self.finnhub_client.stock_candles('SPY', 'D', start_ts, end_ts)
# if res.get('s') != 'ok':
# raise ValueError(f"Finnhub API returned status: {res.get('s')}")
# # Convert Finnhub response to DataFrame
# df = pd.DataFrame({
# 'Date': pd.to_datetime(res['t'], unit='s'),
# 'Close': res['c'],
# 'Open': res['o'],
# 'High': res['h'],
# 'Low': res['l'],
# 'Volume': res['v']
# }).set_index('Date')
# # 3. Handle VIX (Finnhub free tier often blocks ^VIX)
# # We attempt it, but if it fails, we merge from our backup data
# try:
# vix_res = self.finnhub_client.stock_candles('VIX', 'D', start_ts, end_ts)
# if vix_res.get('s') == 'ok':
# df['VIX'] = vix_res['c']
# else:
# raise Exception("VIX not available on API")
# except:
# print("⚠️ VIX not available on Finnhub. Pulling VIX from backup...")
# backup_df = pd.read_csv("data/market_data_backup.csv", index_col=0, parse_dates=True)
# # Reindex backup to match the dates we just got from the API
# df['VIX'] = backup_df['VIX'].reindex(df.index, method='ffill')
# # Final cleanup
# df = df.ffill().dropna()
# if df.empty:
# raise ValueError("Resulting DataFrame is empty.")
# return df
# except Exception as e:
# print(f"⚠️ Finnhub fetch failed ({e}). Loading full backup from data/ folder...")
# backup_path = "data/market_data_backup.csv"
# if not os.path.exists(backup_path):
# print(f"🚨 FATAL: {backup_path} not found!")
# return pd.DataFrame() # This will trigger your safety check in Processor
# df_backup = pd.read_csv(backup_path, index_col=0, parse_dates=True)
# return df_backup.tail(days)
# πŸ›‘οΈ STREAMLIT CACHE: Ignores '_self' so it doesn't try to hash the Finnhub client.
# ttl=3600 caches the news for 1 hour so repeated button clicks load instantly.
@st.cache_data(ttl=3600, show_spinner=False)
def fetch_market_news(_self, days=45):
"""
Fetches historical market news by looping through days.
Uses 'SPY' as a proxy to allow historical date filtering on Finnhub.
"""
print(f"πŸ“° Fetching last {days} days of market headlines...")
all_news = []
end_date = datetime.now()
# Try to render a Streamlit progress bar if running inside app.py
try:
progress_bar = st.progress(0, text="Fetching historical news data (avoiding rate limits)...")
except:
progress_bar = None
# Loop backwards through time, day by day
for i in range(days):
target_date = end_date - timedelta(days=i)
date_str = target_date.strftime('%Y-%m-%d')
try:
# FINNHUB TRICK: Use 'SPY' company news to get historical market coverage
daily_news = _self.finnhub_client.company_news('SPY', _from=date_str, to=date_str)
if daily_news:
all_news.extend(daily_news)
# πŸ›‘ RATE LIMIT SHIELD: Finnhub free tier allows 60 requests/minute.
# Sleeping for 1.1 seconds guarantees we stay perfectly under the limit.
time.sleep(1.1)
except Exception as e:
print(f"⚠️ API Error on {date_str}: {e}")
time.sleep(5) # Take a longer pause if the API gets angry
# Update UI progress
if progress_bar:
progress_bar.progress((i + 1) / days, text=f"Fetched news for {date_str}...")
# Clear the progress bar when finished
if progress_bar:
progress_bar.empty()
# Convert the master list into a DataFrame
df_news = pd.DataFrame(all_news)
if df_news.empty:
print("⚠️ No news found in the specified window.")
return pd.DataFrame(columns=['Title', 'Date'])
# Convert Unix timestamp to YYYY-MM-DD Date object
df_news['Date'] = pd.to_datetime(df_news['datetime'], unit='s').dt.date
# Rename columns to match what Processor expects
df_news = df_news[['headline', 'Date']].rename(columns={'headline': 'Title'})
# Drop duplicates in case of overlapping API returns
df_news = df_news.drop_duplicates(subset=['Title', 'Date'])
print(f"βœ… Successfully fetched {len(df_news)} historical headlines.")
return df_news
if __name__ == "__main__":
fetcher = DataFetcher()
# Test Market Fetch
market_df = fetcher.fetch_market_data(days=50)
print("\n--- Market Data Sample ---")
print(market_df.tail())
# Test News Fetch
news_df = fetcher.fetch_market_news(days=45)
print("\n--- Market News Sample ---")
print(news_df.head())
print(news_df.tail())
print(f"\nTotal Headlines Fetched: {len(news_df)}")