""" TimeSeriesDataSet builder for pytorch_forecasting. Wraps the feature_store output into train / validation / test splits with proper temporal ordering (no leakage). """ from __future__ import annotations import logging import os from typing import Optional import numpy as np import pandas as pd from deep_learning.config import TFTASROConfig, get_tft_config logger = logging.getLogger(__name__) def build_datasets( master_df: pd.DataFrame, time_varying_unknown_reals: list[str], time_varying_known_reals: list[str], target_cols: list[str], cfg: Optional[TFTASROConfig] = None, ): """ Create pytorch_forecasting TimeSeriesDataSet objects for train / val / test. Uses chronological splitting: [train | val | test] Returns: (training_dataset, validation_dataset, test_dataset) """ from pytorch_forecasting import TimeSeriesDataSet if cfg is None: cfg = get_tft_config() n = len(master_df) test_size = int(n * cfg.training.test_ratio) val_size = int(n * cfg.training.val_ratio) train_size = n - val_size - test_size if train_size < cfg.model.max_encoder_length + cfg.model.max_prediction_length: raise ValueError( f"Not enough data for TFT: {train_size} train rows, " f"need at least {cfg.model.max_encoder_length + cfg.model.max_prediction_length}" ) train_cutoff = master_df["time_idx"].iloc[train_size - 1] val_cutoff = master_df["time_idx"].iloc[train_size + val_size - 1] logger.info( "Data split: train=%d (idx<=%.0f), val=%d (idx<=%.0f), test=%d", train_size, train_cutoff, val_size, val_cutoff, test_size, ) target = target_cols[0] if target_cols else "target" training = TimeSeriesDataSet( master_df[master_df["time_idx"] <= train_cutoff], time_idx="time_idx", target=target, group_ids=["group_id"], max_encoder_length=cfg.model.max_encoder_length, max_prediction_length=cfg.model.max_prediction_length, time_varying_unknown_reals=time_varying_unknown_reals, time_varying_known_reals=time_varying_known_reals, static_categoricals=["group_id"], add_relative_time_idx=True, add_target_scales=True, add_encoder_length=True, allow_missing_timesteps=True, ) validation = TimeSeriesDataSet.from_dataset( training, master_df[ (master_df["time_idx"] > train_cutoff - cfg.model.max_encoder_length) & (master_df["time_idx"] <= val_cutoff) ], stop_randomization=True, ) test = TimeSeriesDataSet.from_dataset( training, master_df[master_df["time_idx"] > val_cutoff - cfg.model.max_encoder_length], stop_randomization=True, ) logger.info( "Datasets created: train=%d samples, val=%d, test=%d | " "encoder_len=%d, prediction_len=%d | " "%d unknown reals, %d known reals", len(training), len(validation), len(test), cfg.model.max_encoder_length, cfg.model.max_prediction_length, len(time_varying_unknown_reals), len(time_varying_known_reals), ) return training, validation, test def build_cv_folds( master_df: pd.DataFrame, time_varying_unknown_reals: list[str], time_varying_known_reals: list[str], target_cols: list[str], cfg: Optional[TFTASROConfig] = None, n_folds: int = 3, purge_gap: int = 5, ): """ Purged Walk-Forward Temporal CV with expanding training windows. Test set (last ``test_ratio`` %) is excluded from the CV pool entirely. The remaining data is split into ``n_folds`` expanding-window folds:: Fold 1: [===TRAIN 60%===][GAP][=VAL=][................] Fold 2: [======TRAIN 73%======][GAP][=VAL=][.........] Fold 3: [=========TRAIN 87%=========][GAP][=VAL=] The ``purge_gap`` removes N samples between train and validation to prevent autocovariance-based data leakage (de Prado, 2018). Each validation block covers a different market regime, so Optuna cannot overfit to a single time window (REG-2026-001 root cause). When ``n_folds=1``, returns a single fold equivalent to the old single-split behaviour (backward-compatible fallback). Returns: List of ``(training_dataset, validation_dataset)`` tuples. """ from pytorch_forecasting import TimeSeriesDataSet if cfg is None: cfg = get_tft_config() n = len(master_df) test_size = int(n * cfg.training.test_ratio) cv_pool_size = n - test_size min_seq = cfg.model.max_encoder_length + cfg.model.max_prediction_length # Minimum training size: 60 % of CV pool (ensures enough history) min_train_size = max(int(cv_pool_size * 0.60), min_seq + 10) if min_train_size >= cv_pool_size: raise ValueError( f"Not enough data for {n_folds}-fold CV: " f"cv_pool={cv_pool_size}, min_train={min_train_size}" ) # Divide the remaining space into n_folds equal validation blocks available = cv_pool_size - min_train_size fold_step = max(1, available // n_folds) target = target_cols[0] if target_cols else "target" folds: list[tuple] = [] for fold_idx in range(n_folds): train_end_pos = min(min_train_size + fold_idx * fold_step, cv_pool_size - fold_step) val_start_pos = train_end_pos + purge_gap val_end_pos = min(val_start_pos + fold_step, cv_pool_size) if val_start_pos >= cv_pool_size or val_end_pos <= val_start_pos: logger.warning("Fold %d skipped: purge gap exhausts remaining data", fold_idx) continue train_cutoff = master_df["time_idx"].iloc[train_end_pos - 1] val_start_idx = master_df["time_idx"].iloc[val_start_pos] val_cutoff = master_df["time_idx"].iloc[val_end_pos - 1] train_data = master_df[master_df["time_idx"] <= train_cutoff] val_data = master_df[ (master_df["time_idx"] >= val_start_idx - cfg.model.max_encoder_length) & (master_df["time_idx"] <= val_cutoff) ] training_ds = TimeSeriesDataSet( train_data, time_idx="time_idx", target=target, group_ids=["group_id"], max_encoder_length=cfg.model.max_encoder_length, max_prediction_length=cfg.model.max_prediction_length, time_varying_unknown_reals=time_varying_unknown_reals, time_varying_known_reals=time_varying_known_reals, static_categoricals=["group_id"], add_relative_time_idx=True, add_target_scales=True, add_encoder_length=True, allow_missing_timesteps=True, ) validation_ds = TimeSeriesDataSet.from_dataset( training_ds, val_data, stop_randomization=True, ) logger.info( "CV Fold %d/%d: train=%d samples (idx<=%.0f), " "purge_gap=%d, val=%d (idx %.0f–%.0f)", fold_idx + 1, n_folds, len(training_ds), train_cutoff, purge_gap, len(validation_ds), val_start_idx, val_cutoff, ) folds.append((training_ds, validation_ds)) return folds def _resolve_num_workers(configured: int) -> int: """ Return a safe num_workers value for the current platform. On Windows (os.name == 'nt'), PyTorch DataLoader multiprocessing requires the script to be inside an ``if __name__ == '__main__'`` guard, which is not the case in training scripts. Force 0 to avoid deadlocks. On Linux/macOS (GitHub Actions, HF Spaces), use the configured value; default to 2 when the config still carries the old 0. """ if os.name == "nt": return 0 # On POSIX: honour config; upgrade 0 → 2 as a sensible floor return max(configured, 2) def create_dataloaders( training_dataset, validation_dataset, test_dataset=None, cfg: Optional[TFTASROConfig] = None, ): """ Create PyTorch DataLoaders from TimeSeriesDataSet objects. """ if cfg is None: cfg = get_tft_config() nw = _resolve_num_workers(cfg.training.num_workers) logger.info( "DataLoader workers: %d (platform=%s, configured=%d)", nw, os.name, cfg.training.num_workers, ) train_dl = training_dataset.to_dataloader( train=True, batch_size=cfg.training.batch_size, num_workers=nw, ) val_dl = validation_dataset.to_dataloader( train=False, batch_size=cfg.training.batch_size, num_workers=nw, ) test_dl = None if test_dataset is not None: test_dl = test_dataset.to_dataloader( train=False, batch_size=cfg.training.batch_size, num_workers=nw, ) return train_dl, val_dl, test_dl