"""
Database Migration System

Handles schema versioning and migrations for a SQLite database.
"""
|
|
import logging
import sqlite3
import traceback
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Callable, List, Tuple
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
class Migration:
    """Represents a single database migration."""

    def __init__(
        self,
        version: int,
        description: str,
        up_sql: str,
        down_sql: str = ""
    ):
        """
        Initialize migration

        Args:
            version: Migration version number (sequential)
            description: Human-readable description
            up_sql: SQL script that applies the migration
            down_sql: SQL script that rolls the migration back
                (empty string when no rollback is provided)
        """
        self.version = version
        self.description = description
        self.up_sql = up_sql
        self.down_sql = down_sql

    def __repr__(self) -> str:
        """Return a short, log-friendly identification of the migration."""
        # The SQL bodies are deliberately left out: they can be long and
        # would drown the log line.
        return (
            f"{type(self).__name__}("
            f"version={self.version!r}, description={self.description!r})"
        )
|
|
|
|
class MigrationManager:
    """
    Manages database schema migrations.

    Applied migrations are tracked in the ``schema_migrations`` table.
    Applying or rolling back a migration runs the migration script and the
    tracking-table update inside one transaction, which is rolled back if
    any statement fails. Every connection opened by this class is closed
    again, including on error paths.
    """

    def __init__(self, db_path: str):
        """
        Initialize migration manager

        Creates the tracking table if it does not exist yet and registers
        the built-in migrations.

        Args:
            db_path: Path to SQLite database file

        Raises:
            Exception: Re-raised from ``_init_migrations_table`` when the
                tracking table cannot be created.
        """
        self.db_path = db_path
        self.migrations: List[Migration] = []
        self._init_migrations_table()
        self._register_migrations()

    def _init_migrations_table(self):
        """
        Create migrations tracking table if not exists

        Raises:
            Exception: Any error is logged and then re-raised.
        """
        try:
            # closing() guarantees the connection is released even when the
            # statement or the commit raises.
            with closing(sqlite3.connect(self.db_path)) as conn:
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS schema_migrations (
                        version INTEGER PRIMARY KEY,
                        description TEXT NOT NULL,
                        applied_at TIMESTAMP NOT NULL,
                        execution_time_ms INTEGER
                    )
                """)
                conn.commit()

            logger.info("Migrations table initialized")

        except Exception as e:
            logger.error(f"Failed to initialize migrations table: {e}")
            raise

    def _register_migrations(self):
        """Register all migrations in order (ascending version)."""

        self.migrations.append(Migration(
            version=1,
            description="Add whale tracking table",
            up_sql="""
                CREATE TABLE IF NOT EXISTS whale_transactions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    transaction_hash TEXT UNIQUE NOT NULL,
                    blockchain TEXT NOT NULL,
                    from_address TEXT NOT NULL,
                    to_address TEXT NOT NULL,
                    amount REAL NOT NULL,
                    token_symbol TEXT,
                    usd_value REAL,
                    timestamp TIMESTAMP NOT NULL,
                    detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                CREATE INDEX IF NOT EXISTS idx_whale_timestamp
                ON whale_transactions(timestamp);

                CREATE INDEX IF NOT EXISTS idx_whale_blockchain
                ON whale_transactions(blockchain);
            """,
            down_sql="DROP TABLE IF EXISTS whale_transactions;"
        ))

        # NOTE: these indices require the prices, news and market_analysis
        # tables to exist already; this module does not create them.
        self.migrations.append(Migration(
            version=2,
            description="Add performance indices",
            up_sql="""
                CREATE INDEX IF NOT EXISTS idx_prices_symbol_timestamp
                ON prices(symbol, timestamp);

                CREATE INDEX IF NOT EXISTS idx_news_published_date
                ON news(published_date DESC);

                CREATE INDEX IF NOT EXISTS idx_analysis_symbol_timestamp
                ON market_analysis(symbol, timestamp DESC);
            """,
            down_sql="""
                DROP INDEX IF EXISTS idx_prices_symbol_timestamp;
                DROP INDEX IF EXISTS idx_news_published_date;
                DROP INDEX IF EXISTS idx_analysis_symbol_timestamp;
            """
        ))

        self.migrations.append(Migration(
            version=3,
            description="Add API key tracking table",
            up_sql="""
                CREATE TABLE IF NOT EXISTS api_key_usage (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    api_key_hash TEXT NOT NULL,
                    endpoint TEXT NOT NULL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    response_time_ms INTEGER,
                    status_code INTEGER,
                    ip_address TEXT
                );

                CREATE INDEX IF NOT EXISTS idx_api_usage_timestamp
                ON api_key_usage(timestamp);

                CREATE INDEX IF NOT EXISTS idx_api_usage_key
                ON api_key_usage(api_key_hash);
            """,
            down_sql="DROP TABLE IF EXISTS api_key_usage;"
        ))

        # The down_sql below is only a SQL comment: rolling back version 4
        # executes nothing and merely removes the tracking row.
        self.migrations.append(Migration(
            version=4,
            description="Enhance user queries table with metadata",
            up_sql="""
                CREATE TABLE IF NOT EXISTS user_queries_v2 (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    query TEXT NOT NULL,
                    query_type TEXT,
                    result_count INTEGER,
                    execution_time_ms INTEGER,
                    user_id TEXT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                -- Make sure the legacy table exists so the copy below is
                -- valid on databases that never had it. A WHERE EXISTS
                -- guard cannot do this: SQLite rejects the statement with
                -- "no such table" before any row filter is evaluated.
                CREATE TABLE IF NOT EXISTS user_queries (
                    query TEXT,
                    result_count INTEGER,
                    timestamp TIMESTAMP
                );

                -- Migrate old data (no rows when the table was just created)
                INSERT INTO user_queries_v2 (query, result_count, timestamp)
                SELECT query, result_count, timestamp
                FROM user_queries;

                DROP TABLE IF EXISTS user_queries;

                ALTER TABLE user_queries_v2 RENAME TO user_queries;

                CREATE INDEX IF NOT EXISTS idx_user_queries_timestamp
                ON user_queries(timestamp);
            """,
            down_sql="-- Cannot rollback data migration"
        ))

        self.migrations.append(Migration(
            version=5,
            description="Add cache metadata table",
            up_sql="""
                CREATE TABLE IF NOT EXISTS cache_metadata (
                    cache_key TEXT PRIMARY KEY,
                    data_type TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    expires_at TIMESTAMP NOT NULL,
                    hit_count INTEGER DEFAULT 0,
                    size_bytes INTEGER
                );

                CREATE INDEX IF NOT EXISTS idx_cache_expires
                ON cache_metadata(expires_at);
            """,
            down_sql="DROP TABLE IF EXISTS cache_metadata;"
        ))

        logger.info(f"Registered {len(self.migrations)} migrations")

    def get_current_version(self) -> int:
        """
        Get current database schema version

        Returns:
            Highest version recorded in ``schema_migrations``; 0 if no
            migrations are recorded or if the query fails (the failure is
            logged, not raised).
        """
        try:
            with closing(sqlite3.connect(self.db_path)) as conn:
                result = conn.execute(
                    "SELECT MAX(version) FROM schema_migrations"
                ).fetchone()

            # MAX() over an empty table yields a single NULL row.
            return result[0] if result[0] is not None else 0

        except Exception as e:
            logger.error(f"Failed to get current version: {e}")
            return 0

    def get_pending_migrations(self) -> List[Migration]:
        """
        Get list of pending migrations

        Returns:
            Registered migrations whose version is greater than the current
            schema version, in registration order.
        """
        current_version = self.get_current_version()

        return [
            migration for migration in self.migrations
            if migration.version > current_version
        ]

    def apply_migration(self, migration: Migration) -> bool:
        """
        Apply a single migration

        The migration script and the ``schema_migrations`` insert run in one
        transaction; if anything fails the transaction is rolled back, so no
        partial schema change is left behind. Because the script is wrapped
        in ``BEGIN``, ``up_sql`` must not contain its own transaction
        control statements.

        Args:
            migration: Migration to apply

        Returns:
            True if successful, False otherwise (the error is logged with
            its traceback)
        """
        try:
            start_time = datetime.now()

            with closing(sqlite3.connect(self.db_path)) as conn:
                try:
                    # executescript() commits any open transaction before it
                    # runs, so the BEGIN has to be part of the script itself.
                    # The transaction stays open afterwards and also covers
                    # the bookkeeping insert below.
                    conn.executescript(f"BEGIN;\n{migration.up_sql}")

                    execution_time = int(
                        (datetime.now() - start_time).total_seconds() * 1000
                    )

                    conn.execute(
                        """
                        INSERT INTO schema_migrations
                        (version, description, applied_at, execution_time_ms)
                        VALUES (?, ?, ?, ?)
                        """,
                        (
                            migration.version,
                            migration.description,
                            # Same text the default datetime adapter stores,
                            # without relying on that adapter (deprecated
                            # since Python 3.12).
                            datetime.now().isoformat(sep=" "),
                            execution_time
                        )
                    )

                    conn.commit()
                except Exception:
                    conn.rollback()
                    raise

            logger.info(
                f"Applied migration {migration.version}: {migration.description} "
                f"({execution_time}ms)"
            )

            return True

        except Exception as e:
            logger.error(
                f"Failed to apply migration {migration.version}: {e}\n"
                f"{traceback.format_exc()}"
            )
            return False

    def migrate_to_latest(self) -> Tuple[bool, List[int]]:
        """
        Apply all pending migrations

        Stops at the first migration that fails.

        Returns:
            Tuple of (success: bool, applied_versions: List[int]); on
            failure the list holds the versions applied before the failure.
        """
        pending = self.get_pending_migrations()

        if not pending:
            logger.info("No pending migrations")
            return True, []

        logger.info(f"Applying {len(pending)} pending migrations...")

        applied = []
        for migration in pending:
            if self.apply_migration(migration):
                applied.append(migration.version)
            else:
                logger.error(f"Migration failed at version {migration.version}")
                return False, applied

        logger.info(f"Successfully applied {len(applied)} migrations")
        return True, applied

    def rollback_migration(self, version: int) -> bool:
        """
        Rollback a specific migration

        The rollback script and the removal of the tracking row run in one
        transaction that is rolled back on failure. Because the script is
        wrapped in ``BEGIN``, ``down_sql`` must not contain its own
        transaction control statements.

        Args:
            version: Migration version to rollback

        Returns:
            True if successful, False otherwise (unknown version, empty
            ``down_sql``, or a database error; each is logged)
        """
        migration = next(
            (m for m in self.migrations if m.version == version),
            None
        )

        if not migration:
            logger.error(f"Migration {version} not found")
            return False

        if not migration.down_sql:
            logger.error(f"Migration {version} has no rollback SQL")
            return False

        try:
            with closing(sqlite3.connect(self.db_path)) as conn:
                try:
                    # BEGIN is part of the script because executescript()
                    # commits any open transaction before running.
                    conn.executescript(f"BEGIN;\n{migration.down_sql}")

                    conn.execute(
                        "DELETE FROM schema_migrations WHERE version = ?",
                        (version,)
                    )

                    conn.commit()
                except Exception:
                    conn.rollback()
                    raise

            logger.info(f"Rolled back migration {version}")
            return True

        except Exception as e:
            logger.error(f"Failed to rollback migration {version}: {e}")
            return False

    def get_migration_history(self) -> List[Tuple[int, str, str]]:
        """
        Get migration history

        Returns:
            List of (version, description, applied_at) tuples ordered by
            version; an empty list if the query fails (the failure is
            logged, not raised).
        """
        try:
            with closing(sqlite3.connect(self.db_path)) as conn:
                history = conn.execute("""
                    SELECT version, description, applied_at
                    FROM schema_migrations
                    ORDER BY version
                """).fetchall()

            return history

        except Exception as e:
            logger.error(f"Failed to get migration history: {e}")
            return []
|
|
|
|
| |
|
|
|
|
def auto_migrate(db_path: str) -> bool:
    """
    Bring the database at ``db_path`` up to the latest schema version.

    Intended to be called on startup. Any exception is logged and reported
    as a failure rather than propagated.

    Args:
        db_path: Path to database file

    Returns:
        True if all pending migrations were applied (or none were pending),
        False otherwise
    """
    try:
        mgr = MigrationManager(db_path)
        logger.info(f"Current schema version: {mgr.get_current_version()}")

        ok, versions = mgr.migrate_to_latest()

        # Report the outcome, failure case first.
        if not ok:
            logger.error("Migration failed")
        elif versions:
            logger.info(f"Database migrated to version {max(versions)}")
        else:
            logger.info("Database already at latest version")

        return ok

    except Exception as e:
        logger.error(f"Auto-migration failed: {e}")
        return False
|
|