Rifqi Hafizuddin
[NOTICKET][DB] pisahin db credential ke folder model. add ingestion endpoint at db_client to use db pipeline. add router db_client di main.
347a73a
raw
history blame
7.38 kB
"""Pydantic credential schemas for user-registered external databases.
Imported by the `/database-clients` API router (`src/api/v1/db_client.py`) and,
via `DbType`, by the db pipeline connector (`src/pipeline/db_pipeline/connector.py`).
Sensitive fields (`password`, `service_account_json`) are Fernet-encrypted by
the database_client service before being stored in the JSONB column; these
schemas describe the plaintext wire format, not the stored shape.
"""
from typing import Literal, Optional, Union
from pydantic import BaseModel, Field
# ---------------------------------------------------------------------------
# Supported DB types
# ---------------------------------------------------------------------------
# Closed set of backend identifiers accepted as `db_type`. Each value has a
# matching credentials model defined further down in this module.
DbType = Literal[
    "postgres",
    "mysql",
    "sqlserver",
    "supabase",
    "bigquery",
    "snowflake",
]
# ---------------------------------------------------------------------------
# Typed credential schemas per DB type
# ---------------------------------------------------------------------------
class PostgresCredentials(BaseModel):
    """Connection credentials for PostgreSQL."""

    # Server location. `port` is bounded to the valid TCP port range so that an
    # out-of-range value (0, negative, > 65535) is rejected at validation time
    # instead of surfacing later as an opaque connection error.
    host: str = Field(
        ...,
        description="Hostname or IP address of the PostgreSQL server.",
        examples=["db.example.com"],
    )
    port: int = Field(
        5432,
        ge=1,
        le=65535,
        description="Port number (default: 5432).",
        examples=[5432],
    )
    database: str = Field(
        ...,
        description="Name of the target database.",
        examples=["mydb"],
    )

    # Login pair. Per the module docstring, `password` is plaintext on the wire
    # and is encrypted by the database_client service before it is stored.
    username: str = Field(
        ...,
        description="Database username.",
        examples=["admin"],
    )
    password: str = Field(
        ...,
        description="Database password. Will be encrypted at rest.",
        examples=["s3cr3t!"],
    )

    # Restricted to the four listed modes; "require" is used when omitted.
    ssl_mode: Literal["disable", "require", "verify-ca", "verify-full"] = Field(
        "require",
        description="SSL mode for the connection.",
        examples=["require"],
    )
class MysqlCredentials(BaseModel):
    """Connection credentials for MySQL."""

    # Server location. `port` is bounded to the valid TCP port range so that an
    # out-of-range value (0, negative, > 65535) is rejected at validation time
    # instead of surfacing later as an opaque connection error.
    host: str = Field(
        ...,
        description="Hostname or IP address of the MySQL server.",
        examples=["db.example.com"],
    )
    port: int = Field(
        3306,
        ge=1,
        le=65535,
        description="Port number (default: 3306).",
        examples=[3306],
    )
    database: str = Field(
        ...,
        description="Name of the target database.",
        examples=["mydb"],
    )

    # Login pair. Per the module docstring, `password` is plaintext on the wire
    # and is encrypted by the database_client service before it is stored.
    username: str = Field(
        ...,
        description="Database username.",
        examples=["admin"],
    )
    password: str = Field(
        ...,
        description="Database password. Will be encrypted at rest.",
        examples=["s3cr3t!"],
    )

    # Simple on/off switch (no per-mode selection); SSL is on unless the caller
    # explicitly disables it.
    ssl: bool = Field(
        True,
        description="Enable SSL for the connection.",
        examples=[True],
    )
class SqlServerCredentials(BaseModel):
    """Connection credentials for Microsoft SQL Server."""

    # Server location. `port` is bounded to the valid TCP port range so that an
    # out-of-range value (0, negative, > 65535) is rejected at validation time
    # instead of surfacing later as an opaque connection error.
    host: str = Field(
        ...,
        description="Hostname or IP address of the SQL Server.",
        examples=["sqlserver.example.com"],
    )
    port: int = Field(
        1433,
        ge=1,
        le=65535,
        description="Port number (default: 1433).",
        examples=[1433],
    )
    database: str = Field(
        ...,
        description="Name of the target database.",
        examples=["mydb"],
    )

    # Login pair. Per the module docstring, `password` is plaintext on the wire
    # and is encrypted by the database_client service before it is stored.
    username: str = Field(
        ...,
        description="Database username.",
        examples=["sa"],
    )
    password: str = Field(
        ...,
        description="Database password. Will be encrypted at rest.",
        examples=["s3cr3t!"],
    )

    # Optional override; None means "let the consumer pick its default driver".
    driver: Optional[str] = Field(
        None,
        description="ODBC driver name. Leave empty to use the default driver.",
        examples=["ODBC Driver 17 for SQL Server"],
    )
class SupabaseCredentials(BaseModel):
    """Connection credentials for Supabase (PostgreSQL-based).
    Use the connection string details from your Supabase project dashboard
    under Settings > Database.
    """

    # Server location. `port` is bounded to the valid TCP port range so that an
    # out-of-range value (0, negative, > 65535) is rejected at validation time
    # instead of surfacing later as an opaque connection error.
    host: str = Field(
        ...,
        description="Supabase database host (e.g. db.<project-ref>.supabase.co, or the pooler host).",
        examples=["db.xxxx.supabase.co"],
    )
    port: int = Field(
        5432,
        ge=1,
        le=65535,
        description="Port number. Use 5432 for direct connection, 6543 for the connection pooler.",
        examples=[5432],
    )
    # Unlike the other SQL backends, the database name is optional here and
    # falls back to "postgres".
    database: str = Field(
        "postgres",
        description="Database name (always 'postgres' for Supabase).",
        examples=["postgres"],
    )

    # Login pair. Per the module docstring, `password` is plaintext on the wire
    # and is encrypted by the database_client service before it is stored.
    username: str = Field(
        ...,
        description="Database user. Use 'postgres' for direct connection, or 'postgres.<project-ref>' for the pooler.",
        examples=["postgres"],
    )
    password: str = Field(
        ...,
        description="Database password (set in Supabase dashboard). Will be encrypted at rest.",
        examples=["s3cr3t!"],
    )

    # Same vocabulary as the PostgreSQL model minus "disable": an unencrypted
    # connection cannot be requested for this backend.
    ssl_mode: Literal["require", "verify-ca", "verify-full"] = Field(
        "require",
        description="SSL mode. Supabase always requires SSL.",
        examples=["require"],
    )
class BigQueryCredentials(BaseModel):
    """Connection credentials for Google BigQuery.
    Requires a GCP Service Account with at least BigQuery Data Viewer
    and BigQuery Job User roles.
    """

    # Target of the connection: project + dataset are mandatory, the location
    # is optional and falls back to "US" when omitted.
    project_id: str = Field(
        ...,
        description="GCP project ID where the BigQuery dataset resides.",
        examples=["my-gcp-project"],
    )
    dataset_id: str = Field(
        ...,
        description="BigQuery dataset name to connect to.",
        examples=["my_dataset"],
    )
    location: Optional[str] = Field(
        default="US",
        description="Dataset location/region (default: US).",
        examples=["US", "EU", "asia-southeast1"],
    )

    # The whole key file is carried as one string. Per the module docstring
    # this is one of the sensitive fields that the database_client service
    # encrypts before storing.
    service_account_json: str = Field(
        ...,
        description="Full content of the GCP Service Account key JSON file as a string. Will be encrypted at rest.",
        examples=['{"type":"service_account","project_id":"my-gcp-project","private_key_id":"..."}'],
    )
class SnowflakeCredentials(BaseModel):
    """Connection credentials for Snowflake."""

    # `db_schema` is exposed on the wire as "schema" through an alias. Without
    # this setting pydantic v2 only accepts the alias on input, so Python code
    # calling `SnowflakeCredentials(db_schema="X")` would have the value
    # silently dropped and end up with the "PUBLIC" default. Enabling
    # population by name accepts both spellings; the alias keeps working.
    # NOTE(review): assumes pydantic v2 (suggested by the `examples=[...]`
    # Field usage) — confirm before merging.
    model_config = {"populate_by_name": True}

    # Account / compute / storage coordinates.
    account: str = Field(
        ...,
        description="Snowflake account identifier, including region if applicable (e.g. myaccount.us-east-1).",
        examples=["myaccount.us-east-1"],
    )
    warehouse: str = Field(
        ...,
        description="Name of the virtual warehouse to use for queries.",
        examples=["COMPUTE_WH"],
    )
    database: str = Field(
        ...,
        description="Name of the target Snowflake database.",
        examples=["MY_DB"],
    )
    # Attribute is named `db_schema` while the external key is "schema".
    db_schema: Optional[str] = Field(
        "PUBLIC",
        alias="schema",
        description="Schema name (default: PUBLIC).",
        examples=["PUBLIC"],
    )

    # Login pair. Per the module docstring, `password` is plaintext on the wire
    # and is encrypted by the database_client service before it is stored.
    username: str = Field(
        ...,
        description="Snowflake username.",
        examples=["admin"],
    )
    password: str = Field(
        ...,
        description="Snowflake password. Will be encrypted at rest.",
        examples=["s3cr3t!"],
    )

    # Optional; None means no role is specified by the caller.
    role: Optional[str] = Field(
        None,
        description="Snowflake role to assume for the session.",
        examples=["SYSADMIN"],
    )
# One annotation covering every per-backend credential model. Nothing validates
# against it yet: DatabaseClientCreate.credentials is still Dict[str, Any], and
# this alias is reserved for the day that field becomes typed. It stays
# exported so downstream code can refer to it instead of re-declaring the list.
CredentialsUnion = Union[
    PostgresCredentials, MysqlCredentials, SqlServerCredentials,
    SupabaseCredentials, BigQueryCredentials, SnowflakeCredentials,
]
# Doc-only helper: intended to surface the per-type credential shapes in the
# Swagger "Schemas" panel so API consumers can discover the exact field set
# for each db_type. Not referenced by any endpoint.
# NOTE(review): this comment previously stated that importing the class in
# db_client.py is enough for FastAPI's OpenAPI generator to pick it up.
# FastAPI normally only emits schemas for models reachable from a route
# declaration (request body, response_model, `responses=`) — TODO confirm the
# model really appears in /docs, and reference it from a route if it does not.
class CredentialSchemas(BaseModel):
    """Reference schemas for `credentials` per `db_type` (Swagger-only, not used by endpoints)."""

    # One required field per supported backend; the field names mirror the
    # string values listed in `DbType`.
    postgres: PostgresCredentials
    mysql: MysqlCredentials
    sqlserver: SqlServerCredentials
    supabase: SupabaseCredentials
    bigquery: BigQueryCredentials
    snowflake: SnowflakeCredentials