| """ |
| Parquet module. |
| |
| TODO: handle migrations |
| TODO: make it work with chunked exports. |
| TODO: make it work with chunked imports. |
| |
| Mostly auto-generated by Cursor + GPT-5. |
| """ |
|
|
| import os |
|
|
| import pandas as pd |
| from sqlalchemy import inspect, text |
| from sqlalchemy.engine import Engine |
| from sqlmodel import Session |
|
|
|
|
def export_to_parquet(engine: Engine, backup_dir: str) -> None:
    """
    Export each table in the database to a separate Parquet file.

    Loads each table entirely into memory, sorts rows deterministically
    (by every column, so repeated exports of identical data produce
    identical files), and writes one ``<table>.parquet`` per table into
    ``backup_dir`` (created if missing).

    Args:
        engine: SQLAlchemy engine connected to the source database.
        backup_dir: Directory that will receive the Parquet files.

    TODO: make it work with chunked exports.
    TODO: handle migrations
    """
    os.makedirs(backup_dir, exist_ok=True)
    inspector = inspect(engine)
    table_names = inspector.get_table_names()

    # Open one connection for the whole export instead of reconnecting
    # for every table — the connection is loop-invariant.
    with engine.connect() as conn:
        for table_name in table_names:
            file_path = os.path.join(backup_dir, f"{table_name}.parquet")

            # NOTE(review): table_name comes from the inspector, not user
            # input, so the f-string interpolation is not an injection risk
            # here — but it would break on names needing quoting; confirm
            # naming conventions before relying on this.
            query = text(f"SELECT * FROM {table_name}")
            df = pd.read_sql_query(query, conn)

            # Sort by every column for a deterministic row order; skip the
            # sort for a zero-column result, where there is nothing to sort.
            sort_cols = list(df.columns)
            if sort_cols:
                df = df.sort_values(by=sort_cols).reset_index(drop=True)

            df.to_parquet(file_path, index=False)
            print(f"Exported {table_name} to {file_path}")
|
|
|
|
def import_from_parquet(engine: Engine, backup_dir: str) -> None:
    """
    Import each Parquet file in ``backup_dir`` into the database.

    For every table known to the database, looks for ``<table>.parquet``.
    If present, the file's columns are validated against the table, then
    the table is wiped and repopulated inside a single transaction (a
    failed import leaves the table unchanged). Rows are inserted in
    bounded-size chunks. Missing files are skipped with a message.
    The whole file is still loaded into memory at once.

    Args:
        engine: SQLAlchemy engine connected to the target database.
        backup_dir: Directory containing the Parquet backups.

    Raises:
        ValueError: If a Parquet file contains columns the table lacks.

    TODO: make it work with chunked imports (reading side; inserts are
          already chunked).
    TODO: handle migrations
    """
    inspector = inspect(engine)
    table_names = inspector.get_table_names()

    for table_name in table_names:
        file_path = os.path.join(backup_dir, f"{table_name}.parquet")
        if not os.path.exists(file_path):
            print(f"No backup found for table {table_name}, skipping.")
            continue

        df = pd.read_parquet(file_path)

        # Strict column-name check up front: fail with a clear message
        # instead of an opaque DB error mid-insert.
        table_cols = {col["name"] for col in inspector.get_columns(table_name)}
        extra = set(df.columns) - table_cols
        if extra:
            raise ValueError(
                f"Backup for {table_name} has unknown columns: {sorted(extra)}"
            )

        # pandas represents SQL NULL as NaN/NaT; map those back to None so
        # they round-trip as NULL instead of being inserted as NaN values.
        df = df.astype(object).where(pd.notnull(df), None)

        # DELETE + INSERTs share one transaction: engine.begin() commits on
        # success and rolls back on error, keeping the operation atomic.
        with engine.begin() as conn:
            conn.execute(text(f"DELETE FROM {table_name}"))
            if not df.empty:
                columns = df.columns.tolist()
                chunk_size = 10000  # bounds statement size / bind-param count
                for start in range(0, len(df), chunk_size):
                    chunk = df.iloc[start : start + chunk_size]
                    rows = chunk.to_dict(orient="records")
                    # One multi-row INSERT per chunk, with uniquely numbered
                    # bind parameters (:col_0, :col_1, ...) per row.
                    placeholders = ", ".join(
                        "(" + ", ".join(f":{col}_{i}" for col in columns) + ")"
                        for i in range(len(rows))
                    )
                    insert_stmt = text(
                        f"INSERT INTO {table_name} "
                        f"({', '.join(columns)}) VALUES {placeholders}"
                    )
                    params = {
                        f"{col}_{i}": row[col]
                        for i, row in enumerate(rows)
                        for col in columns
                    }
                    conn.execute(insert_stmt, params)

        print(f"Imported {table_name} from {file_path}")
|
|