| """
|
| Database Query Functions for Cached Market Data
|
| Provides REAL data access from cached_market_data and cached_ohlc tables
|
|
|
| CRITICAL RULES:
|
| - ONLY read from database - NEVER generate fake data
|
| - Return empty list if no data found
|
| - All queries must be REAL database operations
|
| """
|
|
|
| import logging
|
| from datetime import datetime, timedelta
|
| from typing import Optional, List, Dict, Any
|
| from sqlalchemy import desc, and_, func
|
| from sqlalchemy.orm import Session
|
|
|
| from database.models import CachedMarketData, CachedOHLC
|
| from database.db_manager import DatabaseManager
|
| from utils.logger import setup_logger
|
|
|
| logger = setup_logger("cache_queries")
|
|
|
|
|
| class CacheQueries:
|
| """
|
| Database query operations for cached market data
|
|
|
| CRITICAL: All methods return REAL data from database ONLY
|
| """
|
|
|
| def __init__(self, db_manager: DatabaseManager):
|
| self.db = db_manager
|
|
|
| def get_cached_market_data(
|
| self,
|
| symbols: Optional[List[str]] = None,
|
| limit: int = 100
|
| ) -> List[Dict[str, Any]]:
|
| """
|
| Get cached market data from database
|
|
|
| CRITICAL RULES:
|
| - ONLY read from cached_market_data table
|
| - NEVER generate or fake data
|
| - Return empty list if no data found
|
| - Use DISTINCT ON to get latest data per symbol
|
|
|
| Args:
|
| symbols: List of symbols to filter (e.g., ['BTC', 'ETH'])
|
| limit: Maximum number of records
|
|
|
| Returns:
|
| List of dictionaries with REAL market data from database
|
| """
|
| try:
|
| with self.db.get_session() as session:
|
|
|
| subq = session.query(
|
| CachedMarketData.symbol,
|
| func.max(CachedMarketData.fetched_at).label('max_fetched_at')
|
| ).group_by(CachedMarketData.symbol)
|
|
|
| if symbols:
|
| subq = subq.filter(CachedMarketData.symbol.in_(symbols))
|
|
|
| subq = subq.subquery()
|
|
|
|
|
| query = session.query(CachedMarketData).join(
|
| subq,
|
| and_(
|
| CachedMarketData.symbol == subq.c.symbol,
|
| CachedMarketData.fetched_at == subq.c.max_fetched_at
|
| )
|
| ).order_by(desc(CachedMarketData.fetched_at)).limit(limit)
|
|
|
| results = query.all()
|
|
|
| if not results:
|
| logger.info(f"No cached market data found for symbols={symbols}")
|
| return []
|
|
|
|
|
| data = []
|
| for row in results:
|
| data.append({
|
| "symbol": row.symbol,
|
| "price": float(row.price),
|
| "market_cap": float(row.market_cap) if row.market_cap else None,
|
| "volume_24h": float(row.volume_24h) if row.volume_24h else None,
|
| "change_24h": float(row.change_24h) if row.change_24h else None,
|
| "high_24h": float(row.high_24h) if row.high_24h else None,
|
| "low_24h": float(row.low_24h) if row.low_24h else None,
|
| "provider": row.provider,
|
| "fetched_at": row.fetched_at
|
| })
|
|
|
| logger.info(f"Retrieved {len(data)} cached market records")
|
| return data
|
|
|
| except Exception as e:
|
| logger.error(f"Database error in get_cached_market_data: {e}", exc_info=True)
|
|
|
| return []
|
|
|
| def get_cached_ohlc(
|
| self,
|
| symbol: str,
|
| interval: str = "1h",
|
| limit: int = 1000
|
| ) -> List[Dict[str, Any]]:
|
| """
|
| Get cached OHLC data from database
|
|
|
| CRITICAL RULES:
|
| - ONLY read from cached_ohlc table
|
| - NEVER generate fake candles
|
| - Return empty list if no data found
|
| - Order by timestamp ASC for chart display
|
|
|
| Args:
|
| symbol: Trading pair symbol (e.g., 'BTCUSDT')
|
| interval: Candle interval (e.g., '1h', '4h', '1d')
|
| limit: Maximum number of candles
|
|
|
| Returns:
|
| List of dictionaries with REAL OHLC data from database
|
| """
|
| try:
|
| with self.db.get_session() as session:
|
|
|
| query = session.query(CachedOHLC).filter(
|
| and_(
|
| CachedOHLC.symbol == symbol,
|
| CachedOHLC.interval == interval
|
| )
|
| ).order_by(desc(CachedOHLC.timestamp)).limit(limit)
|
|
|
| results = query.all()
|
|
|
| if not results:
|
| logger.info(f"No cached OHLC data found for {symbol} {interval}")
|
| return []
|
|
|
|
|
|
|
| data = []
|
| for row in reversed(results):
|
| data.append({
|
| "timestamp": row.timestamp,
|
| "open": float(row.open),
|
| "high": float(row.high),
|
| "low": float(row.low),
|
| "close": float(row.close),
|
| "volume": float(row.volume),
|
| "provider": row.provider,
|
| "fetched_at": row.fetched_at
|
| })
|
|
|
| logger.info(f"Retrieved {len(data)} OHLC candles for {symbol} {interval}")
|
| return data
|
|
|
| except Exception as e:
|
| logger.error(f"Database error in get_cached_ohlc: {e}", exc_info=True)
|
|
|
| return []
|
|
|
| def save_market_data(
|
| self,
|
| symbol: str,
|
| price: float,
|
| market_cap: Optional[float] = None,
|
| volume_24h: Optional[float] = None,
|
| change_24h: Optional[float] = None,
|
| high_24h: Optional[float] = None,
|
| low_24h: Optional[float] = None,
|
| provider: str = "unknown"
|
| ) -> bool:
|
| """
|
| Save market data to cache
|
|
|
| CRITICAL: Only used by background workers to store REAL API data
|
|
|
| Args:
|
| symbol: Crypto symbol
|
| price: Current price (REAL from API)
|
| market_cap: Market cap (REAL from API)
|
| volume_24h: 24h volume (REAL from API)
|
| change_24h: 24h change (REAL from API)
|
| high_24h: 24h high (REAL from API)
|
| low_24h: 24h low (REAL from API)
|
| provider: Data provider name
|
|
|
| Returns:
|
| bool: True if saved successfully
|
| """
|
| try:
|
| with self.db.get_session() as session:
|
|
|
| record = CachedMarketData(
|
| symbol=symbol,
|
| price=price,
|
| market_cap=market_cap,
|
| volume_24h=volume_24h,
|
| change_24h=change_24h,
|
| high_24h=high_24h,
|
| low_24h=low_24h,
|
| provider=provider,
|
| fetched_at=datetime.utcnow()
|
| )
|
|
|
| session.add(record)
|
| session.commit()
|
|
|
| logger.info(f"Saved market data for {symbol} from {provider}")
|
| return True
|
|
|
| except Exception as e:
|
| logger.error(f"Error saving market data for {symbol}: {e}", exc_info=True)
|
| return False
|
|
|
| def save_ohlc_candle(
|
| self,
|
| symbol: str,
|
| interval: str,
|
| timestamp: datetime,
|
| open_price: float,
|
| high: float,
|
| low: float,
|
| close: float,
|
| volume: float,
|
| provider: str = "unknown"
|
| ) -> bool:
|
| """
|
| Save OHLC candle to cache
|
|
|
| CRITICAL: Only used by background workers to store REAL candle data
|
|
|
| Args:
|
| symbol: Trading pair symbol
|
| interval: Candle interval
|
| timestamp: Candle open time (REAL from API)
|
| open_price: Open price (REAL from API)
|
| high: High price (REAL from API)
|
| low: Low price (REAL from API)
|
| close: Close price (REAL from API)
|
| volume: Volume (REAL from API)
|
| provider: Data provider name
|
|
|
| Returns:
|
| bool: True if saved successfully
|
| """
|
| try:
|
| with self.db.get_session() as session:
|
|
|
| existing = session.query(CachedOHLC).filter(
|
| and_(
|
| CachedOHLC.symbol == symbol,
|
| CachedOHLC.interval == interval,
|
| CachedOHLC.timestamp == timestamp
|
| )
|
| ).first()
|
|
|
| if existing:
|
|
|
| existing.open = open_price
|
| existing.high = high
|
| existing.low = low
|
| existing.close = close
|
| existing.volume = volume
|
| existing.provider = provider
|
| existing.fetched_at = datetime.utcnow()
|
| else:
|
|
|
| record = CachedOHLC(
|
| symbol=symbol,
|
| interval=interval,
|
| timestamp=timestamp,
|
| open=open_price,
|
| high=high,
|
| low=low,
|
| close=close,
|
| volume=volume,
|
| provider=provider,
|
| fetched_at=datetime.utcnow()
|
| )
|
| session.add(record)
|
|
|
| session.commit()
|
|
|
| logger.debug(f"Saved OHLC candle for {symbol} {interval} at {timestamp}")
|
| return True
|
|
|
| except Exception as e:
|
| logger.error(f"Error saving OHLC candle for {symbol}: {e}", exc_info=True)
|
| return False
|
|
|
| def cleanup_old_data(self, days: int = 7) -> Dict[str, int]:
|
| """
|
| Remove old cached data to manage storage
|
|
|
| Args:
|
| days: Remove data older than N days
|
|
|
| Returns:
|
| Dictionary with counts of deleted records
|
| """
|
| try:
|
| with self.db.get_session() as session:
|
| cutoff_time = datetime.utcnow() - timedelta(days=days)
|
| deleted_counts = {}
|
|
|
|
|
| deleted = session.query(CachedMarketData).filter(
|
| CachedMarketData.fetched_at < cutoff_time
|
| ).delete()
|
| deleted_counts['market_data'] = deleted
|
|
|
|
|
| deleted = session.query(CachedOHLC).filter(
|
| CachedOHLC.fetched_at < cutoff_time
|
| ).delete()
|
| deleted_counts['ohlc'] = deleted
|
|
|
| session.commit()
|
|
|
| total_deleted = sum(deleted_counts.values())
|
| logger.info(f"Cleaned up {total_deleted} old cache records (older than {days} days)")
|
|
|
| return deleted_counts
|
|
|
| except Exception as e:
|
| logger.error(f"Error cleaning up old data: {e}", exc_info=True)
|
| return {}
|
|
|
|
|
|
|
| _cache_queries = None
|
|
|
| def get_cache_queries(db_manager: Optional[DatabaseManager] = None) -> CacheQueries:
|
| """
|
| Get global CacheQueries instance
|
|
|
| Args:
|
| db_manager: DatabaseManager instance (optional, will use global if not provided)
|
|
|
| Returns:
|
| CacheQueries instance
|
| """
|
| global _cache_queries
|
|
|
| if _cache_queries is None:
|
| if db_manager is None:
|
| from database.db_manager import db_manager as global_db
|
| db_manager = global_db
|
| _cache_queries = CacheQueries(db_manager)
|
|
|
| return _cache_queries
|
|
|