| | from abc import ABC, abstractmethod |
| | from typing import Dict, Any, List, Union |
| | from enum import Enum |
| |
|
| |
|
| |
|
| | class DatabaseType(Enum): |
| | """Enumeration of supported database types""" |
| | MONGODB = "mongodb" |
| | POSTGRESQL = "postgresql" |
| | MYSQL = "mysql" |
| | SQLITE = "sqlite" |
| | REDIS = "redis" |
| | ELASTICSEARCH = "elasticsearch" |
| | NEO4J = "neo4j" |
| | VECTOR = "vector" |
| |
|
| |
|
| | class QueryType(Enum): |
| | """Enumeration of query types""" |
| | SELECT = "select" |
| | INSERT = "insert" |
| | UPDATE = "update" |
| | DELETE = "delete" |
| | CREATE = "create" |
| | DROP = "drop" |
| | ALTER = "alter" |
| | INDEX = "index" |
| | AGGREGATE = "aggregate" |
| | SEARCH = "search" |
| | VECTOR_SEARCH = "vector_search" |
| |
|
| |
|
| | class DatabaseConnection: |
| | """Base class for database connection management""" |
| | |
| | def __init__(self, connection_string: str, **kwargs): |
| | self.connection_string = connection_string |
| | self.connection_params = kwargs |
| | self._connection = None |
| | self._is_connected = False |
| | |
| | @property |
| | def is_connected(self) -> bool: |
| | return self._is_connected |
| | |
| | @abstractmethod |
| | def connect(self) -> bool: |
| | """Establish connection to the database""" |
| | pass |
| | |
| | @abstractmethod |
| | def disconnect(self) -> bool: |
| | """Close connection to the database""" |
| | pass |
| | |
| | @abstractmethod |
| | def test_connection(self) -> bool: |
| | """Test if the connection is working""" |
| | pass |
| |
|
| |
|
| | class DatabaseBase(ABC): |
| | """ |
| | Abstract base class for database operations. |
| | Provides a common interface for different database types. |
| | """ |
| | |
| | def __init__(self, |
| | connection_string: str = None, |
| | database_name: str = None, |
| | **kwargs): |
| | """ |
| | Initialize the database base. |
| | |
| | Args: |
| | connection_string: Database connection string |
| | database_name: Name of the database to use |
| | **kwargs: Additional connection parameters |
| | """ |
| | self.connection_string = connection_string |
| | self.database_name = database_name |
| | self.connection_params = kwargs |
| | self.db_type = self._get_database_type() |
| | self.connection = None |
| | self._is_initialized = False |
| | |
| | |
| | if connection_string: |
| | self.connect() |
| | |
| | @abstractmethod |
| | def _get_database_type(self) -> DatabaseType: |
| | """Return the database type""" |
| | pass |
| | |
| | @abstractmethod |
| | def connect(self) -> bool: |
| | """ |
| | Establish connection to the database. |
| | |
| | Returns: |
| | bool: True if connection successful, False otherwise |
| | """ |
| | pass |
| | |
| | @abstractmethod |
| | def disconnect(self) -> bool: |
| | """ |
| | Close connection to the database. |
| | |
| | Returns: |
| | bool: True if disconnection successful, False otherwise |
| | """ |
| | pass |
| | |
| | @abstractmethod |
| | def test_connection(self) -> bool: |
| | """ |
| | Test if the database connection is working. |
| | |
| | Returns: |
| | bool: True if connection is working, False otherwise |
| | """ |
| | pass |
| | |
| | @abstractmethod |
| | def execute_query(self, |
| | query: Union[str, Dict, List], |
| | query_type: QueryType = None, |
| | **kwargs) -> Dict[str, Any]: |
| | """ |
| | Execute a query on the database. |
| | |
| | Args: |
| | query: The query to execute (string for SQL, dict/list for NoSQL) |
| | query_type: Type of query being executed |
| | **kwargs: Additional query parameters |
| | |
| | Returns: |
| | Dict containing query results and metadata |
| | """ |
| | pass |
| | |
| | @abstractmethod |
| | def get_database_info(self) -> Dict[str, Any]: |
| | """ |
| | Get information about the database. |
| | |
| | Returns: |
| | Dict containing database information |
| | """ |
| | pass |
| | |
| | @abstractmethod |
| | def list_collections(self) -> List[str]: |
| | """ |
| | List all collections/tables in the database. |
| | |
| | Returns: |
| | List of collection/table names |
| | """ |
| | pass |
| | |
| | @abstractmethod |
| | def get_collection_info(self, collection_name: str) -> Dict[str, Any]: |
| | """ |
| | Get information about a specific collection/table. |
| | |
| | Args: |
| | collection_name: Name of the collection/table |
| | |
| | Returns: |
| | Dict containing collection/table information |
| | """ |
| | pass |
| | |
| | @abstractmethod |
| | def get_schema(self, collection_name: str = None) -> Dict[str, Any]: |
| | """ |
| | Get the schema/structure of the database or a specific collection. |
| | |
| | Args: |
| | collection_name: Name of the collection/table (optional) |
| | |
| | Returns: |
| | Dict containing schema information |
| | """ |
| | pass |
| | |
| | def validate_query(self, query: Union[str, Dict, List]) -> Dict[str, Any]: |
| | """ |
| | Validate a query before execution. |
| | |
| | Args: |
| | query: The query to validate |
| | |
| | Returns: |
| | Dict containing validation results |
| | """ |
| | try: |
| | |
| | if isinstance(query, str): |
| | if not query.strip(): |
| | return {"valid": False, "error": "Query cannot be empty"} |
| | elif isinstance(query, (dict, list)): |
| | if not query: |
| | return {"valid": False, "error": "Query cannot be empty"} |
| | else: |
| | return {"valid": False, "error": f"Unsupported query type: {type(query)}"} |
| | |
| | return {"valid": True, "error": None} |
| | |
| | except Exception as e: |
| | return {"valid": False, "error": str(e)} |
| | |
| | def format_query_result(self, |
| | data: Any, |
| | query_type: QueryType, |
| | execution_time: float = None, |
| | **kwargs) -> Dict[str, Any]: |
| | """ |
| | Format query results into a standard structure. |
| | |
| | Args: |
| | data: Raw query results |
| | query_type: Type of query that was executed |
| | execution_time: Time taken to execute the query |
| | **kwargs: Additional metadata |
| | |
| | Returns: |
| | Dict containing formatted results |
| | """ |
| | return { |
| | "success": True, |
| | "data": data, |
| | "query_type": query_type.value if query_type else None, |
| | "execution_time": execution_time, |
| | "row_count": len(data) if isinstance(data, (list, tuple)) else 1, |
| | "metadata": kwargs |
| | } |
| | |
| | def format_error_result(self, |
| | error: str, |
| | query_type: QueryType = None, |
| | **kwargs) -> Dict[str, Any]: |
| | """ |
| | Format error results into a standard structure. |
| | |
| | Args: |
| | error: Error message |
| | query_type: Type of query that failed |
| | **kwargs: Additional error metadata |
| | |
| | Returns: |
| | Dict containing formatted error results |
| | """ |
| | return { |
| | "success": False, |
| | "error": error, |
| | "query_type": query_type.value if query_type else None, |
| | "data": None, |
| | "execution_time": None, |
| | "row_count": 0, |
| | "metadata": kwargs |
| | } |
| | |
| | def get_supported_query_types(self) -> List[QueryType]: |
| | """ |
| | Get list of supported query types for this database. |
| | |
| | Returns: |
| | List of supported QueryType enums |
| | """ |
| | return [ |
| | QueryType.SELECT, |
| | QueryType.INSERT, |
| | QueryType.UPDATE, |
| | QueryType.DELETE, |
| | QueryType.CREATE, |
| | QueryType.DROP |
| | ] |
| | |
| | def get_capabilities(self) -> Dict[str, Any]: |
| | """ |
| | Get database capabilities and features. |
| | |
| | Returns: |
| | Dict containing database capabilities |
| | """ |
| | return { |
| | "database_type": self.db_type.value, |
| | "supports_sql": False, |
| | "supports_aggregation": False, |
| | "supports_full_text_search": False, |
| | "supports_vector_search": False, |
| | "supports_transactions": False, |
| | "supports_indexing": True, |
| | "supported_query_types": [qt.value for qt in self.get_supported_query_types()], |
| | "connection_info": { |
| | "is_connected": self.connection is not None, |
| | "database_name": self.database_name |
| | } |
| | } |
| | |
| | def __enter__(self): |
| | """Context manager entry""" |
| | if not self.connection: |
| | self.connect() |
| | return self |
| | |
| | def __exit__(self, exc_type, exc_val, exc_tb): |
| | """Context manager exit""" |
| | self.disconnect() |
| | |
| | def __del__(self): |
| | """Cleanup on deletion""" |
| | try: |
| | self.disconnect() |
| | except Exception: |
| | pass |