Rifqi Hafizuddin commited on
Commit ·
347a73a
1
Parent(s): e13a901
[NOTICKET][DB] pisahin db credential ke folder model. add ingestion endpoint at db_client to use db pipeline. add router db_client di main.
Browse files- main.py +2 -0
- src/api/v1/db_client.py +360 -3
- src/database_client/database_client_service.py +118 -0
- src/models/credentials.py +164 -0
main.py
CHANGED
|
@@ -10,6 +10,7 @@ from src.api.v1.chat import router as chat_router
|
|
| 10 |
from src.api.v1.room import router as room_router
|
| 11 |
from src.api.v1.users import router as users_router
|
| 12 |
from src.api.v1.knowledge import router as knowledge_router
|
|
|
|
| 13 |
from src.db.postgres.init_db import init_db
|
| 14 |
import uvicorn
|
| 15 |
|
|
@@ -35,6 +36,7 @@ app.include_router(document_router)
|
|
| 35 |
app.include_router(knowledge_router)
|
| 36 |
app.include_router(room_router)
|
| 37 |
app.include_router(chat_router)
|
|
|
|
| 38 |
|
| 39 |
|
| 40 |
@app.on_event("startup")
|
|
|
|
| 10 |
from src.api.v1.room import router as room_router
|
| 11 |
from src.api.v1.users import router as users_router
|
| 12 |
from src.api.v1.knowledge import router as knowledge_router
|
| 13 |
+
from src.api.v1.db_client import router as db_client_router
|
| 14 |
from src.db.postgres.init_db import init_db
|
| 15 |
import uvicorn
|
| 16 |
|
|
|
|
| 36 |
app.include_router(knowledge_router)
|
| 37 |
app.include_router(room_router)
|
| 38 |
app.include_router(chat_router)
|
| 39 |
+
app.include_router(db_client_router)
|
| 40 |
|
| 41 |
|
| 42 |
@app.on_event("startup")
|
src/api/v1/db_client.py
CHANGED
|
@@ -1,5 +1,362 @@
|
|
| 1 |
-
|
| 2 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 3 |
|
| 4 |
-
|
| 5 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""API endpoints for user-registered database connections.
|
| 2 |
|
| 3 |
+
Credential schemas (DbType, PostgresCredentials, etc.) live in
|
| 4 |
+
`src/models/credentials.py` — they are imported below (with noqa: F401) so
|
| 5 |
+
FastAPI/Swagger picks them up for OpenAPI schema generation even though they
|
| 6 |
+
are not referenced by name in this file.
|
| 7 |
+
"""
|
| 8 |
|
| 9 |
+
from typing import Any, Dict, List, Literal, Optional
|
| 10 |
+
from datetime import datetime
|
| 11 |
+
|
| 12 |
+
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
|
| 13 |
+
from pydantic import BaseModel, Field
|
| 14 |
+
from sqlalchemy.ext.asyncio import AsyncSession
|
| 15 |
+
|
| 16 |
+
from src.database_client.database_client_service import database_client_service
|
| 17 |
+
from src.db.postgres.connection import get_db
|
| 18 |
+
from src.middlewares.logging import get_logger, log_execution
|
| 19 |
+
from src.middlewares.rate_limit import limiter
|
| 20 |
+
from src.models.credentials import ( # noqa: F401 — re-exported for Swagger schema discovery
|
| 21 |
+
BigQueryCredentials,
|
| 22 |
+
CredentialSchemas,
|
| 23 |
+
DbType,
|
| 24 |
+
MysqlCredentials,
|
| 25 |
+
PostgresCredentials,
|
| 26 |
+
SnowflakeCredentials,
|
| 27 |
+
SqlServerCredentials,
|
| 28 |
+
SupabaseCredentials,
|
| 29 |
+
)
|
| 30 |
+
from src.pipeline.db_pipeline import db_pipeline_service
|
| 31 |
+
from src.utils.db_credential_encryption import decrypt_credentials_dict
|
| 32 |
+
|
| 33 |
+
logger = get_logger("database_client_api")
|
| 34 |
+
|
| 35 |
+
router = APIRouter(prefix="/api/v1", tags=["Database Clients"])
|
| 36 |
+
|
| 37 |
+
|
| 38 |
+
# ---------------------------------------------------------------------------
|
| 39 |
+
# Request / Response schemas
|
| 40 |
+
# ---------------------------------------------------------------------------
|
| 41 |
+
|
| 42 |
+
|
| 43 |
+
class DatabaseClientCreate(BaseModel):
|
| 44 |
+
"""
|
| 45 |
+
Payload to register a new external database connection.
|
| 46 |
+
|
| 47 |
+
The `credentials` object shape depends on `db_type`:
|
| 48 |
+
|
| 49 |
+
| db_type | Required fields |
|
| 50 |
+
|-------------|----------------------------------------------------------|
|
| 51 |
+
| postgres | host, port, database, username, password, ssl_mode |
|
| 52 |
+
| mysql | host, port, database, username, password, ssl |
|
| 53 |
+
| sqlserver | host, port, database, username, password, driver? |
|
| 54 |
+
| supabase | host, port, database, username, password, ssl_mode |
|
| 55 |
+
| bigquery | project_id, dataset_id, location?, service_account_json |
|
| 56 |
+
| snowflake | account, warehouse, database, schema?, username, password, role? |
|
| 57 |
+
|
| 58 |
+
Sensitive fields (`password`, `service_account_json`) are encrypted
|
| 59 |
+
at rest using Fernet symmetric encryption.
|
| 60 |
+
"""
|
| 61 |
+
|
| 62 |
+
name: str = Field(..., description="Display name for this connection.", examples=["Production DB"])
|
| 63 |
+
db_type: DbType = Field(..., description="Type of the database engine.", examples=["postgres"])
|
| 64 |
+
credentials: Dict[str, Any] = Field(
|
| 65 |
+
...,
|
| 66 |
+
description="Connection credentials. Shape depends on db_type. See schema descriptions above.",
|
| 67 |
+
examples=[
|
| 68 |
+
{
|
| 69 |
+
"host": "db.example.com",
|
| 70 |
+
"port": 5432,
|
| 71 |
+
"database": "mydb",
|
| 72 |
+
"username": "admin",
|
| 73 |
+
"password": "s3cr3t!",
|
| 74 |
+
"ssl_mode": "require",
|
| 75 |
+
}
|
| 76 |
+
],
|
| 77 |
+
)
|
| 78 |
+
|
| 79 |
+
|
| 80 |
+
class DatabaseClientUpdate(BaseModel):
|
| 81 |
+
"""
|
| 82 |
+
Payload to update an existing database connection.
|
| 83 |
+
|
| 84 |
+
All fields are optional — only provided fields will be updated.
|
| 85 |
+
If `credentials` is provided, it replaces the entire credentials object
|
| 86 |
+
and sensitive fields are re-encrypted.
|
| 87 |
+
"""
|
| 88 |
+
|
| 89 |
+
name: Optional[str] = Field(None, description="New display name for this connection.", examples=["Staging DB"])
|
| 90 |
+
credentials: Optional[Dict[str, Any]] = Field(
|
| 91 |
+
None,
|
| 92 |
+
description="Updated credentials object. Replaces existing credentials entirely if provided.",
|
| 93 |
+
examples=[{"host": "new-host.example.com", "port": 5432, "database": "mydb", "username": "admin", "password": "n3wP@ss!", "ssl_mode": "require"}],
|
| 94 |
+
)
|
| 95 |
+
status: Optional[Literal["active", "inactive"]] = Field(
|
| 96 |
+
None,
|
| 97 |
+
description="Set to 'inactive' to soft-disable the connection without deleting it.",
|
| 98 |
+
examples=["inactive"],
|
| 99 |
+
)
|
| 100 |
+
|
| 101 |
+
|
| 102 |
+
class DatabaseClientResponse(BaseModel):
|
| 103 |
+
"""
|
| 104 |
+
Database connection record returned by the API.
|
| 105 |
+
|
| 106 |
+
Credentials are **never** included in the response for security reasons.
|
| 107 |
+
"""
|
| 108 |
+
|
| 109 |
+
id: str = Field(..., description="Unique identifier of the database connection.")
|
| 110 |
+
user_id: str = Field(..., description="ID of the user who owns this connection.")
|
| 111 |
+
name: str = Field(..., description="Display name of the connection.")
|
| 112 |
+
db_type: str = Field(..., description="Database engine type.")
|
| 113 |
+
status: str = Field(..., description="Connection status: 'active' or 'inactive'.")
|
| 114 |
+
created_at: datetime = Field(..., description="Timestamp when the connection was registered.")
|
| 115 |
+
updated_at: Optional[datetime] = Field(None, description="Timestamp of the last update, if any.")
|
| 116 |
+
|
| 117 |
+
model_config = {"from_attributes": True}
|
| 118 |
+
|
| 119 |
+
|
| 120 |
+
# ---------------------------------------------------------------------------
|
| 121 |
+
# Endpoints
|
| 122 |
+
# ---------------------------------------------------------------------------
|
| 123 |
+
|
| 124 |
+
|
| 125 |
+
@router.post(
|
| 126 |
+
"/database-clients",
|
| 127 |
+
response_model=DatabaseClientResponse,
|
| 128 |
+
status_code=status.HTTP_201_CREATED,
|
| 129 |
+
summary="Register a new database connection",
|
| 130 |
+
response_description="The newly created database connection record (credentials excluded).",
|
| 131 |
+
responses={
|
| 132 |
+
201: {"description": "Connection registered successfully."},
|
| 133 |
+
422: {"description": "Validation error — check the credentials shape for the given db_type."},
|
| 134 |
+
500: {"description": "Internal server error."},
|
| 135 |
+
},
|
| 136 |
+
)
|
| 137 |
+
@limiter.limit("10/minute")
|
| 138 |
+
@log_execution(logger)
|
| 139 |
+
async def create_database_client(
|
| 140 |
+
request: Request,
|
| 141 |
+
payload: DatabaseClientCreate,
|
| 142 |
+
user_id: str = Query(..., description="ID of the user registering the connection."),
|
| 143 |
+
db: AsyncSession = Depends(get_db),
|
| 144 |
+
):
|
| 145 |
+
"""
|
| 146 |
+
Register a new external database connection for a user.
|
| 147 |
+
|
| 148 |
+
The `credentials` object must match the shape for the chosen `db_type`
|
| 149 |
+
(see **CredentialSchemas** in the schema section below for exact fields).
|
| 150 |
+
Sensitive fields (`password`, `service_account_json`) are encrypted
|
| 151 |
+
before being persisted — they are never returned in any response.
|
| 152 |
+
"""
|
| 153 |
+
try:
|
| 154 |
+
client = await database_client_service.create(
|
| 155 |
+
db=db,
|
| 156 |
+
user_id=user_id,
|
| 157 |
+
name=payload.name,
|
| 158 |
+
db_type=payload.db_type,
|
| 159 |
+
credentials=payload.credentials,
|
| 160 |
+
)
|
| 161 |
+
return DatabaseClientResponse.model_validate(client)
|
| 162 |
+
except Exception as e:
|
| 163 |
+
logger.error(f"Failed to create database client for user {user_id}", error=str(e))
|
| 164 |
+
raise HTTPException(
|
| 165 |
+
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
| 166 |
+
detail=f"Failed to create database client: {str(e)}",
|
| 167 |
+
)
|
| 168 |
+
|
| 169 |
+
|
| 170 |
+
@router.get(
|
| 171 |
+
"/database-clients/{user_id}",
|
| 172 |
+
response_model=List[DatabaseClientResponse],
|
| 173 |
+
summary="List all database connections for a user",
|
| 174 |
+
response_description="List of database connections (credentials excluded).",
|
| 175 |
+
responses={
|
| 176 |
+
200: {"description": "Returns an empty list if the user has no connections."},
|
| 177 |
+
},
|
| 178 |
+
)
|
| 179 |
+
@log_execution(logger)
|
| 180 |
+
async def list_database_clients(
|
| 181 |
+
user_id: str,
|
| 182 |
+
db: AsyncSession = Depends(get_db),
|
| 183 |
+
):
|
| 184 |
+
"""
|
| 185 |
+
Return all database connections registered by the specified user,
|
| 186 |
+
ordered by creation date (newest first).
|
| 187 |
+
|
| 188 |
+
Credentials are never included in the response.
|
| 189 |
+
"""
|
| 190 |
+
clients = await database_client_service.get_user_clients(db, user_id)
|
| 191 |
+
return [DatabaseClientResponse.model_validate(c) for c in clients]
|
| 192 |
+
|
| 193 |
+
|
| 194 |
+
@router.get(
|
| 195 |
+
"/database-clients/{user_id}/{client_id}",
|
| 196 |
+
response_model=DatabaseClientResponse,
|
| 197 |
+
summary="Get a single database connection",
|
| 198 |
+
response_description="Database connection detail (credentials excluded).",
|
| 199 |
+
responses={
|
| 200 |
+
404: {"description": "Connection not found."},
|
| 201 |
+
403: {"description": "Access denied — user_id does not own this connection."},
|
| 202 |
+
},
|
| 203 |
+
)
|
| 204 |
+
@log_execution(logger)
|
| 205 |
+
async def get_database_client(
|
| 206 |
+
user_id: str,
|
| 207 |
+
client_id: str,
|
| 208 |
+
db: AsyncSession = Depends(get_db),
|
| 209 |
+
):
|
| 210 |
+
"""
|
| 211 |
+
Return the detail of a single database connection.
|
| 212 |
+
|
| 213 |
+
Returns **403** if the `user_id` in the path does not match the owner
|
| 214 |
+
of the requested connection.
|
| 215 |
+
"""
|
| 216 |
+
client = await database_client_service.get(db, client_id)
|
| 217 |
+
|
| 218 |
+
if not client:
|
| 219 |
+
raise HTTPException(status_code=404, detail="Database client not found")
|
| 220 |
+
|
| 221 |
+
if client.user_id != user_id:
|
| 222 |
+
raise HTTPException(status_code=403, detail="Access denied")
|
| 223 |
+
|
| 224 |
+
return DatabaseClientResponse.model_validate(client)
|
| 225 |
+
|
| 226 |
+
|
| 227 |
+
@router.put(
|
| 228 |
+
"/database-clients/{client_id}",
|
| 229 |
+
response_model=DatabaseClientResponse,
|
| 230 |
+
summary="Update a database connection",
|
| 231 |
+
response_description="Updated database connection record (credentials excluded).",
|
| 232 |
+
responses={
|
| 233 |
+
404: {"description": "Connection not found."},
|
| 234 |
+
403: {"description": "Access denied — user_id does not own this connection."},
|
| 235 |
+
},
|
| 236 |
+
)
|
| 237 |
+
@log_execution(logger)
|
| 238 |
+
async def update_database_client(
|
| 239 |
+
client_id: str,
|
| 240 |
+
payload: DatabaseClientUpdate,
|
| 241 |
+
user_id: str = Query(..., description="ID of the user who owns the connection."),
|
| 242 |
+
db: AsyncSession = Depends(get_db),
|
| 243 |
+
):
|
| 244 |
+
"""
|
| 245 |
+
Update an existing database connection.
|
| 246 |
+
|
| 247 |
+
Only fields present in the request body are updated.
|
| 248 |
+
If `credentials` is provided it **replaces** the entire credentials object
|
| 249 |
+
and sensitive fields are re-encrypted automatically.
|
| 250 |
+
"""
|
| 251 |
+
client = await database_client_service.get(db, client_id)
|
| 252 |
+
|
| 253 |
+
if not client:
|
| 254 |
+
raise HTTPException(status_code=404, detail="Database client not found")
|
| 255 |
+
|
| 256 |
+
if client.user_id != user_id:
|
| 257 |
+
raise HTTPException(status_code=403, detail="Access denied")
|
| 258 |
+
|
| 259 |
+
updated = await database_client_service.update(
|
| 260 |
+
db=db,
|
| 261 |
+
client_id=client_id,
|
| 262 |
+
name=payload.name,
|
| 263 |
+
credentials=payload.credentials,
|
| 264 |
+
status=payload.status,
|
| 265 |
+
)
|
| 266 |
+
return DatabaseClientResponse.model_validate(updated)
|
| 267 |
+
|
| 268 |
+
|
| 269 |
+
@router.delete(
|
| 270 |
+
"/database-clients/{client_id}",
|
| 271 |
+
status_code=status.HTTP_200_OK,
|
| 272 |
+
summary="Delete a database connection",
|
| 273 |
+
responses={
|
| 274 |
+
200: {"description": "Connection deleted successfully."},
|
| 275 |
+
404: {"description": "Connection not found."},
|
| 276 |
+
403: {"description": "Access denied — user_id does not own this connection."},
|
| 277 |
+
},
|
| 278 |
+
)
|
| 279 |
+
@log_execution(logger)
|
| 280 |
+
async def delete_database_client(
|
| 281 |
+
client_id: str,
|
| 282 |
+
user_id: str = Query(..., description="ID of the user who owns the connection."),
|
| 283 |
+
db: AsyncSession = Depends(get_db),
|
| 284 |
+
):
|
| 285 |
+
"""
|
| 286 |
+
Permanently delete a database connection.
|
| 287 |
+
|
| 288 |
+
This action is irreversible. The stored credentials are also removed.
|
| 289 |
+
"""
|
| 290 |
+
client = await database_client_service.get(db, client_id)
|
| 291 |
+
|
| 292 |
+
if not client:
|
| 293 |
+
raise HTTPException(status_code=404, detail="Database client not found")
|
| 294 |
+
|
| 295 |
+
if client.user_id != user_id:
|
| 296 |
+
raise HTTPException(status_code=403, detail="Access denied")
|
| 297 |
+
|
| 298 |
+
await database_client_service.delete(db, client_id)
|
| 299 |
+
return {"status": "success", "message": "Database client deleted successfully"}
|
| 300 |
+
|
| 301 |
+
|
| 302 |
+
@router.post(
|
| 303 |
+
"/database-clients/{client_id}/ingest",
|
| 304 |
+
status_code=status.HTTP_200_OK,
|
| 305 |
+
summary="Ingest schema from a registered database into the vector store",
|
| 306 |
+
response_description="Count of chunks ingested.",
|
| 307 |
+
responses={
|
| 308 |
+
200: {"description": "Ingestion completed successfully."},
|
| 309 |
+
403: {"description": "Access denied — user_id does not own this connection."},
|
| 310 |
+
404: {"description": "Connection not found."},
|
| 311 |
+
501: {"description": "The connection's db_type is not yet supported by the pipeline."},
|
| 312 |
+
500: {"description": "Ingestion failed (connection error, profiling error, etc.)."},
|
| 313 |
+
},
|
| 314 |
+
)
|
| 315 |
+
@limiter.limit("5/minute")
|
| 316 |
+
@log_execution(logger)
|
| 317 |
+
async def ingest_database_client(
|
| 318 |
+
request: Request,
|
| 319 |
+
client_id: str,
|
| 320 |
+
user_id: str = Query(..., description="ID of the user who owns the connection."),
|
| 321 |
+
db: AsyncSession = Depends(get_db),
|
| 322 |
+
):
|
| 323 |
+
"""
|
| 324 |
+
Decrypt the stored credentials, connect to the user's database, introspect
|
| 325 |
+
its schema, profile each column, embed the descriptions, and store them in
|
| 326 |
+
the shared PGVector collection tagged with `source_type="database"`.
|
| 327 |
+
|
| 328 |
+
Chunks become retrievable via the same retriever used for document chunks.
|
| 329 |
+
"""
|
| 330 |
+
client = await database_client_service.get(db, client_id)
|
| 331 |
+
|
| 332 |
+
if not client:
|
| 333 |
+
raise HTTPException(status_code=404, detail="Database client not found")
|
| 334 |
+
|
| 335 |
+
if client.user_id != user_id:
|
| 336 |
+
raise HTTPException(status_code=403, detail="Access denied")
|
| 337 |
+
|
| 338 |
+
creds = decrypt_credentials_dict(client.credentials)
|
| 339 |
+
|
| 340 |
+
try:
|
| 341 |
+
with db_pipeline_service.engine_scope(
|
| 342 |
+
db_type=client.db_type,
|
| 343 |
+
host=creds["host"],
|
| 344 |
+
port=creds["port"],
|
| 345 |
+
database=creds["database"],
|
| 346 |
+
username=creds["username"],
|
| 347 |
+
password=creds["password"],
|
| 348 |
+
ssl_mode=creds.get("ssl_mode"),
|
| 349 |
+
) as engine:
|
| 350 |
+
total = await db_pipeline_service.run(user_id=user_id, engine=engine)
|
| 351 |
+
except NotImplementedError as e:
|
| 352 |
+
raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail=str(e))
|
| 353 |
+
except Exception as e:
|
| 354 |
+
logger.error(
|
| 355 |
+
f"Ingestion failed for client {client_id}", user_id=user_id, error=str(e)
|
| 356 |
+
)
|
| 357 |
+
raise HTTPException(
|
| 358 |
+
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
| 359 |
+
detail=f"Ingestion failed: {e}",
|
| 360 |
+
)
|
| 361 |
+
|
| 362 |
+
return {"status": "success", "client_id": client_id, "chunks_ingested": total}
|
src/database_client/database_client_service.py
ADDED
|
@@ -0,0 +1,118 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""Service for managing user-registered external database connections."""
|
| 2 |
+
|
| 3 |
+
import uuid
|
| 4 |
+
from typing import List, Optional
|
| 5 |
+
|
| 6 |
+
from sqlalchemy import delete, select
|
| 7 |
+
from sqlalchemy.ext.asyncio import AsyncSession
|
| 8 |
+
|
| 9 |
+
from src.db.postgres.models import DatabaseClient
|
| 10 |
+
from src.middlewares.logging import get_logger
|
| 11 |
+
from src.utils.db_credential_encryption import (
|
| 12 |
+
decrypt_credentials_dict,
|
| 13 |
+
encrypt_credentials_dict,
|
| 14 |
+
)
|
| 15 |
+
|
| 16 |
+
logger = get_logger("database_client_service")
|
| 17 |
+
|
| 18 |
+
|
| 19 |
+
class DatabaseClientService:
|
| 20 |
+
"""Service for managing user-registered external database connections."""
|
| 21 |
+
|
| 22 |
+
async def create(
|
| 23 |
+
self,
|
| 24 |
+
db: AsyncSession,
|
| 25 |
+
user_id: str,
|
| 26 |
+
name: str,
|
| 27 |
+
db_type: str,
|
| 28 |
+
credentials: dict,
|
| 29 |
+
) -> DatabaseClient:
|
| 30 |
+
"""Register a new database client connection.
|
| 31 |
+
|
| 32 |
+
Credentials are encrypted before being stored.
|
| 33 |
+
"""
|
| 34 |
+
client = DatabaseClient(
|
| 35 |
+
id=str(uuid.uuid4()),
|
| 36 |
+
user_id=user_id,
|
| 37 |
+
name=name,
|
| 38 |
+
db_type=db_type,
|
| 39 |
+
credentials=encrypt_credentials_dict(credentials),
|
| 40 |
+
status="active",
|
| 41 |
+
)
|
| 42 |
+
db.add(client)
|
| 43 |
+
await db.commit()
|
| 44 |
+
await db.refresh(client)
|
| 45 |
+
logger.info(f"Created database client {client.id} for user {user_id}")
|
| 46 |
+
return client
|
| 47 |
+
|
| 48 |
+
async def get_user_clients(
|
| 49 |
+
self,
|
| 50 |
+
db: AsyncSession,
|
| 51 |
+
user_id: str,
|
| 52 |
+
) -> List[DatabaseClient]:
|
| 53 |
+
"""Return all active and inactive database clients for a user."""
|
| 54 |
+
result = await db.execute(
|
| 55 |
+
select(DatabaseClient)
|
| 56 |
+
.where(DatabaseClient.user_id == user_id)
|
| 57 |
+
.order_by(DatabaseClient.created_at.desc())
|
| 58 |
+
)
|
| 59 |
+
return result.scalars().all()
|
| 60 |
+
|
| 61 |
+
async def get(
|
| 62 |
+
self,
|
| 63 |
+
db: AsyncSession,
|
| 64 |
+
client_id: str,
|
| 65 |
+
) -> Optional[DatabaseClient]:
|
| 66 |
+
"""Return a single database client by its ID."""
|
| 67 |
+
result = await db.execute(
|
| 68 |
+
select(DatabaseClient).where(DatabaseClient.id == client_id)
|
| 69 |
+
)
|
| 70 |
+
return result.scalars().first()
|
| 71 |
+
|
| 72 |
+
async def update(
|
| 73 |
+
self,
|
| 74 |
+
db: AsyncSession,
|
| 75 |
+
client_id: str,
|
| 76 |
+
name: Optional[str] = None,
|
| 77 |
+
credentials: Optional[dict] = None,
|
| 78 |
+
status: Optional[str] = None,
|
| 79 |
+
) -> Optional[DatabaseClient]:
|
| 80 |
+
"""Update an existing database client connection.
|
| 81 |
+
|
| 82 |
+
Only non-None fields are updated.
|
| 83 |
+
Credentials are re-encrypted if provided.
|
| 84 |
+
"""
|
| 85 |
+
client = await self.get(db, client_id)
|
| 86 |
+
if not client:
|
| 87 |
+
return None
|
| 88 |
+
|
| 89 |
+
if name is not None:
|
| 90 |
+
client.name = name
|
| 91 |
+
if credentials is not None:
|
| 92 |
+
client.credentials = encrypt_credentials_dict(credentials)
|
| 93 |
+
if status is not None:
|
| 94 |
+
client.status = status
|
| 95 |
+
|
| 96 |
+
await db.commit()
|
| 97 |
+
await db.refresh(client)
|
| 98 |
+
logger.info(f"Updated database client {client_id}")
|
| 99 |
+
return client
|
| 100 |
+
|
| 101 |
+
async def delete(
|
| 102 |
+
self,
|
| 103 |
+
db: AsyncSession,
|
| 104 |
+
client_id: str,
|
| 105 |
+
) -> bool:
|
| 106 |
+
"""Permanently delete a database client connection."""
|
| 107 |
+
result = await db.execute(
|
| 108 |
+
delete(DatabaseClient).where(DatabaseClient.id == client_id)
|
| 109 |
+
)
|
| 110 |
+
await db.commit()
|
| 111 |
+
deleted = result.rowcount > 0
|
| 112 |
+
if deleted:
|
| 113 |
+
logger.info(f"Deleted database client {client_id}")
|
| 114 |
+
return deleted
|
| 115 |
+
|
| 116 |
+
|
| 117 |
+
database_client_service = DatabaseClientService()
|
| 118 |
+
|
src/models/credentials.py
ADDED
|
@@ -0,0 +1,164 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""Pydantic credential schemas for user-registered external databases.
|
| 2 |
+
|
| 3 |
+
Imported by the `/database-clients` API router (`src/api/v1/db_client.py`) and,
|
| 4 |
+
via `DbType`, by the db pipeline connector (`src/pipeline/db_pipeline/connector.py`).
|
| 5 |
+
|
| 6 |
+
Sensitive fields (`password`, `service_account_json`) are Fernet-encrypted by
|
| 7 |
+
the database_client service before being stored in the JSONB column; these
|
| 8 |
+
schemas describe the plaintext wire format, not the stored shape.
|
| 9 |
+
"""
|
| 10 |
+
|
| 11 |
+
from typing import Literal, Optional, Union
|
| 12 |
+
|
| 13 |
+
from pydantic import BaseModel, Field
|
| 14 |
+
|
| 15 |
+
# ---------------------------------------------------------------------------
|
| 16 |
+
# Supported DB types
|
| 17 |
+
# ---------------------------------------------------------------------------
|
| 18 |
+
|
| 19 |
+
DbType = Literal["postgres", "mysql", "sqlserver", "supabase", "bigquery", "snowflake"]
|
| 20 |
+
|
| 21 |
+
|
| 22 |
+
# ---------------------------------------------------------------------------
|
| 23 |
+
# Typed credential schemas per DB type
|
| 24 |
+
# ---------------------------------------------------------------------------
|
| 25 |
+
|
| 26 |
+
|
| 27 |
+
class PostgresCredentials(BaseModel):
|
| 28 |
+
"""Connection credentials for PostgreSQL."""
|
| 29 |
+
|
| 30 |
+
host: str = Field(..., description="Hostname or IP address of the PostgreSQL server.", examples=["db.example.com"])
|
| 31 |
+
port: int = Field(5432, description="Port number (default: 5432).", examples=[5432])
|
| 32 |
+
database: str = Field(..., description="Name of the target database.", examples=["mydb"])
|
| 33 |
+
username: str = Field(..., description="Database username.", examples=["admin"])
|
| 34 |
+
password: str = Field(..., description="Database password. Will be encrypted at rest.", examples=["s3cr3t!"])
|
| 35 |
+
ssl_mode: Literal["disable", "require", "verify-ca", "verify-full"] = Field(
|
| 36 |
+
"require",
|
| 37 |
+
description="SSL mode for the connection.",
|
| 38 |
+
examples=["require"],
|
| 39 |
+
)
|
| 40 |
+
|
| 41 |
+
|
| 42 |
+
class MysqlCredentials(BaseModel):
|
| 43 |
+
"""Connection credentials for MySQL."""
|
| 44 |
+
|
| 45 |
+
host: str = Field(..., description="Hostname or IP address of the MySQL server.", examples=["db.example.com"])
|
| 46 |
+
port: int = Field(3306, description="Port number (default: 3306).", examples=[3306])
|
| 47 |
+
database: str = Field(..., description="Name of the target database.", examples=["mydb"])
|
| 48 |
+
username: str = Field(..., description="Database username.", examples=["admin"])
|
| 49 |
+
password: str = Field(..., description="Database password. Will be encrypted at rest.", examples=["s3cr3t!"])
|
| 50 |
+
ssl: bool = Field(True, description="Enable SSL for the connection.", examples=[True])
|
| 51 |
+
|
| 52 |
+
|
| 53 |
+
class SqlServerCredentials(BaseModel):
|
| 54 |
+
"""Connection credentials for Microsoft SQL Server."""
|
| 55 |
+
|
| 56 |
+
host: str = Field(..., description="Hostname or IP address of the SQL Server.", examples=["sqlserver.example.com"])
|
| 57 |
+
port: int = Field(1433, description="Port number (default: 1433).", examples=[1433])
|
| 58 |
+
database: str = Field(..., description="Name of the target database.", examples=["mydb"])
|
| 59 |
+
username: str = Field(..., description="Database username.", examples=["sa"])
|
| 60 |
+
password: str = Field(..., description="Database password. Will be encrypted at rest.", examples=["s3cr3t!"])
|
| 61 |
+
driver: Optional[str] = Field(
|
| 62 |
+
None,
|
| 63 |
+
description="ODBC driver name. Leave empty to use the default driver.",
|
| 64 |
+
examples=["ODBC Driver 17 for SQL Server"],
|
| 65 |
+
)
|
| 66 |
+
|
| 67 |
+
|
| 68 |
+
class SupabaseCredentials(BaseModel):
|
| 69 |
+
"""Connection credentials for Supabase (PostgreSQL-based).
|
| 70 |
+
|
| 71 |
+
Use the connection string details from your Supabase project dashboard
|
| 72 |
+
under Settings > Database.
|
| 73 |
+
"""
|
| 74 |
+
|
| 75 |
+
host: str = Field(
|
| 76 |
+
...,
|
| 77 |
+
description="Supabase database host (e.g. db.<project-ref>.supabase.co, or the pooler host).",
|
| 78 |
+
examples=["db.xxxx.supabase.co"],
|
| 79 |
+
)
|
| 80 |
+
port: int = Field(
|
| 81 |
+
5432,
|
| 82 |
+
description="Port number. Use 5432 for direct connection, 6543 for the connection pooler.",
|
| 83 |
+
examples=[5432],
|
| 84 |
+
)
|
| 85 |
+
database: str = Field("postgres", description="Database name (always 'postgres' for Supabase).", examples=["postgres"])
|
| 86 |
+
username: str = Field(
|
| 87 |
+
...,
|
| 88 |
+
description="Database user. Use 'postgres' for direct connection, or 'postgres.<project-ref>' for the pooler.",
|
| 89 |
+
examples=["postgres"],
|
| 90 |
+
)
|
| 91 |
+
password: str = Field(..., description="Database password (set in Supabase dashboard). Will be encrypted at rest.", examples=["s3cr3t!"])
|
| 92 |
+
ssl_mode: Literal["require", "verify-ca", "verify-full"] = Field(
|
| 93 |
+
"require",
|
| 94 |
+
description="SSL mode. Supabase always requires SSL.",
|
| 95 |
+
examples=["require"],
|
| 96 |
+
)
|
| 97 |
+
|
| 98 |
+
|
| 99 |
+
class BigQueryCredentials(BaseModel):
|
| 100 |
+
"""Connection credentials for Google BigQuery.
|
| 101 |
+
|
| 102 |
+
Requires a GCP Service Account with at least BigQuery Data Viewer
|
| 103 |
+
and BigQuery Job User roles.
|
| 104 |
+
"""
|
| 105 |
+
|
| 106 |
+
project_id: str = Field(..., description="GCP project ID where the BigQuery dataset resides.", examples=["my-gcp-project"])
|
| 107 |
+
dataset_id: str = Field(..., description="BigQuery dataset name to connect to.", examples=["my_dataset"])
|
| 108 |
+
location: Optional[str] = Field(
|
| 109 |
+
"US",
|
| 110 |
+
description="Dataset location/region (default: US).",
|
| 111 |
+
examples=["US", "EU", "asia-southeast1"],
|
| 112 |
+
)
|
| 113 |
+
service_account_json: str = Field(
|
| 114 |
+
...,
|
| 115 |
+
description=(
|
| 116 |
+
"Full content of the GCP Service Account key JSON file as a string. "
|
| 117 |
+
"Will be encrypted at rest."
|
| 118 |
+
),
|
| 119 |
+
examples=['{"type":"service_account","project_id":"my-gcp-project","private_key_id":"..."}'],
|
| 120 |
+
)
|
| 121 |
+
|
| 122 |
+
|
| 123 |
+
class SnowflakeCredentials(BaseModel):
|
| 124 |
+
"""Connection credentials for Snowflake."""
|
| 125 |
+
|
| 126 |
+
account: str = Field(
|
| 127 |
+
...,
|
| 128 |
+
description="Snowflake account identifier, including region if applicable (e.g. myaccount.us-east-1).",
|
| 129 |
+
examples=["myaccount.us-east-1"],
|
| 130 |
+
)
|
| 131 |
+
warehouse: str = Field(..., description="Name of the virtual warehouse to use for queries.", examples=["COMPUTE_WH"])
|
| 132 |
+
database: str = Field(..., description="Name of the target Snowflake database.", examples=["MY_DB"])
|
| 133 |
+
db_schema: Optional[str] = Field("PUBLIC", alias="schema", description="Schema name (default: PUBLIC).", examples=["PUBLIC"])
|
| 134 |
+
username: str = Field(..., description="Snowflake username.", examples=["admin"])
|
| 135 |
+
password: str = Field(..., description="Snowflake password. Will be encrypted at rest.", examples=["s3cr3t!"])
|
| 136 |
+
role: Optional[str] = Field(None, description="Snowflake role to assume for the session.", examples=["SYSADMIN"])
|
| 137 |
+
|
| 138 |
+
|
| 139 |
+
# Union of all credential shapes — reserved for future typed validation on
|
| 140 |
+
# DatabaseClientCreate.credentials (currently Dict[str, Any]). Kept exported
|
| 141 |
+
# so downstream code can reference it without re-declaring.
|
| 142 |
+
CredentialsUnion = Union[
|
| 143 |
+
PostgresCredentials,
|
| 144 |
+
MysqlCredentials,
|
| 145 |
+
SqlServerCredentials,
|
| 146 |
+
SupabaseCredentials,
|
| 147 |
+
BigQueryCredentials,
|
| 148 |
+
SnowflakeCredentials,
|
| 149 |
+
]
|
| 150 |
+
|
| 151 |
+
|
| 152 |
+
# Doc-only helper: surfaces per-type credential shapes in the Swagger "Schemas"
|
| 153 |
+
# panel so API consumers can discover the exact field set for each db_type.
|
| 154 |
+
# Not referenced by any endpoint — importing it in db_client.py is enough for
|
| 155 |
+
# FastAPI's OpenAPI generator to pick it up.
|
| 156 |
+
class CredentialSchemas(BaseModel):
|
| 157 |
+
"""Reference schemas for `credentials` per `db_type` (Swagger-only, not used by endpoints)."""
|
| 158 |
+
|
| 159 |
+
postgres: PostgresCredentials
|
| 160 |
+
mysql: MysqlCredentials
|
| 161 |
+
sqlserver: SqlServerCredentials
|
| 162 |
+
supabase: SupabaseCredentials
|
| 163 |
+
bigquery: BigQueryCredentials
|
| 164 |
+
snowflake: SnowflakeCredentials
|