| | """Utility helpers for the Business Intelligence dashboard.""" |
| |
|
| | from __future__ import annotations |
| |
|
| | from dataclasses import dataclass |
| | from typing import Dict, Iterable, List, Tuple |
| |
|
| | import pandas as pd |
| |
|
| |
|
# Upload handling constants used by the dashboard's file-ingest layer.

SUPPORTED_FILE_TYPES: Tuple[str, ...] = (".csv", ".xlsx", ".xls")
"""Allowed file extensions for uploads."""

PREVIEW_ROWS: int = 5
"""Default number of rows to display in dataset previews."""
| |
|
| |
|
@dataclass(frozen=True)
class ColumnTypes:
    """Container describing inferred column groupings.

    Frozen (immutable), so instances are hashable and safe to share.
    Produced by ``infer_column_types``.
    """

    # Names of columns with a numeric dtype.
    numeric: Tuple[str, ...]
    # Names of columns treated as categorical (neither numeric nor datetime).
    categorical: Tuple[str, ...]
    # Names of columns with a datetime dtype.
    datetime: Tuple[str, ...]
| |
|
| |
|
def is_supported_file(filename: str | None) -> bool:
    """Return True when the provided filename uses a supported extension.

    Parameters
    ----------
    filename:
        Candidate file name; ``None`` and the empty string are rejected.

    Returns
    -------
    bool
        True when the lowercased name ends with one of
        ``SUPPORTED_FILE_TYPES``.
    """
    if not filename:
        return False
    # str.endswith accepts a tuple of suffixes, so a single call replaces
    # the any(...) scan over the extensions.
    return filename.lower().endswith(SUPPORTED_FILE_TYPES)
| |
|
| |
|
def coerce_datetime_columns(df: pd.DataFrame, threshold: float = 0.6) -> Tuple[pd.DataFrame, Tuple[str, ...]]:
    """Attempt to parse object columns as datetimes when enough values can be converted.

    Parameters
    ----------
    df:
        Input DataFrame to mutate in-place.
    threshold:
        Minimum fraction of non-null values that must successfully convert
        for the column to be promoted to datetime.

    Returns
    -------
    tuple
        Mutated DataFrame and the sorted tuple of datetime column names,
        including columns that were already datetime-typed.
    """
    # "datetime" matches tz-naive datetime64 columns; "datetimetz" matches
    # tz-aware columns in ANY timezone. The previous explicit dtype list
    # ("datetime64[ns, UTC]") silently missed non-UTC tz-aware columns.
    datetime_cols: List[str] = list(
        df.select_dtypes(include=["datetime", "datetimetz"]).columns
    )

    for col in df.select_dtypes(include=["object"]).columns:
        series = df[col]
        non_null_ratio = series.notna().mean()
        # The explicit == 0 guard keeps all-null columns from being promoted
        # even when threshold is 0.
        if non_null_ratio == 0 or non_null_ratio < threshold:
            continue
        converted = pd.to_datetime(series, errors="coerce", utc=False)
        success_ratio = converted.notna().mean()
        if success_ratio >= threshold:
            df[col] = converted
            datetime_cols.append(col)

    return df, tuple(sorted(set(datetime_cols)))
| |
|
| |
|
def infer_column_types(df: pd.DataFrame) -> ColumnTypes:
    """Infer high-level data types for the provided DataFrame's columns.

    Columns are grouped as numeric, datetime (tz-naive, or tz-aware in any
    timezone), and categorical (everything else, in original column order).
    """
    numeric_cols = tuple(df.select_dtypes(include=["number"]).columns)
    # "datetimetz" matches tz-aware columns in ANY timezone; the previous
    # explicit "datetime64[ns, UTC]" selector misclassified non-UTC
    # tz-aware columns as categorical.
    datetime_cols = tuple(df.select_dtypes(include=["datetime", "datetimetz"]).columns)
    # Set membership keeps the pass over df.columns linear instead of
    # scanning the tuples for every column.
    claimed = set(numeric_cols) | set(datetime_cols)
    categorical_cols = tuple(col for col in df.columns if col not in claimed)

    return ColumnTypes(numeric=numeric_cols, categorical=categorical_cols, datetime=datetime_cols)
| |
|
| |
|
def clamp_numeric(value: float, minimum: float, maximum: float) -> float:
    """Restrict *value* to the closed interval [minimum, maximum]."""
    # Cap at the upper bound first, then raise to the lower bound; argument
    # order matches the original composition exactly.
    capped = min(maximum, value)
    return max(minimum, capped)
| |
|
| |
|
def ensure_unique_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Rename duplicate columns to maintain uniqueness.

    The first occurrence of a name is kept as-is; later duplicates receive a
    ``_<n>`` suffix. A generated candidate such as ``a_1`` may collide with a
    real column literally named ``a_1``, so each candidate is checked against
    the names already emitted and the counter advanced until it is unused
    (the previous implementation could still produce duplicates in that case).

    Returns
    -------
    pd.DataFrame
        The original DataFrame unchanged when names are already unique,
        otherwise a renamed copy.
    """
    if df.columns.is_unique:
        return df

    new_columns: List[str] = []
    next_suffix: Dict[str, int] = {}
    used: set = set()
    for col in df.columns:
        count = next_suffix.get(col, 0)
        candidate = col if count == 0 else f"{col}_{count}"
        # Advance the suffix until the candidate is genuinely unused.
        while candidate in used:
            count += 1
            candidate = f"{col}_{count}"
        next_suffix[col] = count + 1
        used.add(candidate)
        new_columns.append(candidate)

    df = df.copy()
    df.columns = new_columns
    return df
| |
|
| |
|
def shorten_text(value: str, max_length: int = 80) -> str:
    """Truncate long text values for cleaner display.

    Strings longer than *max_length* are cut and suffixed with ``...`` so
    the result is exactly *max_length* characters. For ``max_length < 3``
    there is no room for the ellipsis, so the text is hard-truncated instead
    (the previous negative slice produced output LONGER than the limit).
    """
    if len(value) <= max_length:
        return value
    if max_length < 3:
        # value[: max_length - 3] would be a negative slice here and
        # overflow the limit; fall back to plain truncation.
        return value[:max_length]
    return f"{value[: max_length - 3]}..."
| |
|
| |
|
def safe_column_subset(columns: Iterable[str], allowed: Iterable[str]) -> List[str]:
    """Filter *columns* down to the names present in *allowed*.

    Order (and any duplicates) in *columns* are preserved; *allowed* is
    materialised once for O(1) membership checks.
    """
    permitted = frozenset(allowed)
    subset: List[str] = []
    for name in columns:
        if name in permitted:
            subset.append(name)
    return subset
| |
|