| | """Fault classification training utilities for PMU and PV datasets. |
| | |
| | This module trains deep learning models on high-frequency PMU measurements and |
| | supports classical machine learning baselines so the resulting artefacts can be |
| | served via the Gradio app in this repository or on Hugging Face Spaces. It |
| | implements a full training pipeline including preprocessing, sequence |
| | generation, model definition (CNN-LSTM, Temporal Convolutional Network, or |
| | Support Vector Machine), evaluation, and export of deployment metadata. |
| | |
| | Example |
| | ------- |
| | python fault_classification_pmu.py \ |
| | --data-path data/Fault_Classification_PMU_Data.csv \ |
| | --label-column FaultType \ |
| | --model-type tcn \ |
| | --model-out pmu_tcn_model.keras \ |
| | --scaler-out pmu_feature_scaler.pkl \ |
| | --metadata-out pmu_metadata.json |
| | |
| | The script accepts CSV input where each row contains a timestamped PMU |
| | measurement and a categorical fault label. Features default to the 14 PMU |
| | channels used in the project documentation, but any subset can be provided |
| | via the ``--feature-columns`` argument. Data is automatically standardised |
| | and windowed to create temporal sequences that feed into the neural network. |
| | |
| | The exported metadata JSON file contains the feature ordering, label names, |
| | sequence length, stride, and chosen architecture. The Gradio front-end |
| | consumes this file to replicate the same preprocessing steps during inference. |
| | """ |
| | from __future__ import annotations |
| |
|
| | import argparse |
| | import json |
| | import os |
| | import shutil |
| | from datetime import datetime |
| | from pathlib import Path |
| | from typing import Dict, List, Optional, Sequence, Tuple |
| |
|
| | import math |
| |
|
| | os.environ.setdefault("CUDA_VISIBLE_DEVICES", "-1") |
| | os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "2") |
| | os.environ.setdefault("TF_ENABLE_ONEDNN_OPTS", "0") |
| |
|
| | import joblib |
| | import numpy as np |
| | import pandas as pd |
| | from pandas.api.types import is_numeric_dtype |
| | from sklearn.metrics import accuracy_score, classification_report, confusion_matrix |
| | from sklearn.model_selection import train_test_split |
| | from sklearn.preprocessing import LabelEncoder, StandardScaler |
| | from sklearn.svm import SVC |
| | from tensorflow.keras import callbacks, layers, models, optimizers |
| |
|
| |
|
| |
|
class ProgressCallback(callbacks.Callback):
    """Custom callback to provide training progress updates.

    Prints throttled progress messages to stdout and, when ``status_file_path``
    is given, mirrors the most recent message into that file so an external
    process (e.g. the Gradio UI polling during a long run) can display it.

    Parameters
    ----------
    total_epochs:
        Number of epochs the fit call is configured for; used to compute
        overall percentage progress.
    status_file_path:
        Optional path of a file that is overwritten with each status message.
    status_update_interval:
        Minimum seconds between non-forced status messages (clamped to >= 1s).
    batch_log_frequency:
        During the first epoch only, log every Nth batch (clamped to >= 1).
    """

    def __init__(
        self,
        total_epochs: int,
        status_file_path: Optional[str] = None,
        *,
        status_update_interval: float = 10.0,
        batch_log_frequency: int = 10,
    ):
        super().__init__()
        self.total_epochs = total_epochs
        self.status_file_path = status_file_path
        # Clamp so a caller cannot disable throttling or divide by zero.
        self.status_update_interval = max(1.0, float(status_update_interval))
        self.batch_log_frequency = max(1, int(batch_log_frequency))
        self.current_epoch = 0
        # perf_counter timestamp of training start; None until on_train_begin.
        self.train_start_time: Optional[float] = None
        # perf_counter timestamp of the last emitted status message.
        self.last_status_report: Optional[float] = None
        self.total_batches_per_epoch = 0
        self.batches_seen = 0

    def _now(self) -> float:
        # Lazy import keeps module import light; perf_counter is monotonic,
        # which makes it safe for elapsed-time arithmetic.
        import time

        return time.perf_counter()

    def _training_elapsed(self, now: Optional[float] = None) -> float:
        """Seconds since training started (0.0 before on_train_begin)."""
        if self.train_start_time is None:
            return 0.0
        if now is None:
            now = self._now()
        return max(0.0, now - self.train_start_time)

    def _report_status(self, message: str, *, force: bool = False) -> None:
        """Print *message* and mirror it to the status file, throttled.

        Non-forced messages are suppressed when the last report was less than
        ``status_update_interval`` seconds ago.
        """
        now = self._now()
        if not force and self.last_status_report is not None:
            if now - self.last_status_report < self.status_update_interval:
                return

        print(message, flush=True)

        if self.status_file_path:
            try:
                with open(self.status_file_path, "w") as f:
                    f.write(message)
            except Exception:
                # Status reporting is best-effort; never let a write failure
                # interrupt training.
                pass

        self.last_status_report = now

    def on_train_begin(self, logs=None):
        # Keras populates self.params before training; try the documented
        # "steps" key first, then fall back to samples/batch_size arithmetic.
        params = self.params or {}
        steps = params.get("steps") or params.get("steps_per_epoch")
        if steps:
            self.total_batches_per_epoch = int(steps)
        else:
            samples = params.get("samples")
            batch_size = params.get("batch_size") or 0
            if samples and batch_size:
                self.total_batches_per_epoch = math.ceil(samples / batch_size)
            else:
                # Unknown; batch-level progress will show "?" for the total.
                self.total_batches_per_epoch = 0

        self.batches_seen = 0
        self.last_status_report = None
        self.train_start_time = self._now()

    def on_epoch_begin(self, epoch, logs=None):
        # time is needed here for the wall-clock stamp on the first epoch.
        import time

        now = self._now()
        if self.train_start_time is None:
            # Defensive: handles on_epoch_begin firing without on_train_begin.
            self.train_start_time = now

        self.current_epoch = epoch + 1
        self.batches_seen = 0

        progress_pct = (self.current_epoch / self.total_epochs) * 100
        elapsed_time = self._training_elapsed(now)
        status_msg = (
            f"Training epoch {self.current_epoch}/{self.total_epochs} "
            f"({progress_pct:.1f}%) - {elapsed_time:.1f}s elapsed"
        )
        self._report_status(status_msg, force=True)

        if self.current_epoch == 1:
            wall_clock = time.strftime("%H:%M:%S")
            print(f"Starting first epoch at {wall_clock}", flush=True)

    def on_batch_begin(self, batch, logs=None):
        # Verbose per-batch logging only during the first epoch, to surface
        # stalls early without flooding the log for the whole run.
        if self.current_epoch == 1 and batch % self.batch_log_frequency == 0:
            elapsed = self._training_elapsed()
            print(f"Epoch {self.current_epoch}, Batch {batch} started - {elapsed:.1f}s elapsed", flush=True)

    def on_batch_end(self, batch, logs=None):
        self.batches_seen = batch + 1

        if self.current_epoch == 1 and batch % self.batch_log_frequency == 0:
            logs = logs or {}
            loss = logs.get("loss", 0)
            elapsed = self._training_elapsed()
            print(
                f"Epoch {self.current_epoch}, Batch {batch} completed - Loss: {loss:.4f}, {elapsed:.1f}s elapsed",
                flush=True,
            )

        # Re-resolve the per-epoch batch count in case on_train_begin could
        # not determine it but self.params has since been populated.
        total_batches = self.total_batches_per_epoch or 0
        if not total_batches:
            params = self.params or {}
            total_batches = (
                params.get("steps")
                or params.get("steps_per_epoch")
                or 0
            )

        if total_batches:
            epoch_fraction = min(1.0, (batch + 1) / total_batches)
        else:
            epoch_fraction = 0.0

        # Overall progress blends completed epochs with the fraction of the
        # current epoch that has run.
        overall_progress = (
            (self.current_epoch - 1 + epoch_fraction) / self.total_epochs * 100
        )
        elapsed_time = self._training_elapsed()
        status_msg = (
            f"Epoch {self.current_epoch}/{self.total_epochs} - Batch {batch + 1}/{total_batches or '?'} "
            f"({overall_progress:.1f}%) - {elapsed_time:.1f}s elapsed"
        )
        # Throttled: only emitted if the update interval has passed.
        self._report_status(status_msg)

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        loss = logs.get("loss", 0)
        val_loss = logs.get("val_loss", 0)
        # Older Keras versions report "acc"/"val_acc" instead of the newer
        # "accuracy"/"val_accuracy" keys; accept both.
        accuracy = logs.get("accuracy", logs.get("acc", 0))
        val_accuracy = logs.get("val_accuracy", logs.get("val_acc", 0))
        _ = epoch

        elapsed_time = self._training_elapsed()
        status_msg = (
            f"Epoch {self.current_epoch}/{self.total_epochs} completed - "
            f"Loss: {loss:.4f}, Val Loss: {val_loss:.4f}, "
            f"Acc: {accuracy:.4f}, Val Acc: {val_accuracy:.4f} - {elapsed_time:.1f}s total"
        )
        self._report_status(status_msg, force=True)

    def on_train_end(self, logs=None):
        total_elapsed = self._training_elapsed()
        final_message = (
            f"Training finished after {self.total_epochs} epoch(s) - "
            f"{total_elapsed:.1f}s total elapsed"
        )
        self._report_status(final_message, force=True)
| |
|
| |
|
| | |
| | |
# Default PMU channel ordering used when no explicit --feature-columns is
# supplied. The names match the project's CSV export headers: frequency,
# rate-of-change-of-frequency, a status flag, then per-phase voltage (L1-L3)
# and current (C1-C3) magnitude/angle pairs for substation UPMU_SUB22.
DEFAULT_FEATURE_COLUMNS: List[str] = [
    "[325] UPMU_SUB22:FREQ",
    "[326] UPMU_SUB22:DFDT",
    "[327] UPMU_SUB22:FLAG",
    "[328] UPMU_SUB22-L1:MAG",
    "[329] UPMU_SUB22-L1:ANG",
    "[330] UPMU_SUB22-L2:MAG",
    "[331] UPMU_SUB22-L2:ANG",
    "[332] UPMU_SUB22-L3:MAG",
    "[333] UPMU_SUB22-L3:ANG",
    "[334] UPMU_SUB22-C1:MAG",
    "[335] UPMU_SUB22-C1:ANG",
    "[336] UPMU_SUB22-C2:MAG",
    "[337] UPMU_SUB22-C2:ANG",
    "[338] UPMU_SUB22-C3:MAG",
    "[339] UPMU_SUB22-C3:ANG",
]

# Column names tried (case-insensitively) when auto-detecting the label
# column in _resolve_label_column.
LABEL_GUESS_CANDIDATES: Tuple[str, ...] = ("Fault", "FaultType", "Label", "Target", "Class")
| |
|
| |
|
| | def _normalise_column_name(name: str) -> str: |
| | return str(name).strip().lower() |
| |
|
| |
|
def _resolve_label_column(df: pd.DataFrame, requested: str) -> str:
    """Resolve the label column name, tolerating whitespace/case mismatches.

    Resolution order:

    1. exact match on *requested*;
    2. whitespace-stripped match;
    3. case-insensitive match;
    4. one of ``LABEL_GUESS_CANDIDATES`` (case-insensitive);
    5. the right-most non-numeric column in the frame.

    Raises
    ------
    ValueError
        If the dataframe has no columns or no candidate can be found.
    """
    columns = [str(col) for col in df.columns]
    if not columns:
        raise ValueError("Provided dataframe does not contain any columns.")

    requested = str(requested or "").strip()
    if requested and requested in df.columns:
        return requested

    # Build the case-insensitive lookup once and reuse it for both the
    # requested name and the heuristic guesses (previously built twice).
    lowered_map = {_normalise_column_name(col): str(col) for col in df.columns}

    if requested:
        for col in df.columns:
            if str(col).strip() == requested:
                return str(col)
        lowered = requested.lower()
        if lowered in lowered_map:
            return lowered_map[lowered]

    for guess in LABEL_GUESS_CANDIDATES:
        key = guess.lower()
        if key in lowered_map:
            return lowered_map[key]

    # Last resort: assume the labels are categorical strings stored after
    # the numeric measurement channels.
    for col in reversed(df.columns):
        if not is_numeric_dtype(df[col]):
            return str(col)

    available = ", ".join(columns)
    raise ValueError(
        f"Label column '{requested or '<auto-detect>'}' not found in provided dataframe. "
        f"Available columns: {available}"
    )
| |
|
| |
|
def _resolve_features(df: pd.DataFrame, feature_columns: Sequence[str] | None, label_column: str) -> List[str]:
    """Return the feature column ordering, validating any explicit request.

    When *feature_columns* is given, every entry must exist in *df*.
    Otherwise the documented PMU channels present in the frame come first,
    followed by every remaining column except the label and timestamps.
    """
    if feature_columns:
        missing = [name for name in feature_columns if name not in df.columns]
        if missing:
            raise ValueError(f"Feature columns not present in CSV: {missing}")
        return list(feature_columns)

    preferred = [name for name in DEFAULT_FEATURE_COLUMNS if name in df.columns]

    excluded = {label_column, label_column.lower(), "timestamp", "Timestamp"}
    remainder = [name for name in df.columns if name not in preferred and name not in excluded]

    ordered = preferred + remainder
    if not ordered:
        raise ValueError("No feature columns detected. Specify --feature-columns explicitly.")
    return ordered
| |
|
| |
|
def load_dataset(
    csv_path: Path,
    *,
    feature_columns: Sequence[str] | None,
    label_column: str,
) -> Tuple[np.ndarray, np.ndarray, List[str], str]:
    """Read a PMU CSV file and return feature/label arrays.

    Parameters
    ----------
    csv_path:
        Path to the CSV file containing PMU measurements. The delimiter is
        sniffed automatically (``sep=None`` with the python engine).
    feature_columns:
        Optional explicit ordering of feature columns.
    label_column:
        Name of the column containing the categorical fault label.

    Returns
    -------
    features: np.ndarray
        2-D float32 array of shape (n_samples, n_features).
    labels: np.ndarray
        1-D array of label strings.
    columns: list[str]
        Actual feature ordering used.
    resolved_label: str
        The column name that supplied the labels.
    """
    frame = pd.read_csv(csv_path, sep=None, engine="python")
    resolved_label = _resolve_label_column(frame, label_column)
    selected = _resolve_features(frame, feature_columns, resolved_label)

    feature_matrix = frame[selected].astype(np.float32).values
    label_vector = frame[resolved_label].astype(str).values
    return feature_matrix, label_vector, selected, resolved_label
| |
|
| |
|
def load_dataset_from_dataframe(
    df: pd.DataFrame,
    *,
    feature_columns: Sequence[str] | None,
    label_column: str,
) -> Tuple[np.ndarray, np.ndarray, List[str], str]:
    """Extract feature/label arrays from an in-memory DataFrame.

    Same contract as :func:`load_dataset`, minus the CSV parsing step.
    """
    resolved_label = _resolve_label_column(df, label_column)
    selected = _resolve_features(df, feature_columns, resolved_label)

    matrix = df[selected].astype(np.float32).values
    targets = df[resolved_label].astype(str).values
    return matrix, targets, selected, resolved_label
| |
|
| |
|
def create_sequences(
    features: np.ndarray,
    labels: np.ndarray,
    *,
    sequence_length: int,
    stride: int,
) -> Tuple[np.ndarray, np.ndarray]:
    """Create overlapping sequences suitable for sequence models.

    The label assigned to a sequence corresponds to the label of the final
    timestep in the window. This choice aligns with fault detection use cases
    where the most recent measurement dictates the state of the system.

    Parameters
    ----------
    features:
        2-D array of shape (n_samples, n_features).
    labels:
        1-D array with one label per row of *features*.
    sequence_length:
        Number of consecutive timesteps per window (> 0).
    stride:
        Step between consecutive window start indices (> 0).

    Returns
    -------
    sequences: np.ndarray
        3-D array of shape (n_windows, sequence_length, n_features).
    seq_labels: np.ndarray
        1-D array with the label of each window's last timestep.

    Raises
    ------
    ValueError
        On non-positive parameters, mismatched lengths, or too few rows.
    """
    if sequence_length <= 0:
        raise ValueError("sequence_length must be > 0")
    if stride <= 0:
        raise ValueError("stride must be > 0")
    if features.shape[0] != labels.shape[0]:
        raise ValueError("Features and labels must contain the same number of rows")
    if features.shape[0] < sequence_length:
        raise ValueError("Not enough samples to create a single sequence")

    # Vectorized windowing: build an index matrix of shape
    # (n_windows, sequence_length) and gather in one NumPy call, replacing
    # the original per-window Python loop + np.stack.
    starts = np.arange(0, features.shape[0] - sequence_length + 1, stride)
    window_index = starts[:, None] + np.arange(sequence_length)
    sequences = features[window_index]
    # Each window is labelled by its final timestep.
    seq_labels = labels[starts + sequence_length - 1]
    return sequences, seq_labels
| |
|
| |
|
def build_cnn_lstm(
    input_shape: Tuple[int, int],
    num_classes: int,
    *,
    conv_filters: int = 128,
    kernel_size: int = 3,
    lstm_units: int = 128,
    dropout: float = 0.3,
) -> models.Model:
    """Build the CNN-LSTM classifier: two Conv1D blocks feeding an LSTM head."""
    inputs = layers.Input(shape=input_shape)

    # Convolutional front-end extracts local patterns; the second layer is
    # dilated to widen the receptive field without extra parameters.
    hidden = layers.Conv1D(conv_filters, kernel_size, padding="same", activation="relu")(inputs)
    hidden = layers.BatchNormalization()(hidden)
    hidden = layers.Conv1D(conv_filters, kernel_size, dilation_rate=2, padding="same", activation="relu")(hidden)
    hidden = layers.BatchNormalization()(hidden)
    hidden = layers.Dropout(dropout)(hidden)

    # Recurrent back-end summarises the whole sequence into a single vector.
    hidden = layers.LSTM(lstm_units, return_sequences=False)(hidden)
    hidden = layers.Dropout(dropout)(hidden)
    outputs = layers.Dense(num_classes, activation="softmax")(hidden)

    network = models.Model(inputs, outputs)
    network.compile(
        optimizer=optimizers.Adam(learning_rate=1e-3),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return network
| |
|
| |
|
def build_tcn(
    input_shape: Tuple[int, int],
    num_classes: int,
    *,
    filters: int = 64,
    kernel_size: int = 3,
    dilations: Sequence[int] = (1, 2, 4, 8),
    dropout: float = 0.2,
) -> models.Model:
    """Build a small TCN: stacked residual blocks of causal dilated Conv1Ds."""
    inputs = layers.Input(shape=input_shape)
    x = inputs
    for rate in dilations:
        shortcut = x
        # Two causal convolutions per residual block, sharing the dilation
        # rate; dropout only after the first, matching the block layout.
        for conv_index in range(2):
            x = layers.Conv1D(
                filters,
                kernel_size,
                padding="causal",
                activation="relu",
                dilation_rate=rate,
            )(x)
            x = layers.BatchNormalization()(x)
            if conv_index == 0:
                x = layers.Dropout(dropout)(x)
        # Project the shortcut with a 1x1 conv when channel counts differ,
        # then close the residual connection.
        if shortcut.shape[-1] != filters:
            shortcut = layers.Conv1D(filters, 1, padding="same")(shortcut)
        x = layers.Add()([x, shortcut])
        x = layers.Activation("relu")(x)

    # Pool over time and classify.
    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dropout(dropout)(x)
    outputs = layers.Dense(num_classes, activation="softmax")(x)

    network = models.Model(inputs, outputs)
    network.compile(
        optimizer=optimizers.Adam(learning_rate=1e-3),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return network
| |
|
| |
|
def train_model(
    sequences: np.ndarray,
    labels: np.ndarray,
    *,
    validation_split: float,
    batch_size: int,
    epochs: int,
    model_type: str = "cnn_lstm",
    tensorboard_log_dir: Optional[Path] = None,
    status_file_path: Optional[Path] = None,
) -> Tuple[object, LabelEncoder, Dict[str, object]]:
    """Train a sequence model and return training history and validation outputs.

    Parameters
    ----------
    sequences:
        3-D array (n_windows, sequence_length, n_features); flattened to 2-D
        for the SVM path.
    labels:
        1-D array of categorical label strings, one per window.
    validation_split:
        Fraction held out for validation (may be adjusted upward for tiny
        datasets with singleton classes).
    batch_size, epochs:
        Standard fit parameters (neural paths only).
    model_type:
        One of "cnn_lstm", "tcn", or "svm" (case/whitespace-insensitive).
    tensorboard_log_dir:
        Optional TensorBoard log directory; ignored for the SVM path.
    status_file_path:
        Optional file that receives coarse progress messages for external
        pollers.

    Returns
    -------
    (model, label_encoder, metrics):
        The fitted model, the fitted LabelEncoder, and a metrics dict with
        keys "history", "validation" (y_true/y_pred/class_names/
        confusion_matrix), "model_type", "input_shape", and
        "tensorboard_log_dir".

    Raises
    ------
    ValueError
        If *model_type* is not one of the supported values.
    """

    model_type = model_type.lower().strip()
    if model_type not in {"cnn_lstm", "tcn", "svm"}:
        raise ValueError("model_type must be either 'cnn_lstm', 'tcn', or 'svm'")

    status_file = status_file_path if status_file_path else None

    # Encode string labels to integer class ids for the classifiers.
    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(labels)

    # SVMs expect 2-D input, so each window is flattened to one row.
    if model_type == "svm":
        features = sequences.reshape(sequences.shape[0], -1)
    else:
        features = sequences

    tb_dir: Optional[str] = None
    if model_type != "svm" and tensorboard_log_dir is not None:
        tensorboard_log_dir.mkdir(parents=True, exist_ok=True)
        tb_dir = str(tensorboard_log_dir.resolve())
    else:
        # TensorBoard is meaningless for the SVM path; drop the directory.
        tensorboard_log_dir = None

    unique_labels, label_counts = np.unique(y, return_counts=True)
    min_samples_per_class = np.min(label_counts)

    print(f"Label distribution: {dict(zip(unique_labels, label_counts))}")
    print(f"Minimum samples per class: {min_samples_per_class}")
    print(f"Total sequences: {len(sequences)}, Features per sequence: {sequences.shape[1:]}")

    # NOTE(review): sys is imported but never used in this function —
    # candidate for removal.
    import sys
    data_size_mb = sequences.nbytes / (1024 * 1024)
    print(f"Data size: {data_size_mb:.2f} MB")
    if data_size_mb > 1000:
        print("Warning: Large dataset detected. Consider reducing batch size or sequence length.")

    # Sanitise non-finite values so training does not diverge on bad input.
    if np.any(np.isnan(sequences)) or np.any(np.isinf(sequences)):
        print("Warning: NaN or Inf values detected in sequences")
        sequences = np.nan_to_num(sequences, nan=0.0, posinf=1e6, neginf=-1e6)

    # Stratified split requires at least 2 samples per class; otherwise fall
    # back to a plain random split with a possibly enlarged validation set.
    if min_samples_per_class >= 2:
        X_train, X_val, y_train, y_val = train_test_split(
            features, y, test_size=validation_split, stratify=y, random_state=42
        )
    else:
        print(f"Warning: Some classes have only {min_samples_per_class} sample(s). Using simple random split instead of stratified split.")

        # Grow the validation fraction so it can hold at least one sample
        # per class, capped at 30%.
        total_samples = len(y)
        if validation_split * total_samples < len(unique_labels):
            adjusted_split = max(0.1, len(unique_labels) / total_samples)
            adjusted_split = min(adjusted_split, 0.3)
            print(f"Adjusting validation split from {validation_split} to {adjusted_split}")
            validation_split = adjusted_split

        X_train, X_val, y_train, y_val = train_test_split(
            features, y, test_size=validation_split, random_state=42
        )

    if model_type == "cnn_lstm":
        print("Building CNN-LSTM model...")

        # Large datasets get a narrower network to keep epoch time bounded.
        if len(sequences) > 100000:
            print("Using lightweight CNN-LSTM for large dataset")
            model = build_cnn_lstm(
                input_shape=sequences.shape[1:],
                num_classes=len(label_encoder.classes_),
                conv_filters=64,
                lstm_units=64,
                dropout=0.2
            )
        else:
            model = build_cnn_lstm(
                input_shape=sequences.shape[1:], num_classes=len(label_encoder.classes_)
            )
        print(f"CNN-LSTM model built. Input shape: {sequences.shape[1:]}, Classes: {len(label_encoder.classes_)}")
        print(f"Model parameters: {model.count_params():,}")

        # Large datasets also get tighter LR-reduction/early-stop patience.
        if len(sequences) > 100000:
            callbacks_list = [
                ProgressCallback(total_epochs=epochs, status_file_path=str(status_file) if status_file else None),
                callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=2, min_lr=1e-5),
                callbacks.EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True),
            ]
            print("Using aggressive callbacks for large dataset")
        else:
            callbacks_list = [
                ProgressCallback(total_epochs=epochs, status_file_path=str(status_file) if status_file else None),
                callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, min_lr=1e-5),
                callbacks.EarlyStopping(monitor="val_loss", patience=6, restore_best_weights=True),
            ]
        if tensorboard_log_dir is not None:
            # insert(-2, ...) places TensorBoard right after ProgressCallback,
            # before the LR/early-stopping callbacks.
            callbacks_list.insert(-2, callbacks.TensorBoard(log_dir=tb_dir, histogram_freq=0, write_graph=False))

        print(f"Starting CNN-LSTM training with {len(X_train)} training samples, {len(X_val)} validation samples")
        print(f"Batch size: {batch_size}, Epochs: {epochs}")

        if status_file:
            with open(status_file, 'w') as f:
                f.write(f"CNN-LSTM training started - {len(X_train)} train, {len(X_val)} val samples, batch_size={batch_size}")

        history = model.fit(
            X_train,
            y_train,
            validation_data=(X_val, y_val),
            epochs=epochs,
            batch_size=batch_size,
            callbacks=callbacks_list,
            verbose=2,
        )

        print("CNN-LSTM training completed, starting prediction...")
        if status_file:
            with open(status_file, 'w') as f:
                f.write("CNN-LSTM training completed, evaluating model...")

        print(f"Making predictions on {len(X_val)} validation samples...")
        if status_file:
            with open(status_file, 'w') as f:
                f.write(f"Making predictions on {len(X_val)} validation samples...")
        # argmax over the softmax output yields the predicted class id.
        y_pred = model.predict(X_val, verbose=0).argmax(axis=1)
        print("Predictions completed")
        training_history: Dict[str, object] = history.history
    elif model_type == "tcn":
        print("Building TCN model...")
        model = build_tcn(input_shape=sequences.shape[1:], num_classes=len(label_encoder.classes_))
        print(f"TCN model built. Input shape: {sequences.shape[1:]}, Classes: {len(label_encoder.classes_)}")

        callbacks_list = [
            ProgressCallback(total_epochs=epochs, status_file_path=str(status_file) if status_file else None),
            callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, min_lr=1e-5),
            callbacks.EarlyStopping(monitor="val_loss", patience=6, restore_best_weights=True),
        ]
        if tensorboard_log_dir is not None:
            callbacks_list.insert(-2, callbacks.TensorBoard(log_dir=tb_dir, histogram_freq=0, write_graph=False))

        print(f"Starting TCN training with {len(X_train)} training samples, {len(X_val)} validation samples")
        print(f"Batch size: {batch_size}, Epochs: {epochs}")

        if status_file:
            with open(status_file, 'w') as f:
                f.write(f"TCN training started - {len(X_train)} train, {len(X_val)} val samples, batch_size={batch_size}")

        history = model.fit(
            X_train,
            y_train,
            validation_data=(X_val, y_val),
            epochs=epochs,
            batch_size=batch_size,
            callbacks=callbacks_list,
            verbose=2,
        )

        print("TCN training completed, starting prediction...")
        if status_file:
            with open(status_file, 'w') as f:
                f.write("TCN training completed, evaluating model...")

        print(f"Making TCN predictions on {len(X_val)} validation samples...")
        if status_file:
            with open(status_file, 'w') as f:
                f.write(f"Making TCN predictions on {len(X_val)} validation samples...")
        y_pred = model.predict(X_val, verbose=0).argmax(axis=1)
        print("TCN predictions completed")
        training_history = history.history
    else:
        # SVM baseline: balanced class weights, probability estimates on so
        # the deployed model can report confidences.
        print("Training SVM model...", flush=True)
        if status_file:
            with open(status_file, 'w') as f:
                f.write("Training SVM model...")

        model = SVC(kernel="rbf", probability=True, class_weight="balanced")
        model.fit(X_train, y_train)

        print("SVM training completed. Evaluating...", flush=True)
        if status_file:
            with open(status_file, 'w') as f:
                f.write("SVM training completed. Evaluating...")

        y_pred = model.predict(X_val)
        # SVMs have no epoch history; record scalar accuracies instead.
        training_history = {
            "train_accuracy": float(model.score(X_train, y_train)),
            "val_accuracy": float(accuracy_score(y_val, y_pred)),
        }

    cm = confusion_matrix(y_val, y_pred)
    metrics: Dict[str, object] = {
        "history": training_history,
        "validation": {
            "y_true": y_val,
            "y_pred": y_pred,
            "class_names": label_encoder.classes_.tolist(),
            "confusion_matrix": cm,
        },
        "model_type": model_type,
        "input_shape": list(sequences.shape[1:]),
        "tensorboard_log_dir": tb_dir,
    }
    return model, label_encoder, metrics
| |
|
| |
|
def standardise_sequences(sequences: np.ndarray) -> Tuple[np.ndarray, StandardScaler]:
    """Fit a StandardScaler per feature (pooling all timesteps) and rescale.

    Returns the scaled windows in the original 3-D shape plus the fitted
    scaler so inference can replay the identical transform.
    """
    scaler = StandardScaler()
    n_features = sequences.shape[-1]
    # Collapse (windows, timesteps) into rows so scaling is per feature.
    flattened = sequences.reshape(-1, n_features)
    transformed = scaler.fit_transform(flattened)
    return transformed.reshape(sequences.shape), scaler
| |
|
| |
|
def export_artifacts(
    *,
    model: object,
    scaler: StandardScaler,
    label_encoder: LabelEncoder,
    feature_columns: Sequence[str],
    label_column: str,
    sequence_length: int,
    stride: int,
    model_path: Path,
    scaler_path: Path,
    metadata_path: Path,
    metrics: dict,
) -> None:
    """Persist trained assets to disk for deployment.

    Writes the model (joblib for SVM, Keras format otherwise), the fitted
    scaler, and a metadata JSON that the Gradio front-end reads to replicate
    preprocessing at inference time.
    """
    # Ensure every destination directory exists before any write.
    for destination in (model_path, scaler_path, metadata_path):
        destination.parent.mkdir(parents=True, exist_ok=True)

    model_type = str(metrics.get("model_type", "cnn_lstm"))
    if model_type == "svm":
        joblib.dump(model, model_path)
    else:
        model.save(model_path)
    joblib.dump(scaler, scaler_path)

    validation = metrics["validation"]
    report_dict = classification_report(
        validation["y_true"],
        validation["y_pred"],
        target_names=label_encoder.classes_,
        output_dict=True,
    )

    metadata = {
        "feature_columns": list(feature_columns),
        "label_classes": label_encoder.classes_.tolist(),
        "label_column": label_column,
        "sequence_length": sequence_length,
        "stride": stride,
        "model_path": str(model_path),
        "scaler_path": str(scaler_path),
        "training_history": metrics["history"],
        "classification_report": report_dict,
        "model_type": model_type,
        "model_format": "joblib" if model_type == "svm" else "keras",
        "input_shape": metrics.get("input_shape"),
        "tensorboard_log_dir": metrics.get("tensorboard_log_dir"),
    }

    # Reuse the confusion matrix computed during training when available;
    # otherwise recompute from the stored validation arrays.
    confusion = validation.get("confusion_matrix")
    if confusion is None:
        confusion = confusion_matrix(validation["y_true"], validation["y_pred"])
    metadata["confusion_matrix"] = np.asarray(confusion).tolist()

    metadata_path.write_text(json.dumps(metadata, indent=2))
| |
|
| |
|
def train_from_dataframe(
    df: pd.DataFrame,
    *,
    label_column: str,
    feature_columns: Sequence[str] | None = None,
    sequence_length: int = 32,
    stride: int = 4,
    validation_split: float = 0.2,
    batch_size: int = 128,
    epochs: int = 50,
    model_type: str = "cnn_lstm",
    model_path: Path | str = "pmu_cnn_lstm_model.keras",
    scaler_path: Path | str = "pmu_feature_scaler.pkl",
    metadata_path: Path | str = "pmu_metadata.json",
    enable_tensorboard: bool = True,
    tensorboard_root: Path | str | None = None,
) -> dict:
    """Train a PMU fault classification model using an in-memory dataframe.

    Full pipeline: resolve features/labels, window into sequences,
    standardise, auto-tune batch size/epochs/validation split for extreme
    dataset sizes, train via :func:`train_model`, export artifacts via
    :func:`export_artifacts`, and return a summary dict (paths, history,
    classification report, confusion matrix, TensorBoard locations).

    Raises
    ------
    ValueError
        If fewer than 10 sequences can be generated from *df*.
    """

    model_path = Path(model_path)
    scaler_path = Path(scaler_path)
    metadata_path = Path(metadata_path)

    # Progress messages are mirrored here so the UI can poll during training.
    status_file = model_path.parent / "training_status.txt"
    print(f"Training progress will be written to: {status_file}")

    tensorboard_log_dir: Optional[Path] = None
    if enable_tensorboard and model_type.lower() != "svm":
        base_dir = Path(tensorboard_root) if tensorboard_root is not None else Path("tensorboard_runs")
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
        # datetime.now(timezone.utc) is the modern spelling.
        timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
        tensorboard_log_dir = base_dir / f"run-{timestamp}"

    features, labels, used_columns, resolved_label = load_dataset_from_dataframe(
        df, feature_columns=feature_columns, label_column=label_column
    )

    print(f"Input data: {len(features)} samples")
    print(f"Creating sequences with length={sequence_length}, stride={stride}")

    sequences, seq_labels = create_sequences(
        features,
        labels,
        sequence_length=sequence_length,
        stride=stride,
    )

    print(f"Generated {len(sequences)} sequences")

    # Hard floor: below 10 windows no meaningful train/val split exists.
    if len(sequences) < 10:
        raise ValueError(
            f"Only {len(sequences)} sequences generated. Need at least 10 for training. "
            f"Try reducing sequence_length (currently {sequence_length}) or stride (currently {stride}), "
            "or provide more data."
        )

    # Soft warning: deep models tend to overfit below ~100 windows.
    if len(sequences) < 100 and model_type in ['cnn_lstm', 'tcn']:
        print(f"Warning: Only {len(sequences)} sequences available. Consider using SVM for small datasets.")

    sequences, scaler = standardise_sequences(sequences)

    # Remember the caller's values so adjustments can be reported honestly.
    original_batch_size = batch_size
    original_epochs = epochs
    original_validation_split = validation_split

    # Auto-tuning: very large datasets get bigger batches and fewer epochs;
    # very small datasets get smaller batches, fewer epochs, and a capped
    # validation fraction.
    if len(sequences) > 100000:
        print(f"Large dataset detected ({len(sequences)} sequences). Optimizing parameters...")
        batch_size = min(batch_size * 2, 512)
        epochs = min(epochs, 30)
        print(f"Adjusted parameters for large dataset:")
        print(f"  Batch size: {original_batch_size} -> {batch_size}")
        print(f"  Epochs: {original_epochs} -> {epochs}")

        # Free memory from the pre-scaling intermediates before training.
        import gc
        gc.collect()

    elif len(sequences) < 100:
        # Keep at least 4 samples per batch but no more than a quarter of
        # the dataset per batch.
        batch_size = max(min(batch_size, len(sequences) // 4), 4)
        epochs = min(epochs, 20)
        validation_split = min(validation_split, 0.3)
        print(f"Adjusted parameters for small dataset:")
        print(f"  Batch size: {original_batch_size} -> {batch_size}")
        print(f"  Epochs: {original_epochs} -> {epochs}")
        print(f"  Validation split: {original_validation_split} -> {validation_split}")

    model, label_encoder, metrics = train_model(
        sequences,
        seq_labels,
        validation_split=validation_split,
        batch_size=batch_size,
        epochs=epochs,
        model_type=model_type,
        tensorboard_log_dir=tensorboard_log_dir,
        status_file_path=status_file,
    )

    export_artifacts(
        model=model,
        scaler=scaler,
        label_encoder=label_encoder,
        feature_columns=used_columns,
        label_column=resolved_label,
        sequence_length=sequence_length,
        stride=stride,
        model_path=model_path,
        scaler_path=scaler_path,
        metadata_path=metadata_path,
        metrics=metrics,
    )

    # Zip the TensorBoard run directory so the UI can offer it for download;
    # failures here are non-fatal (best-effort).
    tensorboard_zip_path: Optional[str] = None
    if tensorboard_log_dir and tensorboard_log_dir.exists():
        try:
            tensorboard_zip_path = shutil.make_archive(
                base_name=str(tensorboard_log_dir.parent / tensorboard_log_dir.name),
                format="zip",
                root_dir=str(tensorboard_log_dir.parent),
                base_dir=tensorboard_log_dir.name,
            )
            tensorboard_zip_path = str(Path(tensorboard_zip_path).resolve())
        except Exception:
            tensorboard_zip_path = None

    report_dict = classification_report(
        metrics["validation"]["y_true"],
        metrics["validation"]["y_pred"],
        target_names=metrics["validation"]["class_names"],
        output_dict=True,
    )
    confusion = metrics["validation"].get("confusion_matrix")
    if confusion is None:
        confusion = confusion_matrix(metrics["validation"]["y_true"], metrics["validation"]["y_pred"])

    # Summary returned to the caller (and surfaced in the UI).
    return {
        "num_samples": int(df.shape[0]),
        "num_sequences": int(sequences.shape[0]),
        "feature_columns": used_columns,
        "class_names": label_encoder.classes_.tolist(),
        "model_path": str(model_path.resolve()),
        "scaler_path": str(scaler_path.resolve()),
        "metadata_path": str(metadata_path.resolve()),
        "history": metrics["history"],
        "model_type": metrics.get("model_type", model_type),
        "classification_report": report_dict,
        "confusion_matrix": np.asarray(confusion).tolist(),
        "tensorboard_log_dir": metrics.get("tensorboard_log_dir"),
        "tensorboard_zip_path": tensorboard_zip_path,
        "label_column": resolved_label,
    }
| |
|
| |
|
def run_training(args: argparse.Namespace) -> None:
    """Run the end-to-end CLI training pipeline.

    Loads the CSV dataset, windows it into fixed-length sequences,
    standardises the features, trains the requested model, exports the
    model/scaler/metadata artefacts, and prints a validation
    classification report.

    Parameters
    ----------
    args:
        Parsed command-line options produced by ``parse_args``.
    """
    # Local import: only needed for the timezone-aware log-dir timestamp
    # below, and keeps the file's top-level import block untouched.
    from datetime import timezone

    csv_path = Path(args.data_path)
    model_out = Path(args.model_out)
    scaler_out = Path(args.scaler_out)
    metadata_out = Path(args.metadata_out)

    features, labels, feature_columns, resolved_label = load_dataset(
        csv_path, feature_columns=args.feature_columns, label_column=args.label_column
    )

    sequences, seq_labels = create_sequences(
        features,
        labels,
        sequence_length=args.sequence_length,
        stride=args.stride,
    )

    sequences, scaler = standardise_sequences(sequences)

    # TensorBoard only applies to the Keras architectures; the SVM baseline
    # has no epoch-based training loop to log.
    tensorboard_log_dir: Optional[Path] = None
    if args.tensorboard and args.model_type != "svm":
        if args.tensorboard_log_dir:
            tensorboard_log_dir = Path(args.tensorboard_log_dir)
        else:
            # datetime.utcnow() is deprecated since Python 3.12; use an
            # explicit timezone-aware UTC timestamp instead.
            timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
            tensorboard_log_dir = Path("tensorboard_runs") / timestamp
    model, label_encoder, metrics = train_model(
        sequences,
        seq_labels,
        validation_split=args.validation_split,
        batch_size=args.batch_size,
        epochs=args.epochs,
        model_type=args.model_type,
        tensorboard_log_dir=tensorboard_log_dir,
        status_file_path=None,
    )

    export_artifacts(
        model=model,
        scaler=scaler,
        label_encoder=label_encoder,
        feature_columns=feature_columns,
        label_column=resolved_label,
        sequence_length=args.sequence_length,
        stride=args.stride,
        model_path=model_out,
        scaler_path=scaler_out,
        metadata_path=metadata_out,
        metrics=metrics,
    )

    print("Training complete")
    print(f"Model architecture : {args.model_type}")
    print(f"Model saved to : {model_out}")
    print(f"Scaler saved to : {scaler_out}")
    print(f"Metadata saved to : {metadata_out}")
    print("Validation metrics:")
    report = classification_report(
        metrics["validation"]["y_true"], metrics["validation"]["y_pred"], target_names=metrics["validation"]["class_names"]
    )
    print(report)
    if metrics.get("tensorboard_log_dir"):
        tb_dir = metrics["tensorboard_log_dir"]
        print(f"TensorBoard logs written to: {tb_dir}")
        print(f"Launch TensorBoard with: tensorboard --logdir \"{tb_dir}\"")
| |
|
| |
|
def parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace:
    """Build the CLI parser and parse *argv* (``sys.argv[1:]`` when None)."""
    cli = argparse.ArgumentParser(description="Train a sequence model for PMU fault classification")
    add = cli.add_argument

    # Dataset inputs.
    add("--data-path", required=True, help="Path to Fault_Classification_PMU_Data CSV")
    add("--label-column", default="Fault", help="Name of the target label column (default: Fault)")
    add(
        "--feature-columns",
        nargs="*",
        default=None,
        help="Optional explicit list of feature columns. Defaults to all non-label columns",
    )

    # Windowing and training hyper-parameters.
    add("--sequence-length", type=int, default=32, help="Number of timesteps per training window")
    add("--stride", type=int, default=4, help="Step size between consecutive windows")
    add("--validation-split", type=float, default=0.2, help="Validation set fraction")
    add("--batch-size", type=int, default=128, help="Training batch size")
    add("--epochs", type=int, default=50, help="Maximum number of training epochs")
    add(
        "--model-type",
        choices=["cnn_lstm", "tcn", "svm"],
        default="cnn_lstm",
        help="Model architecture to train (choices: cnn_lstm, tcn, svm)",
    )

    # Output artefact locations.
    add("--model-out", default="pmu_cnn_lstm_model.keras", help="Path to save trained Keras model")
    add("--scaler-out", default="pmu_feature_scaler.pkl", help="Path to save fitted StandardScaler")
    add("--metadata-out", default="pmu_metadata.json", help="Path to save metadata JSON")

    # TensorBoard controls (enabled by default; opt out with --no-tensorboard).
    add(
        "--tensorboard-log-dir",
        default=None,
        help="Optional directory to write TensorBoard logs (defaults to tensorboard_runs/<timestamp>)",
    )
    add(
        "--no-tensorboard",
        dest="tensorboard",
        action="store_false",
        help="Disable TensorBoard logging for neural network models",
    )
    cli.set_defaults(tensorboard=True)
    return cli.parse_args(argv)
| |
|
| |
|
def main(argv: Sequence[str] | None = None) -> None:
    """Parse command-line arguments and kick off the training pipeline."""
    run_training(parse_args(argv))
| |
|
| |
|
# Script entry point: run the CLI training pipeline when executed directly.
if __name__ == "__main__":
    main()
| |
|