import csv
import json
import os
import random
import sys

import numpy as np
import joblib
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    f1_score,
    precision_recall_fscore_support,
    roc_auc_score,
    roc_curve,
)

from data_preparation.prepare_dataset import get_dataloaders, SELECTED_FEATURES

_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
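# ClearML is enabled either explicitly (USE_CLEARML=1) or implicitly when the
# script runs under a ClearML agent, which sets CLEARML_TASK_ID.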
USE_CLEARML = os.environ.get("USE_CLEARML", "0") == "1" or bool(os.environ.get("CLEARML_TASK_ID"))
CLEARML_QUEUE = os.environ.get("CLEARML_QUEUE", "")


def _load_cfg():
    """Build training config from config/default.yaml with fallbacks."""
    try:
        from config import get

        mlp = get("mlp") or {}
        data = get("data") or {}
        ratios = data.get("split_ratios", [0.7, 0.15, 0.15])
        return {
            "model_name": mlp.get("model_name", "face_orientation"),
            "epochs": mlp.get("epochs", 30),
            "batch_size": mlp.get("batch_size", 32),
            "lr": mlp.get("lr", 1e-3),
            "seed": mlp.get("seed", 42),
            "split_ratios": tuple(ratios),
            "hidden_sizes": mlp.get("hidden_sizes", [64, 32]),
            "checkpoints_dir": os.path.join(_PROJECT_ROOT, "checkpoints"),
            "logs_dir": os.path.join(_PROJECT_ROOT, "evaluation", "logs"),
        }
    except Exception:
        # Hard-coded defaults used when the config package cannot be imported.
        return {
            "model_name": "face_orientation",
            "epochs": 30,
            "batch_size": 32,
            "lr": 1e-3,
            "seed": 42,
            "split_ratios": (0.7, 0.15, 0.15),
            "hidden_sizes": [64, 32],
            "checkpoints_dir": os.path.join(_PROJECT_ROOT, "checkpoints"),
            "logs_dir": os.path.join(_PROJECT_ROOT, "evaluation", "logs"),
        }


CFG = _load_cfg()

# ==== ClearML: expose all config as task params, support remote execution ====
task = None
if USE_CLEARML:
    try:
        from clearml import Task
        from config import CLEARML_PROJECT_NAME, flatten_for_clearml

        task = Task.init(
            project_name=CLEARML_PROJECT_NAME,
            task_name="MLP Model Training",
            tags=["training", "mlp_model"],
        )
        from config.clearml_enrich import enrich_task, upload_repro_artifacts

        enrich_task(task, role="train_mlp")
        flat = flatten_for_clearml()
        flat["mlp/model_name"] = CFG.get("model_name", "face_orientation")
        flat["mlp/epochs"] = CFG.get("epochs", 30)
        flat["mlp/batch_size"] = CFG.get("batch_size", 32)
        flat["mlp/lr"] = CFG.get("lr", 1e-3)
        flat["mlp/seed"] = CFG.get("seed", 42)
        flat["mlp/hidden_sizes"] = str(CFG.get("hidden_sizes", [64, 32]))
        flat["mlp/split_ratios"] = str(CFG.get("split_ratios", (0.7, 0.15, 0.15)))
        task.connect(flat)
        upload_repro_artifacts(task)
        if CLEARML_QUEUE:
            print(f"[ClearML] Enqueuing to queue '{CLEARML_QUEUE}'. Agent will run training.")
            task.execute_remotely(queue_name=CLEARML_QUEUE)
            sys.exit(0)
    except ImportError:
        task = None
        USE_CLEARML = False

# ==== Model =============================================
def set_seed(seed: int) -> None:
    """Set random seed for numpy, torch, and Python RNG for reproducibility."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


class BaseModel(nn.Module):
    """MLP classifier: num_features -> hidden_sizes -> num_classes, used for the face_orientation task."""

    def __init__(self, num_features: int, num_classes: int, hidden_sizes: list[int] | None = None):
        super().__init__()
        sizes = hidden_sizes or CFG.get("hidden_sizes", [64, 32])
        layers = []
        prev = num_features
        for h in sizes:
            layers.extend([nn.Linear(prev, h), nn.ReLU()])
            prev = h
        layers.append(nn.Linear(prev, num_classes))
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)
    def training_step(self, loader, optimizer, criterion, device):
        """Run one training epoch; return mean per-sample loss and accuracy."""
        self.train()
        total_loss = 0.0
        correct = 0
        total = 0
        for features, labels in loader:
            features, labels = features.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = self(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # Weight batch loss by batch size so the epoch loss is a per-sample mean.
            total_loss += loss.item() * features.size(0)
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            total += features.size(0)
        return total_loss / total, correct / total
    @torch.no_grad()
    def validation_step(self, loader, criterion, device):
        """Evaluate on the validation set; return loss, accuracy, and weighted F1."""
        self.eval()
        total_loss = 0.0
        correct = 0
        total = 0
        all_preds = []
        all_labels = []
        for features, labels in loader:
            features, labels = features.to(device), labels.to(device)
            outputs = self(features)
            loss = criterion(outputs, labels)
            total_loss += loss.item() * features.size(0)
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += features.size(0)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
        val_f1 = f1_score(np.array(all_labels), np.array(all_preds), average="weighted")
        return total_loss / total, correct / total, val_f1
    @torch.no_grad()
    def test_step(self, loader, criterion, device):
        """Evaluate on the test set; return loss, accuracy, probabilities, predictions, and labels."""
        self.eval()
        total_loss = 0.0
        correct = 0
        total = 0
        all_preds = []
        all_labels = []
        all_probs = []
        for features, labels in loader:
            features, labels = features.to(device), labels.to(device)
            outputs = self(features)
            loss = criterion(outputs, labels)
            total_loss += loss.item() * features.size(0)
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += features.size(0)
            probs = torch.softmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())
        return total_loss / total, correct / total, np.array(all_probs), np.array(all_preds), np.array(all_labels)
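

# ==== Training / evaluation =============================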
def main() -> None:
    """Train MLP on face_orientation features, save best checkpoint and scaler to checkpoints/."""
    set_seed(CFG["seed"])
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[TRAIN] Device: {device}")
    print(f"[TRAIN] Model: {CFG['model_name']}")
    train_loader, val_loader, test_loader, num_features, num_classes, scaler = get_dataloaders(
        model_name=CFG["model_name"],
        batch_size=CFG["batch_size"],
        split_ratios=CFG["split_ratios"],
        seed=CFG["seed"],
    )
    model = BaseModel(num_features, num_classes, hidden_sizes=CFG.get("hidden_sizes")).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=CFG["lr"])
    param_count = sum(p.numel() for p in model.parameters())
    print(f"[TRAIN] Parameters: {param_count:,}")

    ckpt_dir = CFG["checkpoints_dir"]
    os.makedirs(ckpt_dir, exist_ok=True)
    best_ckpt_path = os.path.join(ckpt_dir, "mlp_best.pt")
    history = {
        "model_name": CFG["model_name"],
        "param_count": param_count,
        "epochs": [],
        "train_loss": [],
        "train_acc": [],
        "val_loss": [],
        "val_acc": [],
        "val_f1": [],
    }
    best_val_f1 = 0.0
    best_val_acc = 0.0

    print(f"\n{'Epoch':>6} | {'Train Loss':>10} | {'Train Acc':>9} | {'Val Loss':>10} | {'Val Acc':>9} | {'Val F1':>8}")
    print("-" * 72)
    for epoch in range(1, CFG["epochs"] + 1):
        train_loss, train_acc = model.training_step(train_loader, optimizer, criterion, device)
        val_loss, val_acc, val_f1 = model.validation_step(val_loader, criterion, device)
        history["epochs"].append(epoch)
        history["train_loss"].append(round(train_loss, 4))
        history["train_acc"].append(round(train_acc, 4))
        history["val_loss"].append(round(val_loss, 4))
        history["val_acc"].append(round(val_acc, 4))
        history["val_f1"].append(round(val_f1, 4))
        current_lr = optimizer.param_groups[0]["lr"]
        if task is not None:
            task.logger.report_scalar("Loss", "Train", float(train_loss), iteration=epoch)
            task.logger.report_scalar("Accuracy", "Train", float(train_acc), iteration=epoch)
            task.logger.report_scalar("Loss", "Val", float(val_loss), iteration=epoch)
            task.logger.report_scalar("Accuracy", "Val", float(val_acc), iteration=epoch)
            task.logger.report_scalar("F1", "Val", float(val_f1), iteration=epoch)
            task.logger.report_scalar("Learning Rate", "LR", float(current_lr), iteration=epoch)
            task.logger.flush()
        marker = ""
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            best_val_acc = val_acc
            torch.save(model.state_dict(), best_ckpt_path)
            marker = " *"
        print(
            f"{epoch:>6} | {train_loss:>10.4f} | {train_acc:>8.2%} | {val_loss:>10.4f} | "
            f"{val_acc:>8.2%} | {val_f1:>8.4f}{marker}"
        )
| print(f"\nBest validation F1: {best_val_f1:.4f} (accuracy at best F1: {best_val_acc:.2%})") | |
| print(f"Checkpoint saved to: {best_ckpt_path}") | |
| model.load_state_dict(torch.load(best_ckpt_path, weights_only=True)) | |
| test_loss, test_acc, test_probs, test_preds, test_labels = model.test_step(test_loader, criterion, device) | |
| test_labels_np = np.asarray(test_labels) | |
| test_preds_np = np.asarray(test_preds) | |
| test_f1 = f1_score(test_labels_np, test_preds_np, average="weighted") | |
| if num_classes > 2: | |
| test_auc = roc_auc_score(test_labels_np, test_probs, multi_class="ovr", average="weighted") | |
| else: | |
| test_auc = roc_auc_score(test_labels_np, test_probs[:, 1]) | |
| print(f"\n[TEST] Loss: {test_loss:.4f} | Accuracy: {test_acc:.2%}") | |
| print(f"[TEST] F1: {test_f1:.4f} | ROC-AUC: {test_auc:.4f}") | |
| history["test_loss"] = round(test_loss, 4) | |
| history["test_acc"] = round(test_acc, 4) | |
| history["test_f1"] = round(test_f1, 4) | |
| history["test_auc"] = round(test_auc, 4) | |

    # Dataset stats for ClearML
    train_labels = train_loader.dataset.labels.numpy()
    val_labels = val_loader.dataset.labels.numpy()
    dataset_stats = {
        "train_size": len(train_loader.dataset),
        "val_size": len(val_loader.dataset),
        "test_size": len(test_loader.dataset),
        "train_class_counts": np.bincount(train_labels, minlength=num_classes).tolist(),
        "val_class_counts": np.bincount(val_labels, minlength=num_classes).tolist(),
        "test_class_counts": np.bincount(test_labels_np, minlength=num_classes).tolist(),
    }
    history["dataset_stats"] = dataset_stats

    logs_dir = CFG["logs_dir"]
    os.makedirs(logs_dir, exist_ok=True)
    log_path = os.path.join(logs_dir, f"{CFG['model_name']}_training_log.json")
    with open(log_path, "w") as f:
        json.dump(history, f, indent=2)
    print(f"[LOG] Training history saved to: {log_path}")

    scaler_path = os.path.join(ckpt_dir, "scaler_mlp.joblib")
    joblib.dump(scaler, scaler_path)
    meta_path = os.path.join(ckpt_dir, "meta_mlp.npz")
    np.savez(meta_path, feature_names=np.array(SELECTED_FEATURES["face_orientation"]))
    print(f"[LOG] Scaler and meta saved to {ckpt_dir}")
    cm = confusion_matrix(test_labels_np, test_preds_np)
    pred_csv = os.path.join(logs_dir, f"{CFG['model_name']}_test_predictions.csv")
    with open(pred_csv, "w", newline="") as f:
        w = csv.writer(f)
        w.writerow(["y_true", "y_pred"] + [f"prob_{j}" for j in range(num_classes)])
        for i in range(len(test_labels_np)):
            w.writerow(
                [int(test_labels_np[i]), int(test_preds_np[i])]
                + [float(x) for x in test_probs[i]]
            )
    summary_path = os.path.join(logs_dir, f"{CFG['model_name']}_test_metrics_summary.json")
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "model": "mlp",
                "model_name": CFG["model_name"],
                "checkpoint": os.path.basename(best_ckpt_path),
                "test_loss": history["test_loss"],
                "test_accuracy": history["test_acc"],
                "test_f1_weighted": history["test_f1"],
                "test_roc_auc": history["test_auc"],
                "confusion_matrix": cm.tolist(),
                "classification_report": classification_report(
                    test_labels_np, test_preds_np, digits=4
                ),
            },
            f,
            indent=2,
        )
    print(f"[LOG] Test predictions → {pred_csv}")

    # ClearML: artifacts, confusion matrix, per-class metrics, registered model
    if task is not None:
        from clearml import OutputModel
        from config.clearml_enrich import attach_output_metrics, task_done_summary

        task.upload_artifact(name="mlp_checkpoint", artifact_object=best_ckpt_path)
        task.upload_artifact(name="training_log", artifact_object=log_path)
        task.upload_artifact(name="test_predictions", artifact_object=pred_csv)
        task.upload_artifact(name="test_metrics_summary", artifact_object=summary_path)
        task.upload_artifact(name="scaler_mlp", artifact_object=scaler_path)
        task.upload_artifact(name="meta_mlp", artifact_object=meta_path)
        out_model = OutputModel(
            task=task, name=f"MLP_{CFG['model_name']}", framework="PyTorch"
        )
        out_model.update_weights(
            weights_filename=best_ckpt_path, auto_delete_file=False
        )
        attach_output_metrics(
            out_model,
            {
                "test_accuracy": round(float(test_acc), 6),
                "test_f1_weighted": round(float(test_f1), 6),
                "test_roc_auc": round(float(test_auc), 6),
            },
        )
        task_done_summary(
            task,
            f"MLP {CFG['model_name']}: test acc={test_acc:.4f}, F1={test_f1:.4f}, ROC-AUC={test_auc:.4f}",
        )
        task.logger.report_single_value("test/accuracy", test_acc)
        task.logger.report_single_value("test/f1_weighted", test_f1)
        task.logger.report_single_value("test/roc_auc", test_auc)
        for key, val in dataset_stats.items():
            if isinstance(val, list):
                for i, v in enumerate(val):
                    task.logger.report_single_value(f"dataset/{key}/{i}", float(v))
            else:
                task.logger.report_single_value(f"dataset/{key}", float(val))
        prec, rec, f1_per_class, _ = precision_recall_fscore_support(
            test_labels_np, test_preds_np, average=None, zero_division=0
        )
        for c in range(num_classes):
            task.logger.report_single_value(f"test/class_{c}_precision", float(prec[c]))
            task.logger.report_single_value(f"test/class_{c}_recall", float(rec[c]))
            task.logger.report_single_value(f"test/class_{c}_f1", float(f1_per_class[c]))

        # Use the non-interactive Agg backend so figures render on headless agents.
        import matplotlib

        matplotlib.use("Agg")
        import matplotlib.pyplot as plt

        fig, ax = plt.subplots(figsize=(6, 5))
        ax.imshow(cm, cmap="Blues")
        ax.set_xticks(range(num_classes))
        ax.set_yticks(range(num_classes))
        ax.set_xticklabels([f"Class {i}" for i in range(num_classes)])
        ax.set_yticklabels([f"Class {i}" for i in range(num_classes)])
        for i in range(num_classes):
            for j in range(num_classes):
                ax.text(j, i, str(cm[i, j]), ha="center", va="center", color="black")
        ax.set_xlabel("Predicted")
        ax.set_ylabel("True")
        ax.set_title("Test set confusion matrix")
        fig.tight_layout()
        task.logger.report_matplotlib_figure(title="Confusion Matrix", series="test", figure=fig, iteration=0)
        plt.close(fig)
        if num_classes == 2:
            fpr, tpr, _ = roc_curve(test_labels_np, test_probs[:, 1])
            fig_r, ax_r = plt.subplots(figsize=(6, 5))
            ax_r.plot(fpr, tpr, label=f"ROC-AUC = {test_auc:.4f}")
            ax_r.plot([0, 1], [0, 1], "k--", lw=1)
            ax_r.set_xlabel("False positive rate")
            ax_r.set_ylabel("True positive rate")
            ax_r.set_title("Test ROC (MLP)")
            ax_r.legend(loc="lower right")
            fig_r.tight_layout()
            task.logger.report_matplotlib_figure(
                title="ROC", series="test", figure=fig_r, iteration=0
            )
            plt.close(fig_r)
        task.logger.flush()


if __name__ == "__main__":
    main()
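

# Usage sketch (script filename assumed; it is not given in this file):
#   python train_mlp.py                                      # local run
#   USE_CLEARML=1 python train_mlp.py                        # local run with ClearML logging
#   USE_CLEARML=1 CLEARML_QUEUE=default python train_mlp.py  # enqueue for a remote agent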