Rifqi Hafizuddin
[NOTICKET][DB] pisahin db credential ke folder model. add ingestion endpoint at db_client to use db pipeline. add router db_client di main.
347a73a | """Pydantic credential schemas for user-registered external databases. | |
| Imported by the `/database-clients` API router (`src/api/v1/db_client.py`) and, | |
| via `DbType`, by the db pipeline connector (`src/pipeline/db_pipeline/connector.py`). | |
| Sensitive fields (`password`, `service_account_json`) are Fernet-encrypted by | |
| the database_client service before being stored in the JSONB column; these | |
| schemas describe the plaintext wire format, not the stored shape. | |
| """ | |
| from typing import Literal, Optional, Union | |
| from pydantic import BaseModel, Field | |
| # --------------------------------------------------------------------------- | |
| # Supported DB types | |
| # --------------------------------------------------------------------------- | |
| DbType = Literal["postgres", "mysql", "sqlserver", "supabase", "bigquery", "snowflake"] | |
| # --------------------------------------------------------------------------- | |
| # Typed credential schemas per DB type | |
| # --------------------------------------------------------------------------- | |
| class PostgresCredentials(BaseModel): | |
| """Connection credentials for PostgreSQL.""" | |
| host: str = Field(..., description="Hostname or IP address of the PostgreSQL server.", examples=["db.example.com"]) | |
| port: int = Field(5432, description="Port number (default: 5432).", examples=[5432]) | |
| database: str = Field(..., description="Name of the target database.", examples=["mydb"]) | |
| username: str = Field(..., description="Database username.", examples=["admin"]) | |
| password: str = Field(..., description="Database password. Will be encrypted at rest.", examples=["s3cr3t!"]) | |
| ssl_mode: Literal["disable", "require", "verify-ca", "verify-full"] = Field( | |
| "require", | |
| description="SSL mode for the connection.", | |
| examples=["require"], | |
| ) | |
| class MysqlCredentials(BaseModel): | |
| """Connection credentials for MySQL.""" | |
| host: str = Field(..., description="Hostname or IP address of the MySQL server.", examples=["db.example.com"]) | |
| port: int = Field(3306, description="Port number (default: 3306).", examples=[3306]) | |
| database: str = Field(..., description="Name of the target database.", examples=["mydb"]) | |
| username: str = Field(..., description="Database username.", examples=["admin"]) | |
| password: str = Field(..., description="Database password. Will be encrypted at rest.", examples=["s3cr3t!"]) | |
| ssl: bool = Field(True, description="Enable SSL for the connection.", examples=[True]) | |
| class SqlServerCredentials(BaseModel): | |
| """Connection credentials for Microsoft SQL Server.""" | |
| host: str = Field(..., description="Hostname or IP address of the SQL Server.", examples=["sqlserver.example.com"]) | |
| port: int = Field(1433, description="Port number (default: 1433).", examples=[1433]) | |
| database: str = Field(..., description="Name of the target database.", examples=["mydb"]) | |
| username: str = Field(..., description="Database username.", examples=["sa"]) | |
| password: str = Field(..., description="Database password. Will be encrypted at rest.", examples=["s3cr3t!"]) | |
| driver: Optional[str] = Field( | |
| None, | |
| description="ODBC driver name. Leave empty to use the default driver.", | |
| examples=["ODBC Driver 17 for SQL Server"], | |
| ) | |
| class SupabaseCredentials(BaseModel): | |
| """Connection credentials for Supabase (PostgreSQL-based). | |
| Use the connection string details from your Supabase project dashboard | |
| under Settings > Database. | |
| """ | |
| host: str = Field( | |
| ..., | |
| description="Supabase database host (e.g. db.<project-ref>.supabase.co, or the pooler host).", | |
| examples=["db.xxxx.supabase.co"], | |
| ) | |
| port: int = Field( | |
| 5432, | |
| description="Port number. Use 5432 for direct connection, 6543 for the connection pooler.", | |
| examples=[5432], | |
| ) | |
| database: str = Field("postgres", description="Database name (always 'postgres' for Supabase).", examples=["postgres"]) | |
| username: str = Field( | |
| ..., | |
| description="Database user. Use 'postgres' for direct connection, or 'postgres.<project-ref>' for the pooler.", | |
| examples=["postgres"], | |
| ) | |
| password: str = Field(..., description="Database password (set in Supabase dashboard). Will be encrypted at rest.", examples=["s3cr3t!"]) | |
| ssl_mode: Literal["require", "verify-ca", "verify-full"] = Field( | |
| "require", | |
| description="SSL mode. Supabase always requires SSL.", | |
| examples=["require"], | |
| ) | |
| class BigQueryCredentials(BaseModel): | |
| """Connection credentials for Google BigQuery. | |
| Requires a GCP Service Account with at least BigQuery Data Viewer | |
| and BigQuery Job User roles. | |
| """ | |
| project_id: str = Field(..., description="GCP project ID where the BigQuery dataset resides.", examples=["my-gcp-project"]) | |
| dataset_id: str = Field(..., description="BigQuery dataset name to connect to.", examples=["my_dataset"]) | |
| location: Optional[str] = Field( | |
| "US", | |
| description="Dataset location/region (default: US).", | |
| examples=["US", "EU", "asia-southeast1"], | |
| ) | |
| service_account_json: str = Field( | |
| ..., | |
| description=( | |
| "Full content of the GCP Service Account key JSON file as a string. " | |
| "Will be encrypted at rest." | |
| ), | |
| examples=['{"type":"service_account","project_id":"my-gcp-project","private_key_id":"..."}'], | |
| ) | |
| class SnowflakeCredentials(BaseModel): | |
| """Connection credentials for Snowflake.""" | |
| account: str = Field( | |
| ..., | |
| description="Snowflake account identifier, including region if applicable (e.g. myaccount.us-east-1).", | |
| examples=["myaccount.us-east-1"], | |
| ) | |
| warehouse: str = Field(..., description="Name of the virtual warehouse to use for queries.", examples=["COMPUTE_WH"]) | |
| database: str = Field(..., description="Name of the target Snowflake database.", examples=["MY_DB"]) | |
| db_schema: Optional[str] = Field("PUBLIC", alias="schema", description="Schema name (default: PUBLIC).", examples=["PUBLIC"]) | |
| username: str = Field(..., description="Snowflake username.", examples=["admin"]) | |
| password: str = Field(..., description="Snowflake password. Will be encrypted at rest.", examples=["s3cr3t!"]) | |
| role: Optional[str] = Field(None, description="Snowflake role to assume for the session.", examples=["SYSADMIN"]) | |
| # Union of all credential shapes — reserved for future typed validation on | |
| # DatabaseClientCreate.credentials (currently Dict[str, Any]). Kept exported | |
| # so downstream code can reference it without re-declaring. | |
| CredentialsUnion = Union[ | |
| PostgresCredentials, | |
| MysqlCredentials, | |
| SqlServerCredentials, | |
| SupabaseCredentials, | |
| BigQueryCredentials, | |
| SnowflakeCredentials, | |
| ] | |
| # Doc-only helper: surfaces per-type credential shapes in the Swagger "Schemas" | |
| # panel so API consumers can discover the exact field set for each db_type. | |
| # Not referenced by any endpoint — importing it in db_client.py is enough for | |
| # FastAPI's OpenAPI generator to pick it up. | |
| class CredentialSchemas(BaseModel): | |
| """Reference schemas for `credentials` per `db_type` (Swagger-only, not used by endpoints).""" | |
| postgres: PostgresCredentials | |
| mysql: MysqlCredentials | |
| sqlserver: SqlServerCredentials | |
| supabase: SupabaseCredentials | |
| bigquery: BigQueryCredentials | |
| snowflake: SnowflakeCredentials | |