| """ |
| CSV ingest and auto-clean pipeline for time-series data. |
| |
| Provides delimiter detection, date/numeric column suggestion, |
| numeric cleaning (currency, commas, percentages, parenthesised negatives), |
| duplicate and missing-value handling, frequency detection, and |
| calendar-feature extraction. |
| """ |
|
|
| import csv |
| import io |
| import re |
| import warnings |
| from dataclasses import dataclass, field |
| from datetime import timedelta |
|
|
| import numpy as np |
| import pandas as pd |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class CleaningReport:
    """Summary produced by :func:`clean_dataframe`."""

    rows_before: int = 0      # row count of the input frame, before any cleaning
    rows_after: int = 0       # row count after date-drop / dedup / missing handling
    duplicates_found: int = 0  # rows sharing a date with at least one other row
    duplicates_action: str = ""  # the dup_action that was applied ("keep_first", ...)
    missing_before: dict = field(default_factory=dict)  # y-col -> NaN count before fill
    missing_after: dict = field(default_factory=dict)   # y-col -> NaN count after fill
    parsing_warnings: list = field(default_factory=list)  # human-readable issue messages
|
|
|
|
@dataclass
class FrequencyInfo:
    """Result of :func:`detect_frequency`."""

    label: str = "Unknown"    # human-readable label: "Daily", "Weekly", ..., "Irregular"
    median_delta: timedelta = timedelta(0)  # median gap between consecutive dates
    is_regular: bool = False  # True when the gaps' std dev is <= 20% of the median gap
|
|
|
|
| |
| |
| |
|
|
def detect_delimiter(file_bytes: bytes) -> str:
    """Guess the CSV delimiter used in *file_bytes*.

    Runs :class:`csv.Sniffer` over the first 8 KB (decoded leniently as
    UTF-8).  When the sniffer cannot reach a verdict, a comma is assumed.
    """
    head = file_bytes[:8192].decode("utf-8", errors="replace")
    try:
        return csv.Sniffer().sniff(head).delimiter
    except csv.Error:
        # Sniffer found no consistent dialect — default to comma.
        return ","
|
|
|
|
| |
| |
| |
|
|
def read_csv_upload(uploaded_file) -> tuple[pd.DataFrame, str]:
    """Parse a Streamlit ``UploadedFile`` into ``(df, delimiter)``.

    The delimiter is sniffed from the raw bytes, the text is decoded
    leniently as UTF-8, and the upload's cursor is rewound afterwards so
    callers can re-read the object later.
    """
    payload = uploaded_file.getvalue()
    sep = detect_delimiter(payload)
    decoded = payload.decode("utf-8", errors="replace")
    frame = pd.read_csv(io.StringIO(decoded), sep=sep)
    # Reset the stream so the upload can be consumed again elsewhere.
    uploaded_file.seek(0)
    return frame, sep
|
|
|
|
| |
| |
| |
|
|
# Keyword fragments that mark a column *name* as date-like (case-insensitive).
_DATE_NAME_TOKENS = re.compile(r"(date|time|year|month|day|period)", re.IGNORECASE)
|
|
|
|
def suggest_date_columns(df: pd.DataFrame) -> list[str]:
    """Return column names that are likely to contain date/time values.

    A column qualifies when, in order of precedence:

    1. it already carries a datetime dtype;
    2. a sample of its first 20 non-null values parses as dates
       (>= 80 % success with coercion); or
    3. its *name* contains a date-related keyword.
    """
    found: list[str] = []

    def _remember(name) -> None:
        # Keep first-seen order and avoid duplicates.
        if name not in found:
            found.append(name)

    for col in df.columns:
        series = df[col]
        named_like_date = bool(
            re.search(r"(date|time|year|month|day|period)", str(col), re.IGNORECASE)
        )

        # Already datetime — no parsing needed.
        if pd.api.types.is_datetime64_any_dtype(series):
            _remember(col)
            continue

        # Only attempt parsing on plausible candidates: non-numeric data,
        # or any column whose name hints at dates.
        head = series.dropna().head(20)
        if not head.empty and (named_like_date or not pd.api.types.is_numeric_dtype(series)):
            try:
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore", UserWarning)
                    parsed = pd.to_datetime(head, errors="coerce")
                if parsed.notna().mean() >= 0.8:
                    _remember(col)
                    continue
            except (ValueError, TypeError, OverflowError):
                pass

        # Fall back to the name heuristic alone.
        if named_like_date:
            _remember(col)

    return found
|
|
|
|
def suggest_numeric_columns(df: pd.DataFrame) -> list[str]:
    """Return columns that are numeric or could be cleaned to numeric.

    A non-numeric column qualifies if, after stripping the same formatting
    characters handled by :func:`clean_numeric_series` (currency symbols,
    commas, ``%``, whitespace, parenthesised negatives), at least half of
    its first 50 non-null values convert to numbers.
    """
    candidates: list[str] = []

    for col in df.columns:
        if pd.api.types.is_numeric_dtype(df[col]):
            candidates.append(col)
            continue

        sample = df[col].dropna().head(50).astype(str)
        if sample.empty:
            continue

        # Mirror clean_numeric_series exactly: strip $/EUR/GBP, commas, %,
        # and *all* whitespace (\s, not just literal spaces — previously a
        # tab-padded value cleaned fine but was never suggested), then turn
        # accounting-style "(123)" into "-123".
        cleaned = (
            sample
            .str.replace(r"[\$\u20ac\u00a3,%\s]", "", regex=True)
            .str.replace(r"^\((.+)\)$", r"-\1", regex=True)
        )
        numeric = pd.to_numeric(cleaned, errors="coerce")
        # Qualify when at least half the sample converts (and at least one).
        if numeric.notna().sum() >= max(1, len(sample) * 0.5):
            candidates.append(col)

    return candidates
|
|
|
|
| |
| |
| |
|
|
def clean_numeric_series(series: pd.Series) -> pd.Series:
    """Coerce a formatted series to numeric values (NaN on failure).

    Understands currency symbols (``$``, U+20AC, U+00A3), comma thousands
    separators, percent signs, stray whitespace, and accounting-style
    parenthesised negatives (``(123)`` -> ``-123``).
    """
    text = series.astype(str)
    # Drop every formatting character in a single pass.
    stripped = text.str.replace(r"[\$\u20ac\u00a3,%\s]", "", regex=True)
    # Rewrite "(123)" as "-123" before conversion.
    signed = stripped.str.replace(r"^\((.+)\)$", r"-\1", regex=True)
    return pd.to_numeric(signed, errors="coerce")
|
|
|
|
| |
| |
| |
|
|
def clean_dataframe(
    df: pd.DataFrame,
    date_col: str,
    y_cols: list[str],
    dup_action: str = "keep_last",
    missing_action: str = "interpolate",
) -> tuple[pd.DataFrame, CleaningReport]:
    """Run the full cleaning pipeline and return ``(cleaned_df, report)``.

    Parameters
    ----------
    df:
        Input dataframe (will not be mutated).
    date_col:
        Name of the column to parse as dates.
    y_cols:
        Names of the value columns to clean to numeric.
    dup_action:
        How to handle duplicate dates: ``"keep_first"``, ``"keep_last"``,
        or ``"drop_all"``.
    missing_action:
        How to handle missing values in *y_cols*: ``"interpolate"``,
        ``"ffill"``, or ``"drop"``.
    """
    frame = df.copy()
    report = CleaningReport()
    report.rows_before = len(frame)

    # Parse dates strictly first; on failure, record a warning and retry
    # with coercion so unparseable entries become NaT instead of raising.
    try:
        frame[date_col] = pd.to_datetime(frame[date_col])
    except Exception as exc:
        report.parsing_warnings.append(
            f"Date parsing issue in column '{date_col}': {exc}"
        )
        frame[date_col] = pd.to_datetime(frame[date_col], errors="coerce")

    unparsed = int(frame[date_col].isna().sum())
    if unparsed > 0:
        report.parsing_warnings.append(
            f"{unparsed} value(s) in '{date_col}' could not be parsed as dates."
        )
    frame = frame.dropna(subset=[date_col])

    # Coerce any non-numeric value column through the shared cleaner.
    for col in y_cols:
        if not pd.api.types.is_numeric_dtype(frame[col]):
            frame[col] = clean_numeric_series(frame[col])

    report.missing_before = {col: int(frame[col].isna().sum()) for col in y_cols}

    # Count every row that shares its date with another (keep=False marks all).
    duplicated = frame.duplicated(subset=[date_col], keep=False)
    report.duplicates_found = int(duplicated.sum())
    report.duplicates_action = dup_action

    if report.duplicates_found > 0:
        if dup_action == "drop_all":
            frame = frame[~duplicated]
        elif dup_action == "keep_first":
            frame = frame.drop_duplicates(subset=[date_col], keep="first")
        elif dup_action == "keep_last":
            frame = frame.drop_duplicates(subset=[date_col], keep="last")

    # Chronological order with a fresh index for downstream consumers.
    frame = frame.sort_values(date_col).reset_index(drop=True)

    # Fill (or drop) missing values in the value columns.
    if missing_action == "interpolate":
        frame[y_cols] = frame[y_cols].interpolate(method="linear", limit_direction="both")
    elif missing_action == "ffill":
        # Forward-fill, then back-fill so leading gaps are covered too.
        frame[y_cols] = frame[y_cols].ffill().bfill()
    elif missing_action == "drop":
        frame = frame.dropna(subset=y_cols)

    report.missing_after = {col: int(frame[col].isna().sum()) for col in y_cols}
    report.rows_after = len(frame)

    return frame, report
|
|
|
|
| |
| |
| |
|
|
def detect_frequency(df: pd.DataFrame, date_col: str) -> FrequencyInfo:
    """Classify the time-series frequency from the median gap between dates.

    Produces a :class:`FrequencyInfo` carrying a human-readable label, the
    median delta itself, and a regularity flag (std dev of the gaps at most
    20 % of the median gap).
    """
    stamps = df[date_col].dropna().sort_values()
    if len(stamps) < 2:
        # Not enough points to measure a gap — defaults mean "Unknown".
        return FrequencyInfo(label="Unknown", median_delta=timedelta(0), is_regular=False)

    gaps = stamps.diff().dropna()
    typical = gaps.median()

    # A zero median gap makes the 20 % tolerance meaningless.
    if typical > timedelta(0):
        regular = bool(gaps.std() <= typical * 0.2)
    else:
        regular = False

    # Map the median gap (in whole days) onto a frequency band.
    span_days = typical.days
    bands = (
        (0, 1, "Daily"),
        (5, 9, "Weekly"),
        (25, 35, "Monthly"),
        (85, 100, "Quarterly"),
        (350, 380, "Yearly"),
    )
    label = next(
        (name for lo, hi, name in bands if lo <= span_days <= hi),
        "Irregular",
    )

    return FrequencyInfo(label=label, median_delta=typical, is_regular=regular)
|
|
|
|
| |
| |
| |
|
|
| def detect_long_format( |
| df: pd.DataFrame, |
| date_col: str, |
| ) -> tuple[bool, str | None, str | None]: |
| """Heuristic: detect whether *df* is in long (stacked) format. |
| |
| Returns ``(is_long, group_col, value_col)``. |
| |
| A DataFrame is flagged as *long* when the date column contains |
| duplicate values **and** there is at least one string/object column |
| among the remaining columns (the likely group identifier). |
| """ |
| if date_col not in df.columns: |
| return False, None, None |
|
|
| dates = df[date_col] |
| if dates.nunique() >= len(dates): |
| |
| return False, None, None |
|
|
| remaining = [c for c in df.columns if c != date_col] |
|
|
| |
| group_col: str | None = None |
| for c in remaining: |
| if df[c].dtype == object or pd.api.types.is_string_dtype(df[c]): |
| group_col = c |
| break |
|
|
| if group_col is None: |
| return False, None, None |
|
|
| |
| value_col: str | None = None |
| for c in remaining: |
| if c == group_col: |
| continue |
| if pd.api.types.is_numeric_dtype(df[c]): |
| value_col = c |
| break |
|
|
| if value_col is None: |
| return False, None, None |
|
|
| return True, group_col, value_col |
|
|
|
|
def pivot_long_to_wide(
    df: pd.DataFrame,
    date_col: str,
    group_col: str,
    value_col: str,
) -> pd.DataFrame:
    """Pivot a long-format DataFrame to wide format.

    Parameters
    ----------
    df:
        Long-format dataframe.
    date_col:
        Column with date values (becomes the row key).
    group_col:
        Column whose unique values become the new column headers.
    value_col:
        Column with the numeric values to spread.

    Returns
    -------
    pd.DataFrame
        Wide dataframe with *date_col* restored as a regular column and
        one column per unique value in *group_col*.
    """
    table = df.pivot_table(
        index=date_col,
        columns=group_col,
        values=value_col,
        aggfunc="first",
    )
    # Header values may be non-strings (dates, numbers) — normalise them,
    # then move the date index back into a plain column.
    table.columns = [str(name) for name in table.columns]
    return table.reset_index()
|
|
|
|
| |
| |
| |
|
|
def add_time_features(df: pd.DataFrame, date_col: str) -> pd.DataFrame:
    """Append calendar columns derived from *date_col*.

    Adds ``year``, ``quarter``, ``month`` and ``day_of_week`` (Monday=0).
    The input frame is modified in place and returned (no copy is made).
    """
    stamps = df[date_col]
    for feature, values in (
        ("year", stamps.dt.year),
        ("quarter", stamps.dt.quarter),
        ("month", stamps.dt.month),
        ("day_of_week", stamps.dt.dayofweek),
    ):
        df[feature] = values
    return df
|
|