| """ |
| Database initialization and management |
| """ |
|
|
| import sqlite3 |
| import logging |
| import os |
| from pathlib import Path |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class DatabaseManager: |
| def __init__(self, db_path: str = "sessions.db"): |
| self.db_path = db_path |
| self.connection = None |
| self._init_db() |
| |
| def _init_db(self): |
| """Initialize database with required tables""" |
| try: |
| |
| os.makedirs(os.path.dirname(self.db_path), exist_ok=True) |
| |
| self.connection = sqlite3.connect(self.db_path, check_same_thread=False) |
| self.connection.row_factory = sqlite3.Row |
| |
| |
| self._create_tables() |
| logger.info(f"Database initialized at {self.db_path}") |
| |
| except Exception as e: |
| logger.error(f"Database initialization failed: {e}") |
| |
| self.connection = sqlite3.connect(":memory:", check_same_thread=False) |
| self._create_tables() |
| logger.info("Using in-memory database as fallback") |
| |
| def _create_tables(self): |
| """Create required database tables with indexes for performance""" |
| cursor = self.connection.cursor() |
| |
| |
| cursor.execute(""" |
| CREATE TABLE IF NOT EXISTS sessions ( |
| session_id TEXT PRIMARY KEY, |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
| last_activity TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
| context_data TEXT, |
| user_metadata TEXT |
| ) |
| """) |
| |
| |
| cursor.execute(""" |
| CREATE TABLE IF NOT EXISTS interactions ( |
| interaction_id TEXT PRIMARY KEY, |
| session_id TEXT REFERENCES sessions(session_id), |
| user_input TEXT NOT NULL, |
| agent_trace TEXT, |
| final_response TEXT, |
| processing_time INTEGER, |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP |
| ) |
| """) |
| |
| |
| indexes = [ |
| "CREATE INDEX IF NOT EXISTS idx_sessions_last_activity ON sessions(last_activity)", |
| "CREATE INDEX IF NOT EXISTS idx_interactions_session_id ON interactions(session_id)", |
| "CREATE INDEX IF NOT EXISTS idx_interactions_created_at ON interactions(created_at)" |
| ] |
| |
| for index_sql in indexes: |
| try: |
| cursor.execute(index_sql) |
| except Exception as e: |
| logger.debug(f"Index creation skipped (may already exist): {e}") |
| |
| self.connection.commit() |
| logger.info("Database tables and indexes created successfully") |
| |
| def get_connection(self): |
| """Get database connection""" |
| return self.connection |
| |
| def close(self): |
| """Close database connection""" |
| if self.connection: |
| self.connection.close() |
| logger.info("Database connection closed") |
|
|
| |
| db_manager = None |
|
|
| def init_database(db_path: str = "sessions.db"): |
| """Initialize global database instance""" |
| global db_manager |
| if db_manager is None: |
| db_manager = DatabaseManager(db_path) |
| return db_manager |
|
|
| def get_db(): |
| """Get database connection""" |
| global db_manager |
| if db_manager is None: |
| init_database() |
| return db_manager.get_connection() |
|
|
| |
| init_database() |
|
|