| """
|
| Data Access API Endpoints
|
| Provides user-facing endpoints to access collected cryptocurrency data
|
| """
|
|
|
| from datetime import datetime, timedelta
|
| from typing import Optional, List
|
| from fastapi import APIRouter, HTTPException, Query
|
| from pydantic import BaseModel
|
|
|
| from database.db_manager import db_manager
|
| from utils.logger import setup_logger
|
|
|
# Logger for this module, created through the project's setup_logger helper.
logger = setup_logger("data_endpoints")

# Router on which every endpoint in this file is registered; all routes are
# served under the /api/crypto prefix and grouped under the "data" tag.
router = APIRouter(prefix="/api/crypto", tags=["data"])
|
|
|
|
|
|
|
|
|
|
|
|
|
class PriceData(BaseModel):
    """Price data model"""

    # Response shape used by the /prices endpoints. Every field is copied
    # one-to-one from the attribute of the same name on a stored price record.

    # Cryptocurrency symbol (the endpoints document examples like BTC, ETH).
    symbol: str
    # Price value; the name indicates US dollars.
    price_usd: float
    # Optional metrics: each defaults to None when the record has no value.
    market_cap: Optional[float] = None
    volume_24h: Optional[float] = None
    # NOTE(review): unit (percent vs. absolute) is not visible here - confirm.
    price_change_24h: Optional[float] = None
    # Timestamp carried by the stored record.
    timestamp: datetime
    # Source label carried by the stored record.
    source: str
|
|
|
|
|
class NewsArticle(BaseModel):
    """News article model"""

    # Response shape used by the /news list and /news/{news_id} endpoints.

    # Identifier of the stored article; also the key used by /news/{news_id}.
    id: int
    title: str
    # Article body; optional and None by default.
    content: Optional[str] = None
    # Source label; the /news endpoint can filter on it.
    source: str
    # Link to the article; optional and None by default.
    url: Optional[str] = None
    published_at: datetime
    # Sentiment label; the /news endpoint documents positive/negative/neutral
    # as filter values.
    sentiment: Optional[str] = None
    # The endpoints build this by splitting the stored comma-separated tag
    # string; None when the record has no tags.
    tags: Optional[List[str]] = None
|
|
|
|
|
class WhaleTransaction(BaseModel):
    """Whale transaction model"""

    # Response shape used by /whales/transactions. Every field is copied
    # one-to-one from the attribute of the same name on a stored record.

    id: int
    # Blockchain label; the endpoint can filter on it (its docs mention
    # ethereum and bitcoin as examples).
    blockchain: str
    transaction_hash: str
    from_address: str
    to_address: str
    # NOTE(review): presumably the amount in the chain's native unit - confirm.
    amount: float
    # Amount in USD; the endpoint's min_amount_usd filter refers to this value.
    amount_usd: float
    timestamp: datetime
    source: str
|
|
|
|
|
class SentimentMetric(BaseModel):
    """Sentiment metric model"""

    # NOTE(review): no endpoint in the portion of the file reviewed here uses
    # this model; the sentiment endpoints return plain dicts instead.

    metric_name: str
    # Numeric value of the metric.
    value: float
    # Label that accompanies the value.
    classification: str
    timestamp: datetime
    source: str
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/prices", response_model=List[PriceData])
async def get_all_prices(
    limit: int = Query(default=100, ge=1, le=1000, description="Number of records to return")
):
    """
    Return the latest stored prices across all tracked cryptocurrencies.

    Args:
        limit: Maximum number of records to return (1-1000, default 100).

    Returns:
        A list of PriceData items; empty when the database yields nothing.

    Raises:
        HTTPException: 500 when the lookup or model construction fails.
    """
    try:
        rows = db_manager.get_latest_prices(limit=limit)

        # A falsy result (None or an empty collection) simply produces [].
        payload = []
        for row in rows or []:
            payload.append(
                PriceData(
                    symbol=row.symbol,
                    price_usd=row.price_usd,
                    market_cap=row.market_cap,
                    volume_24h=row.volume_24h,
                    price_change_24h=row.price_change_24h,
                    timestamp=row.timestamp,
                    source=row.source,
                )
            )
        return payload

    except Exception as exc:
        logger.error(f"Error getting prices: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to get prices: {str(exc)}")
|
|
|
|
|
@router.get("/prices/{symbol}", response_model=PriceData)
async def get_price_by_symbol(symbol: str):
    """
    Return the latest stored price for one cryptocurrency.

    Args:
        symbol: Cryptocurrency symbol (e.g., BTC, ETH, BNB). It is upper-cased
            before the lookup.

    Raises:
        HTTPException: 404 when the lookup yields nothing; 500 when the
            lookup or model construction fails.
    """
    # Attributes copied verbatim from the stored record into the response.
    copied_fields = (
        "symbol",
        "price_usd",
        "market_cap",
        "volume_24h",
        "price_change_24h",
        "timestamp",
        "source",
    )

    try:
        symbol = symbol.upper()
        record = db_manager.get_latest_price_by_symbol(symbol)

        if record:
            return PriceData(**{name: getattr(record, name) for name in copied_fields})

        raise HTTPException(status_code=404, detail=f"Price data not found for {symbol}")

    except HTTPException:
        # Let the deliberate 404 pass through instead of turning it into a 500.
        raise
    except Exception as exc:
        logger.error(f"Error getting price for {symbol}: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to get price: {str(exc)}")
|
|
|
|
|
def _downsample_history(records, interval_minutes: int) -> list:
    """Keep the first record, then only records at least one interval after the last kept one.

    Expects ``records`` in ascending timestamp order; each kept record is
    rendered as a dict with ISO-formatted timestamp, price, volume and
    market cap.
    """
    min_gap = timedelta(minutes=interval_minutes)
    sampled = []
    last_kept = None

    for record in records:
        if last_kept is None or record.timestamp - last_kept >= min_gap:
            sampled.append({
                "timestamp": record.timestamp.isoformat(),
                "price_usd": record.price_usd,
                "volume_24h": record.volume_24h,
                "market_cap": record.market_cap
            })
            last_kept = record.timestamp

    return sampled


@router.get("/history/{symbol}")
async def get_price_history(
    symbol: str,
    hours: int = Query(default=24, ge=1, le=720, description="Number of hours of history"),
    interval: int = Query(default=60, ge=1, le=1440, description="Interval in minutes")
):
    """
    Get down-sampled price history for a cryptocurrency.

    Args:
        symbol: Cryptocurrency symbol; upper-cased before the lookup.
        hours: Number of hours of history to request (1-720, default 24).
        interval: Minimum spacing between returned data points, in minutes
            (1-1440, default 60).

    Returns:
        A dict with the symbol, the number of data points, the interval in
        minutes and the sampled history in chronological order.

    Raises:
        HTTPException: 404 when no history is found; 500 on any other failure.
    """
    try:
        symbol = symbol.upper()
        history = db_manager.get_price_history(symbol, hours=hours)

        if not history:
            raise HTTPException(status_code=404, detail=f"No history found for {symbol}")

        # The sampler measures forward gaps between timestamps. If the rows
        # arrived newest-first every gap would be negative and only the first
        # row would survive, so order them chronologically first. The sort is
        # stable, so rows that are already ascending keep their order.
        chronological = sorted(history, key=lambda record: record.timestamp)
        sampled = _downsample_history(chronological, interval)

        return {
            "symbol": symbol,
            "data_points": len(sampled),
            "interval_minutes": interval,
            "history": sampled
        }

    except HTTPException:
        # Preserve the deliberate 404 instead of converting it to a 500.
        raise
    except Exception as e:
        logger.error(f"Error getting history for {symbol}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to get history: {str(e)}")
|
|
|
|
|
@router.get("/market-overview")
async def get_market_overview():
    """
    Get a market overview built from the 20 latest price records.

    Returns:
        A dict with the summed market cap and 24h volume, the five best and
        five worst performers by 24h price change (best first and worst first
        respectively), the ten largest entries by market cap, and a naive-UTC
        ISO timestamp. The same keys are present when no price data exists,
        with zero totals and empty lists.

    Raises:
        HTTPException: 500 when the lookup or aggregation fails.
    """
    try:
        prices = db_manager.get_latest_prices(limit=20)
        generated_at = datetime.utcnow().isoformat()

        if not prices:
            # Keep the same shape as the populated response, timestamp included,
            # so clients do not need a special case for the empty market.
            return {
                "total_market_cap": 0,
                "total_volume_24h": 0,
                "top_gainers": [],
                "top_losers": [],
                "top_by_market_cap": [],
                "timestamp": generated_at
            }

        # Records with a missing (or zero) metric contribute nothing to the totals.
        total_market_cap = sum(p.market_cap for p in prices if p.market_cap)
        total_volume_24h = sum(p.volume_24h for p in prices if p.volume_24h)

        # Best performer first; records without a change value are left out.
        sorted_by_change = sorted(
            [p for p in prices if p.price_change_24h is not None],
            key=lambda x: x.price_change_24h,
            reverse=True
        )
        top_gainers = sorted_by_change[:5]
        # Walk the ranking from the bottom so the worst performer comes first,
        # mirroring how top_gainers lists the best performer first.
        top_losers = sorted_by_change[::-1][:5]

        sorted_by_mcap = sorted(
            [p for p in prices if p.market_cap is not None],
            key=lambda x: x.market_cap,
            reverse=True
        )

        return {
            "total_market_cap": total_market_cap,
            "total_volume_24h": total_volume_24h,
            "top_gainers": [
                {
                    "symbol": p.symbol,
                    "price_usd": p.price_usd,
                    "price_change_24h": p.price_change_24h
                }
                for p in top_gainers
            ],
            "top_losers": [
                {
                    "symbol": p.symbol,
                    "price_usd": p.price_usd,
                    "price_change_24h": p.price_change_24h
                }
                for p in top_losers
            ],
            "top_by_market_cap": [
                {
                    "symbol": p.symbol,
                    "price_usd": p.price_usd,
                    "market_cap": p.market_cap,
                    "volume_24h": p.volume_24h
                }
                for p in sorted_by_mcap[:10]
            ],
            "timestamp": generated_at
        }

    except Exception as e:
        logger.error(f"Error getting market overview: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to get market overview: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/news", response_model=List[NewsArticle])
async def get_latest_news(
    limit: int = Query(default=50, ge=1, le=200, description="Number of articles"),
    source: Optional[str] = Query(default=None, description="Filter by source"),
    sentiment: Optional[str] = Query(default=None, description="Filter by sentiment")
):
    """
    Return the latest cryptocurrency news articles.

    Args:
        limit: Maximum number of articles to return (1-200, default 50).
        source: Optional news source to filter by.
        sentiment: Optional sentiment to filter by (positive, negative, neutral).

    Returns:
        A list of NewsArticle items; empty when the database yields nothing.

    Raises:
        HTTPException: 500 when the lookup or model construction fails.
    """
    try:
        articles = db_manager.get_latest_news(limit=limit, source=source, sentiment=sentiment)

        # A falsy result (None or an empty collection) simply produces [].
        response = []
        for item in articles or []:
            # Tags are stored as one comma-separated string; expose them as a list.
            tag_list = item.tags.split(',') if item.tags else None
            response.append(
                NewsArticle(
                    id=item.id,
                    title=item.title,
                    content=item.content,
                    source=item.source,
                    url=item.url,
                    published_at=item.published_at,
                    sentiment=item.sentiment,
                    tags=tag_list,
                )
            )
        return response

    except Exception as exc:
        logger.error(f"Error getting news: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to get news: {str(exc)}")
|
|
|
|
|
# NOTE: route order matters. Routes are matched in registration order, so the
# fixed path /news/search must be registered BEFORE /news/{news_id}; otherwise
# "search" is captured as news_id, fails integer validation, and the search
# endpoint can never be reached.
@router.get("/news/search")
async def search_news(
    q: str = Query(..., min_length=2, description="Search query"),
    limit: int = Query(default=50, ge=1, le=200)
):
    """
    Search news articles by keyword.

    Args:
        q: Search query (at least two characters).
        limit: Maximum number of results (1-200, default 50).

    Returns:
        A dict echoing the query, the number of matches, and a summary of
        each matching article (id, title, source, url, ISO-formatted
        publication time and sentiment).

    Raises:
        HTTPException: 500 when the search or serialization fails.
    """
    try:
        results = db_manager.search_news(query=q, limit=limit)

        return {
            "query": q,
            "count": len(results),
            "results": [
                {
                    "id": article.id,
                    "title": article.title,
                    "source": article.source,
                    "url": article.url,
                    "published_at": article.published_at.isoformat(),
                    "sentiment": article.sentiment
                }
                for article in results
            ]
        }

    except Exception as e:
        logger.error(f"Error searching news: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to search news: {str(e)}")


@router.get("/news/{news_id}", response_model=NewsArticle)
async def get_news_by_id(news_id: int):
    """
    Get a specific news article by ID.

    Args:
        news_id: Integer identifier of the article.

    Raises:
        HTTPException: 404 when no article has that ID; 500 when the lookup
            or model construction fails.
    """
    try:
        article = db_manager.get_news_by_id(news_id)

        if not article:
            raise HTTPException(status_code=404, detail=f"News article {news_id} not found")

        return NewsArticle(
            id=article.id,
            title=article.title,
            content=article.content,
            source=article.source,
            url=article.url,
            published_at=article.published_at,
            sentiment=article.sentiment,
            # Tags are stored as one comma-separated string; expose them as a list.
            tags=article.tags.split(',') if article.tags else None
        )

    except HTTPException:
        # Preserve the deliberate 404 instead of converting it to a 500.
        raise
    except Exception as e:
        logger.error(f"Error getting news {news_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to get news: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/sentiment/current")
async def get_current_sentiment():
    """
    Return the most recent market sentiment reading.

    Returns:
        A dict with the index value, its classification, an ISO timestamp,
        the source and a human-readable description. When no reading is
        stored, a placeholder dict with a null value, "unknown"
        classification and an explanatory message is returned instead.

    Raises:
        HTTPException: 500 when the lookup or serialization fails.
    """
    try:
        latest = db_manager.get_latest_sentiment()

        if latest:
            label = latest.classification
            return {
                "fear_greed_index": latest.value,
                "classification": label,
                "timestamp": latest.timestamp.isoformat(),
                "source": latest.source,
                "description": _get_sentiment_description(label),
            }

        # Nothing stored yet: answer with a placeholder rather than an error.
        return {
            "fear_greed_index": None,
            "classification": "unknown",
            "timestamp": None,
            "message": "No sentiment data available",
        }

    except Exception as exc:
        logger.error(f"Error getting sentiment: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to get sentiment: {str(exc)}")
|
|
|
|
|
@router.get("/sentiment/history")
async def get_sentiment_history(
    hours: int = Query(default=168, ge=1, le=720, description="Hours of history (default: 7 days)")
):
    """
    Return stored sentiment readings for a trailing time window.

    Args:
        hours: Size of the window in hours (1-720, default 168).

    Returns:
        A dict with the number of readings and, for each one, its ISO
        timestamp, value and classification.

    Raises:
        HTTPException: 500 when the lookup or serialization fails.
    """
    try:
        readings = db_manager.get_sentiment_history(hours=hours)

        # Take the count before walking the records, as the response reports
        # the size of what the database returned.
        total = len(readings)

        points = []
        for reading in readings:
            points.append({
                "timestamp": reading.timestamp.isoformat(),
                "value": reading.value,
                "classification": reading.classification,
            })

        return {"data_points": total, "history": points}

    except Exception as exc:
        logger.error(f"Error getting sentiment history: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to get sentiment history: {str(exc)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/whales/transactions", response_model=List[WhaleTransaction])
async def get_whale_transactions(
    limit: int = Query(default=50, ge=1, le=200),
    blockchain: Optional[str] = Query(default=None, description="Filter by blockchain"),
    min_amount_usd: Optional[float] = Query(default=None, ge=0, description="Minimum transaction amount in USD")
):
    """
    Return recent large cryptocurrency transactions (whale movements).

    Args:
        limit: Maximum number of transactions (1-200, default 50).
        blockchain: Optional blockchain to filter by (ethereum, bitcoin, etc.).
        min_amount_usd: Optional minimum transaction amount in USD.

    Returns:
        A list of WhaleTransaction items; empty when nothing matches.

    Raises:
        HTTPException: 500 when the lookup or model construction fails.
    """
    # Attributes copied verbatim from each stored record into the response.
    copied_fields = (
        "id",
        "blockchain",
        "transaction_hash",
        "from_address",
        "to_address",
        "amount",
        "amount_usd",
        "timestamp",
        "source",
    )

    try:
        matches = db_manager.get_whale_transactions(
            limit=limit,
            blockchain=blockchain,
            min_amount_usd=min_amount_usd,
        )

        # A falsy result (None or an empty collection) simply produces [].
        return [
            WhaleTransaction(**{name: getattr(row, name) for name in copied_fields})
            for row in matches or []
        ]

    except Exception as exc:
        logger.error(f"Error getting whale transactions: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to get whale transactions: {str(exc)}")
|
|
|
|
|
@router.get("/whales/stats")
async def get_whale_stats(
    hours: int = Query(default=24, ge=1, le=168, description="Time period in hours")
):
    """
    Return aggregate whale-activity statistics for a trailing time window.

    Args:
        hours: Size of the window in hours (1-168, default 24).

    Returns:
        A dict with the window size, transaction count, total/average/largest
        USD amounts, a per-blockchain breakdown and a naive-UTC ISO timestamp.
        Statistics missing from the database result default to 0 (or an
        empty dict for the breakdown).

    Raises:
        HTTPException: 500 when the lookup fails.
    """
    # Numeric statistics that fall back to 0 when absent from the result.
    numeric_keys = (
        "total_transactions",
        "total_volume_usd",
        "avg_transaction_usd",
        "largest_transaction_usd",
    )

    try:
        stats = db_manager.get_whale_stats(hours=hours)

        summary = {"period_hours": hours}
        for key in numeric_keys:
            summary[key] = stats.get(key, 0)
        summary["by_blockchain"] = stats.get('by_blockchain', {})
        summary["timestamp"] = datetime.utcnow().isoformat()
        return summary

    except Exception as exc:
        logger.error(f"Error getting whale stats: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to get whale stats: {str(exc)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/blockchain/gas")
async def get_gas_prices():
    """
    Return the latest gas prices for Ethereum, BSC and Polygon.

    Returns:
        A dict with one entry per chain (an empty dict when the database
        result has no data for that chain) plus a naive-UTC ISO timestamp.

    Raises:
        HTTPException: 500 when the lookup fails.
    """
    try:
        latest = db_manager.get_latest_gas_prices()

        snapshot = {
            chain: latest.get(chain, {})
            for chain in ("ethereum", "bsc", "polygon")
        }
        snapshot["timestamp"] = datetime.utcnow().isoformat()
        return snapshot

    except Exception as exc:
        logger.error(f"Error getting gas prices: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to get gas prices: {str(exc)}")
|
|
|
|
|
@router.get("/blockchain/stats")
async def get_blockchain_stats():
    """
    Return stored statistics for Ethereum, Bitcoin and BSC.

    Returns:
        A dict with one entry per chain (an empty dict when the database
        result has no data for that chain) plus a naive-UTC ISO timestamp.

    Raises:
        HTTPException: 500 when the lookup fails.
    """
    try:
        per_chain = db_manager.get_blockchain_stats()

        report = {}
        for chain in ("ethereum", "bitcoin", "bsc"):
            report[chain] = per_chain.get(chain, {})
        report["timestamp"] = datetime.utcnow().isoformat()
        return report

    except Exception as exc:
        logger.error(f"Error getting blockchain stats: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to get blockchain stats: {str(exc)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
| def _get_sentiment_description(classification: str) -> str:
|
| """Get human-readable description for sentiment classification"""
|
| descriptions = {
|
| "extreme_fear": "Extreme Fear - Investors are very worried",
|
| "fear": "Fear - Investors are concerned",
|
| "neutral": "Neutral - Market is balanced",
|
| "greed": "Greed - Investors are getting greedy",
|
| "extreme_greed": "Extreme Greed - Market may be overheated"
|
| }
|
| return descriptions.get(classification, "Unknown sentiment")
|
|
|
|
|