| | """ |
| | Nó para operações de banco de dados |
| | """ |
| | import os |
| | import logging |
| | import pandas as pd |
| | from typing import Dict, Any, TypedDict, Optional |
| | from sqlalchemy import create_engine |
| |
|
| | from utils.config import SQL_DB_PATH |
| | from utils.database import create_sql_database, validate_database |
| | from utils.object_manager import get_object_manager |
| |
|
| | class DatabaseState(TypedDict): |
| | """Estado para operações de banco de dados""" |
| | success: bool |
| | message: str |
| | database_info: dict |
| | engine_id: str |
| | db_id: str |
| |
|
| | async def create_database_from_dataframe_node(state: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Nó para criar banco de dados a partir de DataFrame processado |
| | |
| | Args: |
| | state: Estado contendo informações do DataFrame processado |
| | |
| | Returns: |
| | Estado atualizado com informações do banco |
| | """ |
| | try: |
| | obj_manager = get_object_manager() |
| | |
| | |
| | df_id = state.get("dataframe_id") |
| | if not df_id: |
| | raise ValueError("ID do DataFrame não encontrado no estado") |
| | |
| | processed_df = obj_manager.get_object(df_id) |
| | if processed_df is None: |
| | raise ValueError("DataFrame processado não encontrado") |
| | |
| | |
| | column_info = state.get("column_info", {}) |
| | sql_types = column_info.get("sql_types", {}) |
| | |
| | |
| | engine = create_engine(f"sqlite:///{SQL_DB_PATH}") |
| | |
| | |
| | processed_df.to_sql( |
| | "tabela", |
| | engine, |
| | index=False, |
| | if_exists="replace", |
| | dtype=sql_types |
| | ) |
| | |
| | logging.info(f"[DATABASE] Banco criado com {len(processed_df)} registros") |
| | |
| | |
| | db = create_sql_database(engine) |
| | |
| | |
| | is_valid = validate_database(engine) |
| | |
| | |
| | engine_id = obj_manager.store_engine(engine) |
| | db_id = obj_manager.store_database(db) |
| | |
| | |
| | database_info = { |
| | "path": SQL_DB_PATH, |
| | "table_name": "tabela", |
| | "total_records": len(processed_df), |
| | "columns": list(processed_df.columns), |
| | "column_types": {col: str(dtype) for col, dtype in processed_df.dtypes.items()}, |
| | "is_valid": is_valid, |
| | "sql_types_used": {col: str(sql_type) for col, sql_type in sql_types.items()} |
| | } |
| | |
| | |
| | state.update({ |
| | "success": True, |
| | "message": f"✅ Banco de dados criado com sucesso! {len(processed_df)} registros salvos", |
| | "database_info": database_info, |
| | "engine_id": engine_id, |
| | "db_id": db_id |
| | }) |
| | |
| | logging.info(f"[DATABASE] Banco criado e validado: {database_info}") |
| | |
| | except Exception as e: |
| | error_msg = f"❌ Erro ao criar banco de dados: {e}" |
| | logging.error(f"[DATABASE] {error_msg}") |
| | state.update({ |
| | "success": False, |
| | "message": error_msg, |
| | "database_info": {}, |
| | "engine_id": "", |
| | "db_id": "" |
| | }) |
| | |
| | return state |
| |
|
| | async def load_existing_database_node(state: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Nó para carregar banco de dados existente |
| | |
| | Args: |
| | state: Estado atual |
| | |
| | Returns: |
| | Estado atualizado com informações do banco existente |
| | """ |
| | try: |
| | if not os.path.exists(SQL_DB_PATH): |
| | raise ValueError("Banco de dados não encontrado") |
| | |
| | |
| | engine = create_engine(f"sqlite:///{SQL_DB_PATH}") |
| | |
| | |
| | db = create_sql_database(engine) |
| | |
| | |
| | is_valid = validate_database(engine) |
| | |
| | |
| | try: |
| | sample_df = pd.read_sql_query("SELECT * FROM tabela LIMIT 5", engine) |
| | total_records_df = pd.read_sql_query("SELECT COUNT(*) as count FROM tabela", engine) |
| | total_records = total_records_df.iloc[0]['count'] |
| | |
| | database_info = { |
| | "path": SQL_DB_PATH, |
| | "table_name": "tabela", |
| | "total_records": total_records, |
| | "columns": list(sample_df.columns), |
| | "column_types": {col: str(dtype) for col, dtype in sample_df.dtypes.items()}, |
| | "is_valid": is_valid, |
| | "sample_data": sample_df.head(3).to_dict() |
| | } |
| | except Exception as e: |
| | logging.warning(f"Erro ao obter informações detalhadas do banco: {e}") |
| | database_info = { |
| | "path": SQL_DB_PATH, |
| | "table_name": "tabela", |
| | "is_valid": is_valid, |
| | "error": str(e) |
| | } |
| | |
| | |
| | obj_manager = get_object_manager() |
| | engine_id = obj_manager.store_engine(engine) |
| | db_id = obj_manager.store_database(db) |
| | |
| | |
| | state.update({ |
| | "success": True, |
| | "message": "✅ Banco de dados existente carregado com sucesso", |
| | "database_info": database_info, |
| | "engine_id": engine_id, |
| | "db_id": db_id |
| | }) |
| | |
| | logging.info(f"[DATABASE] Banco existente carregado: {database_info}") |
| | |
| | except Exception as e: |
| | error_msg = f"❌ Erro ao carregar banco existente: {e}" |
| | logging.error(f"[DATABASE] {error_msg}") |
| | state.update({ |
| | "success": False, |
| | "message": error_msg, |
| | "database_info": {}, |
| | "engine_id": "", |
| | "db_id": "" |
| | }) |
| | |
| | return state |
| |
|
| | async def get_database_sample_node(state: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Nó para obter amostra dos dados do banco |
| | |
| | Args: |
| | state: Estado contendo ID da engine |
| | |
| | Returns: |
| | Estado atualizado com amostra dos dados |
| | """ |
| | try: |
| | obj_manager = get_object_manager() |
| | |
| | |
| | engine_id = state.get("engine_id") |
| | if not engine_id: |
| | raise ValueError("ID da engine não encontrado") |
| | |
| | engine = obj_manager.get_engine(engine_id) |
| | if not engine: |
| | raise ValueError("Engine não encontrada") |
| | |
| | |
| | connection_type = state.get("connection_type", "csv") |
| |
|
| | if connection_type == "postgresql": |
| | |
| | import sqlalchemy as sa |
| |
|
| | try: |
| | with engine.connect() as conn: |
| | |
| | tables_result = conn.execute(sa.text(""" |
| | SELECT table_name |
| | FROM information_schema.tables |
| | WHERE table_schema = 'public' |
| | ORDER BY table_name |
| | """)) |
| | available_tables = [row[0] for row in tables_result.fetchall()] |
| |
|
| | if not available_tables: |
| | raise ValueError("Nenhuma tabela encontrada no banco PostgreSQL") |
| |
|
| | |
| | table_name = None |
| | for table in available_tables: |
| | try: |
| | |
| | count_result = conn.execute(sa.text(f"SELECT COUNT(*) FROM {table} LIMIT 1")) |
| | count = count_result.scalar() |
| | if count > 0: |
| | table_name = table |
| | logging.info(f"[DATABASE] PostgreSQL - usando tabela '{table_name}' para amostra ({count} registros)") |
| | break |
| | except Exception as e: |
| | logging.warning(f"[DATABASE] Erro ao verificar tabela {table}: {e}") |
| | continue |
| |
|
| | |
| | if not table_name: |
| | table_name = available_tables[0] |
| | logging.info(f"[DATABASE] PostgreSQL - usando primeira tabela '{table_name}' (sem dados detectados)") |
| |
|
| | except Exception as e: |
| | logging.error(f"[DATABASE] Erro ao detectar tabelas PostgreSQL: {e}") |
| | raise ValueError(f"Erro ao acessar tabelas PostgreSQL: {e}") |
| |
|
| | else: |
| | table_name = "tabela" |
| | logging.info(f"[DATABASE] CSV - usando tabela padrão: {table_name}") |
| |
|
| | |
| | try: |
| | sample_df = pd.read_sql_query(f"SELECT * FROM {table_name} LIMIT 10", engine) |
| | logging.info(f"[DATABASE] Amostra obtida da tabela '{table_name}': {sample_df.shape[0]} registros") |
| | except Exception as e: |
| | logging.error(f"[DATABASE] Erro ao obter amostra da tabela '{table_name}': {e}") |
| | |
| | sample_df = pd.DataFrame() |
| | |
| | |
| | db_sample_dict = { |
| | "data": sample_df.to_dict('records'), |
| | "columns": list(sample_df.columns), |
| | "dtypes": sample_df.dtypes.astype(str).to_dict(), |
| | "shape": sample_df.shape |
| | } |
| | |
| | state["db_sample_dict"] = db_sample_dict |
| | |
| | logging.info(f"[DATABASE] Amostra obtida: {sample_df.shape[0]} registros") |
| | |
| | except Exception as e: |
| | error_msg = f"Erro ao obter amostra do banco: {e}" |
| | logging.error(f"[DATABASE] {error_msg}") |
| | state["db_sample_dict"] = {} |
| | state["error"] = error_msg |
| | |
| | return state |
| |
|