Rifqi Hafizuddin
[NOTICKET][DB] pisahin db credential ke folder model. add ingestion endpoint at db_client to use db pipeline. add router db_client di main.
347a73a | """Service for managing user-registered external database connections.""" | |
| import uuid | |
| from typing import List, Optional | |
| from sqlalchemy import delete, select | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from src.db.postgres.models import DatabaseClient | |
| from src.middlewares.logging import get_logger | |
| from src.utils.db_credential_encryption import ( | |
| decrypt_credentials_dict, | |
| encrypt_credentials_dict, | |
| ) | |
| logger = get_logger("database_client_service") | |
| class DatabaseClientService: | |
| """Service for managing user-registered external database connections.""" | |
| async def create( | |
| self, | |
| db: AsyncSession, | |
| user_id: str, | |
| name: str, | |
| db_type: str, | |
| credentials: dict, | |
| ) -> DatabaseClient: | |
| """Register a new database client connection. | |
| Credentials are encrypted before being stored. | |
| """ | |
| client = DatabaseClient( | |
| id=str(uuid.uuid4()), | |
| user_id=user_id, | |
| name=name, | |
| db_type=db_type, | |
| credentials=encrypt_credentials_dict(credentials), | |
| status="active", | |
| ) | |
| db.add(client) | |
| await db.commit() | |
| await db.refresh(client) | |
| logger.info(f"Created database client {client.id} for user {user_id}") | |
| return client | |
| async def get_user_clients( | |
| self, | |
| db: AsyncSession, | |
| user_id: str, | |
| ) -> List[DatabaseClient]: | |
| """Return all active and inactive database clients for a user.""" | |
| result = await db.execute( | |
| select(DatabaseClient) | |
| .where(DatabaseClient.user_id == user_id) | |
| .order_by(DatabaseClient.created_at.desc()) | |
| ) | |
| return result.scalars().all() | |
| async def get( | |
| self, | |
| db: AsyncSession, | |
| client_id: str, | |
| ) -> Optional[DatabaseClient]: | |
| """Return a single database client by its ID.""" | |
| result = await db.execute( | |
| select(DatabaseClient).where(DatabaseClient.id == client_id) | |
| ) | |
| return result.scalars().first() | |
| async def update( | |
| self, | |
| db: AsyncSession, | |
| client_id: str, | |
| name: Optional[str] = None, | |
| credentials: Optional[dict] = None, | |
| status: Optional[str] = None, | |
| ) -> Optional[DatabaseClient]: | |
| """Update an existing database client connection. | |
| Only non-None fields are updated. | |
| Credentials are re-encrypted if provided. | |
| """ | |
| client = await self.get(db, client_id) | |
| if not client: | |
| return None | |
| if name is not None: | |
| client.name = name | |
| if credentials is not None: | |
| client.credentials = encrypt_credentials_dict(credentials) | |
| if status is not None: | |
| client.status = status | |
| await db.commit() | |
| await db.refresh(client) | |
| logger.info(f"Updated database client {client_id}") | |
| return client | |
| async def delete( | |
| self, | |
| db: AsyncSession, | |
| client_id: str, | |
| ) -> bool: | |
| """Permanently delete a database client connection.""" | |
| result = await db.execute( | |
| delete(DatabaseClient).where(DatabaseClient.id == client_id) | |
| ) | |
| await db.commit() | |
| deleted = result.rowcount > 0 | |
| if deleted: | |
| logger.info(f"Deleted database client {client_id}") | |
| return deleted | |
| database_client_service = DatabaseClientService() | |