| """ |
| Background Data Collection Agent |
| Continuously collects data from 305+ free resources |
| Runs automatically when HuggingFace Space starts |
| """ |
|
|
| import asyncio |
| import time |
| from datetime import datetime, timedelta |
| from typing import Dict, List, Any |
| import logging |
|
|
| |
| import sys |
| sys.path.insert(0, '/workspace') |
| from core.smart_fallback_manager import get_fallback_manager |
| from database.db_manager import db_manager |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class DataCollectionAgent:
    """
    Background agent that continuously collects data.

    - Collects from 305+ free resources via the smart fallback manager
    - Stores results in the database cache
    - Runs 24/7 as a set of background asyncio tasks
    - Auto-handles provider failures through fallback rotation
    """

    def __init__(self):
        # Shared manager that rotates across providers when one fails.
        self.fallback_manager = get_fallback_manager()
        self.is_running = False
        # Aggregate counters surfaced via get_stats().
        self.collection_stats = {
            'total_collections': 0,
            'successful_collections': 0,
            'failed_collections': 0,
            'last_collection_time': None,
            'collections_by_category': {}
        }

        # Per-category polling intervals, in seconds.
        self.intervals = {
            'market_data_apis': 30,
            'news_apis': 300,
            'sentiment_apis': 180,
            'whale_tracking_apis': 60,
            'block_explorers': 120,
            'onchain_analytics_apis': 300,
        }

        # category -> datetime of the most recent collection attempt.
        self.last_collection = {}

        logger.info("DataCollectionAgent initialized")

    async def start(self):
        """Start all collection loops; returns when the loops finish."""
        if self.is_running:
            logger.warning("Agent already running")
            return

        self.is_running = True
        logger.info("Starting DataCollectionAgent...")

        tasks = [
            self.collect_market_data(),
            self.collect_news_data(),
            self.collect_sentiment_data(),
            self.collect_whale_tracking(),
            self.collect_blockchain_data(),
            self.health_check_loop(),
        ]

        # return_exceptions=True keeps one crashed loop from cancelling the rest.
        await asyncio.gather(*tasks, return_exceptions=True)

    async def stop(self):
        """Signal every loop to exit after its current sleep completes."""
        self.is_running = False
        logger.info("Stopping DataCollectionAgent...")

    def _record_result(self, category: str, success: bool) -> None:
        """
        Update the shared stats after one collection attempt.

        BUGFIX: previously 'last_collection_time' and
        'collections_by_category' were never updated, and exception paths
        did not count toward 'total_collections'. All outcomes are now
        recorded consistently in one place.
        """
        now = datetime.now()
        self.collection_stats['total_collections'] += 1
        key = 'successful_collections' if success else 'failed_collections'
        self.collection_stats[key] += 1
        self.collection_stats['last_collection_time'] = now.isoformat()
        by_category = self.collection_stats['collections_by_category']
        by_category[category] = by_category.get(category, 0) + 1
        self.last_collection[category] = now

    async def _collection_loop(self, category: str, endpoint_path: str,
                               store_fn, label: str,
                               params=None, max_attempts: int = 5):
        """
        Generic polling loop shared by the single-endpoint collectors.

        Every ``self.intervals[category]`` seconds: fetch ``endpoint_path``
        through the fallback manager, persist a non-empty result with
        ``store_fn``, and record the outcome in the stats counters.
        """
        interval = self.intervals[category]

        while self.is_running:
            try:
                logger.info("Collecting %s...", label)

                # Only forward `params` when provided so the manager keeps
                # its own default for that argument (some endpoints — e.g.
                # sentiment/whales — were originally called without it).
                kwargs = {
                    'category': category,
                    'endpoint_path': endpoint_path,
                    'max_attempts': max_attempts,
                }
                if params is not None:
                    kwargs['params'] = params

                data = await self.fallback_manager.fetch_with_fallback(**kwargs)

                if data:
                    await store_fn(data)
                    self._record_result(category, success=True)
                    logger.info("Collected %s successfully", label)
                else:
                    self._record_result(category, success=False)
                    logger.warning("Failed to collect %s after all attempts", label)

            except Exception:
                logger.exception("Error collecting %s", label)
                self._record_result(category, success=False)

            await asyncio.sleep(interval)

    async def collect_market_data(self):
        """Continuously collect market data (top 250 coins by market cap)."""
        await self._collection_loop(
            category='market_data_apis',
            endpoint_path="/coins/markets",
            store_fn=self._store_market_data,
            label="market data",
            params={
                "vs_currency": "usd",
                "order": "market_cap_desc",
                "per_page": 250,
                "page": 1
            },
            max_attempts=10,
        )

    async def collect_news_data(self):
        """Continuously collect news data."""
        await self._collection_loop(
            category='news_apis',
            endpoint_path="/news",
            store_fn=self._store_news_data,
            label="news data",
            params={"limit": 50},
            max_attempts=5,
        )

    async def collect_sentiment_data(self):
        """Continuously collect sentiment data."""
        await self._collection_loop(
            category='sentiment_apis',
            endpoint_path="/sentiment",
            store_fn=self._store_sentiment_data,
            label="sentiment data",
            max_attempts=5,
        )

    async def collect_whale_tracking(self):
        """Continuously collect whale tracking data."""
        await self._collection_loop(
            category='whale_tracking_apis',
            endpoint_path="/whales",
            store_fn=self._store_whale_data,
            label="whale tracking data",
            max_attempts=5,
        )

    async def collect_blockchain_data(self):
        """Continuously collect latest-block data for several chains."""
        category = 'block_explorers'
        interval = self.intervals[category]
        chains = ('ethereum', 'bsc', 'polygon')

        while self.is_running:
            try:
                logger.info("Collecting blockchain data...")

                any_success = False
                for chain in chains:
                    data = await self.fallback_manager.fetch_with_fallback(
                        category=category,
                        endpoint_path=f"/{chain}/latest",
                        max_attempts=3
                    )

                    if data:
                        await self._store_blockchain_data(chain, data)
                        any_success = True

                # BUGFIX: the cycle previously counted as a success even
                # when every single chain fetch failed.
                self._record_result(category, success=any_success)

            except Exception:
                logger.exception("Error collecting blockchain data")
                self._record_result(category, success=False)

            await asyncio.sleep(interval)

    async def health_check_loop(self):
        """Periodically check health and clean up failed resources."""
        while self.is_running:
            try:
                # Sleep first: a check at t=0 would report nothing useful.
                await asyncio.sleep(600)

                logger.info("Running health check...")

                report = self.fallback_manager.get_health_report()

                logger.info("Health Report:")
                logger.info("  Total Resources: %s", report['total_resources'])
                logger.info("  Active: %s", report['by_status']['active'])
                logger.info("  Degraded: %s", report['by_status']['degraded'])
                logger.info("  Failed: %s", report['by_status']['failed'])
                logger.info("  Proxy Needed: %s", report['by_status']['proxy_needed'])

                # Drop resources that have been failing for a full day.
                removed = self.fallback_manager.cleanup_failed_resources(max_age_hours=24)

                if removed:
                    logger.info("Cleaned up %d failed resources", len(removed))

            except Exception:
                logger.exception("Health check error")

    async def _store_market_data(self, data: Any):
        """Store a list of per-coin market entries in the database cache."""
        try:
            if isinstance(data, list):
                for item in data:
                    symbol = item.get('symbol', '').upper()
                    if symbol:
                        db_manager.cache_market_data(
                            symbol=symbol,
                            price=item.get('current_price', 0),
                            volume=item.get('total_volume', 0),
                            market_cap=item.get('market_cap', 0),
                            change_24h=item.get('price_change_percentage_24h', 0),
                            data=item
                        )
            logger.debug("Stored market data in database")
        except Exception:
            logger.exception("Error storing market data")

    async def _store_news_data(self, data: Any):
        """Store news data in database. NOTE: persistence not implemented yet — currently only logs."""
        try:
            logger.debug("Stored news data in database")
        except Exception:
            logger.exception("Error storing news data")

    async def _store_sentiment_data(self, data: Any):
        """Store sentiment data in database. NOTE: persistence not implemented yet — currently only logs."""
        try:
            logger.debug("Stored sentiment data in database")
        except Exception:
            logger.exception("Error storing sentiment data")

    async def _store_whale_data(self, data: Any):
        """Store whale tracking data in database. NOTE: persistence not implemented yet — currently only logs."""
        try:
            logger.debug("Stored whale data in database")
        except Exception:
            logger.exception("Error storing whale data")

    async def _store_blockchain_data(self, chain: str, data: Any):
        """Store blockchain data in database. NOTE: persistence not implemented yet — currently only logs."""
        try:
            logger.debug("Stored %s blockchain data in database", chain)
        except Exception:
            logger.exception("Error storing blockchain data")

    def get_stats(self) -> Dict:
        """Return a monitoring snapshot: counters, per-category timestamps, health report."""
        return {
            **self.collection_stats,
            'is_running': self.is_running,
            'last_collection': {
                category: last_time.isoformat() if last_time else None
                for category, last_time in self.last_collection.items()
            },
            'health_report': self.fallback_manager.get_health_report(),
            'proxy_status': {'disabled': True}
        }
|
|
|
|
| |
# Module-wide singleton instance, created on first access.
_agent = None


def get_data_collection_agent() -> DataCollectionAgent:
    """Return the shared DataCollectionAgent, creating it lazily on first use."""
    global _agent
    agent = _agent
    if agent is None:
        agent = DataCollectionAgent()
        _agent = agent
    return agent
|
|
|
|
async def start_data_collection_agent():
    """Obtain the global agent (creating it if needed) and run its loops."""
    await get_data_collection_agent().start()
|
|