|
|
| """
|
| Comprehensive Data Worker - Collect ALL Data from ALL Sources
|
| Uses all resources from crypto_resources_unified_2025-11-11.json
|
|
|
| This worker ensures ZERO data sources are left unused:
|
| - 23 Market Data APIs
|
| - 15 News APIs
|
| - 12 Sentiment APIs
|
| - 13 On-chain Analytics APIs
|
| - 9 Whale Tracking APIs
|
| - 18 Block Explorers
|
| - 1 Community Sentiment API
|
| - 24 RPC Nodes
|
| - 7 HuggingFace Resources
|
| - 13 Free HTTP Endpoints
|
|
|
| ALL data is uploaded to HuggingFace Datasets
|
| """
|
|
|
| import asyncio
|
| import time
|
| import logging
|
| import os
|
| from datetime import datetime, timedelta
|
| from typing import List, Dict, Any, Optional
|
| import httpx
|
|
|
| from database.cache_queries import get_cache_queries
|
| from database.db_manager import db_manager
|
| from utils.logger import setup_logger
|
| from unified_resource_loader import get_loader
|
|
|
# Module-level singletons shared by every fetcher below.
logger = setup_logger("comprehensive_worker")

# Registry of all external data sources (market, news, sentiment, ...).
resource_loader = get_loader()
# Cache layer over the shared DB manager.
# NOTE(review): currently unused by the fetchers below — kept available
# for when local persistence is added.
cache = get_cache_queries(db_manager)

# HuggingFace Dataset uploads are enabled only when a token is configured.
HF_UPLOAD_ENABLED = bool(os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN"))
if HF_UPLOAD_ENABLED:
    try:
        from hf_dataset_uploader import get_dataset_uploader
        hf_uploader = get_dataset_uploader()
        logger.info("✅ HuggingFace Dataset upload ENABLED for comprehensive worker")
    except Exception as e:
        # The uploader is optional: fall back to collect-only mode.
        logger.warning(f"HuggingFace Dataset upload disabled: {e}")
        HF_UPLOAD_ENABLED = False
        hf_uploader = None
else:
    hf_uploader = None
    logger.info("ℹ️ HuggingFace Dataset upload DISABLED (no HF_TOKEN)")
|
|
|
|
|
|
|
|
|
|
|
|
|
async def fetch_news_from_cryptopanic() -> List[Dict[str, Any]]:
    """Fetch trending news posts from CryptoPanic (FREE, no API key).

    Returns:
        A list of normalized article dicts (title/description/url/...);
        an empty list on any error.
    """
    try:
        url = "https://cryptopanic.com/api/v1/posts/"
        params = {"auth_token": "free", "public": "true", "kind": "news", "filter": "rising"}

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

            news_items = []
            # Cap at 15 posts per fetch.
            for post in data.get("results", [])[:15]:
                news_items.append({
                    "title": post.get("title", ""),
                    # CryptoPanic posts carry no body text; reuse the title.
                    "description": post.get("title", ""),
                    "url": post.get("url", ""),
                    "published_at": post.get("created_at", ""),
                    "source": "CryptoPanic",
                    "source_id": "cryptopanic",
                    "category": "news",
                    "fetched_at": datetime.utcnow().isoformat() + "Z"
                })

            logger.info(f"✅ CryptoPanic: {len(news_items)} articles")
            return news_items
    except Exception as e:
        logger.debug(f"CryptoPanic error: {e}")
        return []
|
|
|
|
|
async def fetch_news_from_coinstats() -> List[Dict[str, Any]]:
    """Fetch latest news from CoinStats (FREE, no API key).

    Returns:
        A list of normalized article dicts; an empty list on any error.
    """
    try:
        url = "https://api.coin-stats.com/v2/news"
        params = {"limit": 20}

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

            news_items = []
            # Keep at most 15 of the (up to 20) returned articles.
            for article in data.get("news", [])[:15]:
                news_items.append({
                    "title": article.get("title", ""),
                    "description": article.get("description", ""),
                    "url": article.get("link", ""),
                    "published_at": article.get("published", ""),
                    "source": "CoinStats",
                    "source_id": "coinstats",
                    "category": "news",
                    "fetched_at": datetime.utcnow().isoformat() + "Z"
                })

            logger.info(f"✅ CoinStats: {len(news_items)} articles")
            return news_items
    except Exception as e:
        logger.debug(f"CoinStats error: {e}")
        return []
|
|
|
|
|
async def fetch_news_data() -> List[Dict[str, Any]]:
    """
    Fetch news from multiple free sources.

    Sources:
    - CryptoPanic (FREE, no API key)
    - CoinStats (FREE, no API key)
    - Every other news source in the resource registry

    Returns:
        A combined list of normalized article dicts.
    """
    news_data = []

    # Dedicated fetchers for the two keyless sources first.
    try:
        cryptopanic_news = await fetch_news_from_cryptopanic()
        news_data.extend(cryptopanic_news)
    except Exception as e:
        logger.debug(f"Error fetching CryptoPanic: {e}")

    try:
        coinstats_news = await fetch_news_from_coinstats()
        news_data.extend(coinstats_news)
    except Exception as e:
        logger.debug(f"Error fetching CoinStats: {e}")

    # Generic pass over every news resource in the registry.
    news_resources = resource_loader.get_resources_by_category("news")
    logger.info(f"📰 Fetching news from {len(news_resources)} additional sources...")

    for resource in news_resources:
        try:
            # Skip sources that need credentials we don't have.
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}

            # Attach credentials in the style the source expects.
            if resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"
            elif resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["apiKey"] = resource.api_key

            # Per-source endpoint/query specialization.
            if "newsapi" in resource.id:
                # "pub_"-prefixed keys are not valid NewsAPI keys.
                if not resource.api_key or resource.api_key.startswith("pub_"):
                    logger.debug(f"Skipping {resource.name} (invalid API key)")
                    continue
                url = f"{resource.base_url}/everything"
                params.update({
                    "q": "cryptocurrency OR bitcoin OR ethereum",
                    "language": "en",
                    "sortBy": "publishedAt",
                    "pageSize": 20
                })
            elif "cryptopanic" in resource.id:
                # Already handled by fetch_news_from_cryptopanic() above.
                continue
            elif "cryptocontrol" in resource.id:
                url = f"{resource.base_url}/news"

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()

                # Some endpoints return HTML error pages with a 200 status.
                content_type = response.headers.get("content-type", "")
                if "application/json" not in content_type and "text/json" not in content_type:
                    logger.debug(f"Non-JSON response from {resource.name}: {content_type}")
                    continue

                data = response.json()

                # Locate the article list inside the source-specific envelope.
                if "newsapi" in resource.id:
                    articles = data.get("articles", [])
                elif "cryptopanic" in resource.id:
                    articles = data.get("results", [])
                else:
                    articles = data if isinstance(data, list) else data.get("news", [])

                # Slice once so parsing and the log line agree on the count.
                top_articles = articles[:10]
                for article in top_articles:
                    try:
                        normalized = {
                            "title": article.get("title", article.get("name", "")),
                            "description": article.get("description", article.get("summary", "")),
                            "url": article.get("url", article.get("link", "")),
                            "published_at": article.get("publishedAt", article.get("published_at", article.get("created_at", ""))),
                            "source": resource.name,
                            "source_id": resource.id,
                            "category": "news",
                            "fetched_at": datetime.utcnow().isoformat() + "Z"
                        }
                        news_data.append(normalized)
                    except Exception as e:
                        logger.debug(f"Error parsing article: {e}")
                        continue

                logger.info(f"✅ {resource.name}: {len(top_articles)} articles")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error fetching from {resource.name}: {e}")

    logger.info(f"📰 Total news articles collected: {len(news_data)}")
    return news_data
|
|
|
|
|
|
|
|
|
|
|
|
|
async def fetch_fear_greed_index() -> List[Dict[str, Any]]:
    """Fetch the Fear & Greed Index from Alternative.me (FREE, no API key).

    Returns:
        A single-element list with the latest reading, or [] on error / no data.
    """
    try:
        url = "https://api.alternative.me/fng/"
        params = {"limit": "1"}

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

            fng_list = data.get("data", [])
            if isinstance(fng_list, list) and len(fng_list) > 0:
                fng_data = fng_list[0]
                sentiment = {
                    "metric": "fear_greed_index",
                    "value": float(fng_data.get("value", 0)),
                    "classification": fng_data.get("value_classification", ""),
                    "source": "Alternative.me",
                    "source_id": "alternative-me-fng",
                    # API gives unix seconds (UTC); use utcfromtimestamp so the
                    # "Z" suffix is truthful (fromtimestamp would be local time).
                    "timestamp": datetime.utcfromtimestamp(int(fng_data.get("timestamp", time.time()))).isoformat() + "Z",
                    "fetched_at": datetime.utcnow().isoformat() + "Z"
                }
                logger.info(f"✅ Fear & Greed Index: {fng_data.get('value')} ({fng_data.get('value_classification')})")
                return [sentiment]
    except Exception as e:
        logger.debug(f"Fear & Greed Index error: {e}")

    return []
|
|
|
|
|
async def fetch_sentiment_data() -> List[Dict[str, Any]]:
    """
    Fetch sentiment data from multiple sources.

    Sources:
    - Alternative.me Fear & Greed Index (FREE, no API key)
    - LunarCrush (requires API key)
    - Santiment (requires API key)
    - And other sentiment sources from the registry

    Returns:
        A combined list of normalized sentiment-metric dicts.
    """
    sentiment_data = []

    # Keyless dedicated fetcher first.
    try:
        fng_data = await fetch_fear_greed_index()
        sentiment_data.extend(fng_data)
    except Exception as e:
        logger.debug(f"Error fetching Fear & Greed Index: {e}")

    sentiment_resources = resource_loader.get_resources_by_category("sentiment")
    logger.info(f"📊 Fetching sentiment from {len(sentiment_resources)} additional sources...")

    for resource in sentiment_resources:
        try:
            # Skip sources that need credentials we don't have.
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}

            # Attach credentials in the style the source expects.
            if resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"
            elif resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["api_key"] = resource.api_key

            # Per-source specialization.
            if "alternative.me" in resource.id or "alternative-me" in resource.id:
                # Already handled by fetch_fear_greed_index() above.
                continue
            elif "lunarcrush" in resource.id:
                url = f"{resource.base_url}/assets"
                params.update({"symbol": "BTC,ETH,BNB", "data_points": 1})

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()

                # Some endpoints return HTML error pages with a 200 status.
                content_type = response.headers.get("content-type", "")
                if "application/json" not in content_type and "text/json" not in content_type:
                    logger.debug(f"Non-JSON response from {resource.name}: {content_type}")
                    continue

                data = response.json()

                # NOTE(review): unreachable while alternative.me is skipped
                # above; kept so removing that early `continue` still works.
                if "alternative.me" in resource.id or "alternative-me" in resource.id:
                    fng_list = data.get("data", [])
                    if isinstance(fng_list, list) and len(fng_list) > 0:
                        fng_data = fng_list[0]
                        sentiment_data.append({
                            "metric": "fear_greed_index",
                            "value": float(fng_data.get("value", 0)),
                            "classification": fng_data.get("value_classification", ""),
                            "source": resource.name,
                            "source_id": resource.id,
                            # unix seconds (UTC) → utcfromtimestamp so the "Z"
                            # suffix is truthful (fromtimestamp is local time).
                            "timestamp": datetime.utcfromtimestamp(int(fng_data.get("timestamp", time.time()))).isoformat() + "Z",
                            "fetched_at": datetime.utcnow().isoformat() + "Z"
                        })
                        logger.info(f"✅ {resource.name}: FNG = {fng_data.get('value')} ({fng_data.get('value_classification')})")

                elif "lunarcrush" in resource.id:
                    assets = data.get("data", [])
                    for asset in assets:
                        sentiment_data.append({
                            "symbol": asset.get("symbol", ""),
                            "metric": "galaxy_score",
                            "value": float(asset.get("galaxy_score", 0)),
                            "alt_rank": asset.get("alt_rank"),
                            "social_volume": asset.get("social_volume"),
                            "source": resource.name,
                            "source_id": resource.id,
                            "timestamp": datetime.utcnow().isoformat() + "Z",
                            "fetched_at": datetime.utcnow().isoformat() + "Z"
                        })
                    logger.info(f"✅ {resource.name}: {len(assets)} assets")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error fetching from {resource.name}: {e}")

    logger.info(f"📊 Total sentiment data collected: {len(sentiment_data)}")
    return sentiment_data
|
|
|
|
|
|
|
|
|
|
|
|
|
async def fetch_onchain_data() -> List[Dict[str, Any]]:
    """
    Fetch on-chain analytics from ALL on-chain APIs.

    Sources:
    - Glassnode
    - IntoTheBlock
    - CryptoQuant
    - And all other on-chain sources (13 total)

    Returns:
        A list of {source, source_id, data, fetched_at} dicts — responses are
        stored raw because each provider has its own schema.
    """
    onchain_data = []
    onchain_resources = resource_loader.get_resources_by_category("onchain_analytics")

    logger.info(f"⛓️ Fetching on-chain data from {len(onchain_resources)} sources...")

    for resource in onchain_resources:
        try:
            # Skip sources that need credentials we don't have.
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}

            # Attach credentials in the style the source expects.
            if resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["api_key"] = resource.api_key
            elif resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"

            logger.debug(f"Attempting {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()
                data = response.json()

                # Keep the raw payload; downstream consumers normalize it.
                onchain_data.append({
                    "source": resource.name,
                    "source_id": resource.id,
                    "data": data,
                    "fetched_at": datetime.utcnow().isoformat() + "Z"
                })
                logger.info(f"✅ {resource.name}: Data received")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error from {resource.name}: {e}")

    logger.info(f"⛓️ Total on-chain data points: {len(onchain_data)}")
    return onchain_data
|
|
|
|
|
|
|
|
|
|
|
|
|
async def fetch_whale_data() -> List[Dict[str, Any]]:
    """
    Fetch whale transactions from ALL whale tracking APIs.

    Sources:
    - Whale Alert
    - Whale Watcher
    - And all other whale tracking sources (9 total)

    Returns:
        A list of {source, source_id, transaction, fetched_at} dicts.
    """
    whale_data = []
    whale_resources = resource_loader.get_resources_by_category("whale_tracking")

    logger.info(f"🐋 Fetching whale data from {len(whale_resources)} sources...")

    for resource in whale_resources:
        try:
            # Skip sources that need credentials we don't have.
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}

            # Attach credentials in the style the source expects.
            if resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["api_key"] = resource.api_key
            elif resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["X-API-Key"] = resource.api_key

            # Whale Alert: only transfers worth >= $500k.
            if "whale-alert" in resource.id and resource.endpoints:
                url = f"{resource.base_url}/transactions"
                params["min_value"] = 500000

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()
                data = response.json()

                transactions = data.get("transactions", []) if isinstance(data, dict) else data

                # Slice once so storage and the log line agree on the count.
                top_transactions = transactions[:20]
                for tx in top_transactions:
                    whale_data.append({
                        "source": resource.name,
                        "source_id": resource.id,
                        "transaction": tx,
                        "fetched_at": datetime.utcnow().isoformat() + "Z"
                    })

                logger.info(f"✅ {resource.name}: {len(top_transactions)} transactions")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error from {resource.name}: {e}")

    logger.info(f"🐋 Total whale transactions: {len(whale_data)}")
    return whale_data
|
|
|
|
|
|
|
|
|
|
|
|
|
async def fetch_block_explorer_data() -> List[Dict[str, Any]]:
    """
    Fetch blockchain data from ALL block explorers.

    Sources:
    - Etherscan
    - BscScan
    - Polygonscan
    - And all other block explorers (18 total)

    Uses the Etherscan-style `module=stats&action=ethprice` endpoint shared
    by the *scan family of explorers.

    Returns:
        A list of per-chain native-coin price dicts.
    """
    explorer_data = []
    explorer_resources = resource_loader.get_resources_by_category("block_explorers")

    logger.info(f"🔍 Fetching from {len(explorer_resources)} block explorers...")

    for resource in explorer_resources:
        try:
            # Skip sources that need credentials we don't have.
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = f"{resource.base_url}/api"
            params = {
                "module": "stats",
                "action": "ethprice",
            }

            if resource.api_key:
                params["apikey"] = resource.api_key

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url, params=params)
                response.raise_for_status()
                data = response.json()

                # Etherscan-family convention: "1" = OK, "0" = error.
                if data.get("status") == "1":
                    result = data.get("result", {})
                    explorer_data.append({
                        "chain": getattr(resource, "chain", "unknown"),
                        "source": resource.name,
                        "source_id": resource.id,
                        "price_usd": result.get("ethusd"),
                        "price_btc": result.get("ethbtc"),
                        "fetched_at": datetime.utcnow().isoformat() + "Z"
                    })
                    logger.info(f"✅ {resource.name}: Price data received")

        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error from {resource.name}: {e}")

    logger.info(f"🔍 Total block explorer data: {len(explorer_data)}")
    return explorer_data
|
|
|
|
|
|
|
|
|
|
|
|
|
async def save_and_upload_news(news_data: List[Dict[str, Any]]) -> bool:
    """Save news data and upload to HuggingFace.

    Returns:
        False for empty input or a failed upload; True on a successful
        upload, or unconditionally when uploads are disabled.
    """
    if not news_data:
        return False

    # NOTE(review): despite the message, no local DB write happens here;
    # the module-level `cache` is available if persistence is added.
    logger.info(f"💾 Saving {len(news_data)} news articles...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(news_data)} news articles to HuggingFace...")
            success = await hf_uploader.upload_news_data(news_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded news to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload news to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading news to HuggingFace: {e}")
            return False

    return True
|
|
|
|
|
async def save_and_upload_sentiment(sentiment_data: List[Dict[str, Any]]) -> bool:
    """Save sentiment data and upload to HuggingFace.

    Returns:
        False for empty input or a failed upload; True on a successful
        upload, or unconditionally when uploads are disabled.
    """
    if not sentiment_data:
        return False

    # NOTE(review): despite the message, no local DB write happens here.
    logger.info(f"💾 Saving {len(sentiment_data)} sentiment records...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(sentiment_data)} sentiment records to HuggingFace...")
            success = await hf_uploader.upload_sentiment_data(sentiment_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded sentiment to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload sentiment to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading sentiment: {e}")
            return False

    return True
|
|
|
|
|
async def save_and_upload_onchain(onchain_data: List[Dict[str, Any]]) -> bool:
    """Save on-chain data and upload to HuggingFace.

    Returns:
        False for empty input or a failed upload; True on a successful
        upload, or unconditionally when uploads are disabled.
    """
    if not onchain_data:
        return False

    # NOTE(review): despite the message, no local DB write happens here.
    logger.info(f"💾 Saving {len(onchain_data)} on-chain records...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(onchain_data)} on-chain records to HuggingFace...")
            success = await hf_uploader.upload_onchain_data(onchain_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded on-chain data to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload on-chain data to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading on-chain data: {e}")
            return False

    return True
|
|
|
|
|
async def save_and_upload_whale(whale_data: List[Dict[str, Any]]) -> bool:
    """Save whale data and upload to HuggingFace.

    Returns:
        False for empty input or a failed upload; True on a successful
        upload, or unconditionally when uploads are disabled.
    """
    if not whale_data:
        return False

    # NOTE(review): despite the message, no local DB write happens here.
    logger.info(f"💾 Saving {len(whale_data)} whale records...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(whale_data)} whale records to HuggingFace...")
            success = await hf_uploader.upload_whale_data(whale_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded whale data to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload whale data to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading whale data: {e}")
            return False

    return True
|
|
|
|
|
async def save_and_upload_explorer(explorer_data: List[Dict[str, Any]]) -> bool:
    """Save explorer data and upload to HuggingFace.

    Returns:
        False for empty input or a failed upload; True on a successful
        upload, or unconditionally when uploads are disabled.
    """
    if not explorer_data:
        return False

    # NOTE(review): despite the message, no local DB write happens here.
    logger.info(f"💾 Saving {len(explorer_data)} explorer records...")

    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(explorer_data)} explorer records to HuggingFace...")
            success = await hf_uploader.upload_explorer_data(explorer_data, append=True)

            if success:
                logger.info("✅ Successfully uploaded explorer data to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload explorer data to HuggingFace")
                return False

        except Exception as e:
            logger.error(f"Error uploading explorer data: {e}")
            return False

    return True
|
|
|
|
|
|
|
|
|
|
|
|
|
async def comprehensive_worker_loop():
    """
    Main worker loop - fetch ALL data from ALL sources.

    Each iteration fans out to every category fetcher concurrently, uploads
    whatever succeeded, then sleeps 5 minutes to avoid rate limits. Runs
    until cancelled; per-iteration errors are logged and never fatal.
    """
    logger.info("🚀 Starting comprehensive data worker")
    logger.info(f"📊 Resource statistics: {resource_loader.get_stats()}")

    def _unwrap(value):
        """Treat an exception returned by gather() as an empty result set."""
        return [] if isinstance(value, Exception) else value

    iteration = 0

    while True:
        try:
            iteration += 1
            start_time = time.time()

            logger.info(f"\n{'='*80}")
            logger.info(f"[Iteration {iteration}] Starting comprehensive data collection")
            logger.info(f"{'='*80}")

            # Fetch every category concurrently; failures are isolated
            # per category via return_exceptions.
            results = await asyncio.gather(
                fetch_news_data(),
                fetch_sentiment_data(),
                fetch_onchain_data(),
                fetch_whale_data(),
                fetch_block_explorer_data(),
                return_exceptions=True
            )

            news_data, sentiment_data, onchain_data, whale_data, explorer_data = (
                _unwrap(result) for result in results
            )

            # Upload each category concurrently as well.
            await asyncio.gather(
                save_and_upload_news(news_data),
                save_and_upload_sentiment(sentiment_data),
                save_and_upload_onchain(onchain_data),
                save_and_upload_whale(whale_data),
                save_and_upload_explorer(explorer_data),
                return_exceptions=True
            )

            elapsed = time.time() - start_time
            total_records = sum(
                len(d) for d in (news_data, sentiment_data, onchain_data, whale_data, explorer_data)
            )

            logger.info(f"\n{'='*80}")
            logger.info(f"[Iteration {iteration}] Completed in {elapsed:.2f}s")
            logger.info(f"Total records collected: {total_records}")
            logger.info(f"{'='*80}\n")

            # 5-minute cadence keeps us under free-tier rate limits.
            await asyncio.sleep(300)

        except Exception as e:
            logger.error(f"[Iteration {iteration}] Worker error: {e}", exc_info=True)
            await asyncio.sleep(300)
|
|
|
|
|
async def start_comprehensive_worker():
    """Start the comprehensive data worker as a background asyncio task.

    Returns immediately; `comprehensive_worker_loop` keeps running on the
    current event loop until cancelled.
    """
    try:
        logger.info("Initializing comprehensive data worker...")

        # Fire-and-forget: the loop handles its own per-iteration errors.
        # NOTE(review): the Task reference is not stored anywhere — asyncio
        # only holds weak references to tasks, so this could be garbage
        # collected mid-run; consider keeping a module-level reference.
        logger.info("Running initial data fetch...")
        asyncio.create_task(comprehensive_worker_loop())
        logger.info("Comprehensive data worker started successfully")

    except Exception as e:
        logger.error(f"Failed to start comprehensive worker: {e}", exc_info=True)
|
|
|
|
|
|
|
if __name__ == "__main__":
    async def test():
        """Smoke-test every fetcher once and report record counts."""
        logger.info("Testing comprehensive data worker...")

        news = await fetch_news_data()
        logger.info(f"\n✅ News: {len(news)} articles")

        sentiment = await fetch_sentiment_data()
        logger.info(f"✅ Sentiment: {len(sentiment)} records")

        onchain = await fetch_onchain_data()
        logger.info(f"✅ On-chain: {len(onchain)} records")

        whale = await fetch_whale_data()
        logger.info(f"✅ Whale: {len(whale)} transactions")

        explorer = await fetch_block_explorer_data()
        logger.info(f"✅ Explorer: {len(explorer)} records")

    asyncio.run(test())
|
|
|