"""models/mlp/train.py: train the MLP face_orientation classifier and export the checkpoint, scaler, and metrics."""
import csv
import json
import os
import random
import sys
import numpy as np
import joblib
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import (
classification_report,
confusion_matrix,
f1_score,
precision_recall_fscore_support,
roc_auc_score,
roc_curve,
)
_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)

from data_preparation.prepare_dataset import get_dataloaders, SELECTED_FEATURES
USE_CLEARML = os.environ.get("USE_CLEARML", "0") == "1" or bool(os.environ.get("CLEARML_TASK_ID"))
CLEARML_QUEUE = os.environ.get("CLEARML_QUEUE", "")
def _load_cfg():
"""Build training config from config/default.yaml with fallbacks."""
try:
from config import get
mlp = get("mlp") or {}
data = get("data") or {}
ratios = data.get("split_ratios", [0.7, 0.15, 0.15])
return {
"model_name": mlp.get("model_name", "face_orientation"),
"epochs": mlp.get("epochs", 30),
"batch_size": mlp.get("batch_size", 32),
"lr": mlp.get("lr", 1e-3),
"seed": mlp.get("seed", 42),
"split_ratios": tuple(ratios),
"hidden_sizes": mlp.get("hidden_sizes", [64, 32]),
"checkpoints_dir": os.path.join(_PROJECT_ROOT, "checkpoints"),
"logs_dir": os.path.join(_PROJECT_ROOT, "evaluation", "logs"),
}
except Exception:
return {
"model_name": "face_orientation",
"epochs": 30,
"batch_size": 32,
"lr": 1e-3,
"seed": 42,
"split_ratios": (0.7, 0.15, 0.15),
"hidden_sizes": [64, 32],
"checkpoints_dir": os.path.join(_PROJECT_ROOT, "checkpoints"),
"logs_dir": os.path.join(_PROJECT_ROOT, "evaluation", "logs"),
}
CFG = _load_cfg()
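
# A minimal sketch of the config/default.yaml layout that _load_cfg() expects.
# The exact file contents are an assumption inferred from the get() lookups above:
#
#   mlp:
#     model_name: face_orientation
#     epochs: 30
#     batch_size: 32
#     lr: 0.001
#     seed: 42
#     hidden_sizes: [64, 32]
#   data:
#     split_ratios: [0.7, 0.15, 0.15]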
# ==== ClearML: expose all config as task params, support remote execution ====
task = None
if USE_CLEARML:
try:
from clearml import Task
from config import CLEARML_PROJECT_NAME, flatten_for_clearml
task = Task.init(
project_name=CLEARML_PROJECT_NAME,
task_name="MLP Model Training",
tags=["training", "mlp_model"],
)
from config.clearml_enrich import enrich_task, upload_repro_artifacts
enrich_task(task, role="train_mlp")
flat = flatten_for_clearml()
flat["mlp/model_name"] = CFG.get("model_name", "face_orientation")
flat["mlp/epochs"] = CFG.get("epochs", 30)
flat["mlp/batch_size"] = CFG.get("batch_size", 32)
flat["mlp/lr"] = CFG.get("lr", 1e-3)
flat["mlp/seed"] = CFG.get("seed", 42)
flat["mlp/hidden_sizes"] = str(CFG.get("hidden_sizes", [64, 32]))
flat["mlp/split_ratios"] = str(CFG.get("split_ratios", (0.7, 0.15, 0.15)))
task.connect(flat)
upload_repro_artifacts(task)
if CLEARML_QUEUE:
print(f"[ClearML] Enqueuing to queue '{CLEARML_QUEUE}'. Agent will run training.")
task.execute_remotely(queue_name=CLEARML_QUEUE)
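            # execute_remotely() normally terminates this process itself once the
            # task is cloned and enqueued; the explicit exit below is a safety net.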
sys.exit(0)
    except ImportError:
        print("[ClearML] clearml not installed; continuing without experiment tracking.")
        task = None
        USE_CLEARML = False
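
# Usage sketch (the queue name "default" is illustrative, not project-mandated):
#   local run:            python models/mlp/train.py
#   with ClearML logging: USE_CLEARML=1 python models/mlp/train.py
#   remote execution:     USE_CLEARML=1 CLEARML_QUEUE=default python models/mlp/train.py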
# ==== Model =============================================
def set_seed(seed: int) -> None:
"""Set random seed for numpy, torch, and Python RNG for reproducibility."""
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed)
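
# Note: seeding covers the Python, NumPy, and torch RNGs; CUDA convolutions can
# still be nondeterministic unless torch.backends.cudnn.deterministic is also set.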
class BaseModel(nn.Module):
"""MLP classifier: num_features -> hidden_sizes -> num_classes. Used for face_orientation focus."""
def __init__(self, num_features: int, num_classes: int, hidden_sizes: list[int] | None = None):
super().__init__()
sizes = hidden_sizes or CFG.get("hidden_sizes", [64, 32])
layers = []
prev = num_features
for h in sizes:
layers.extend([nn.Linear(prev, h), nn.ReLU()])
prev = h
layers.append(nn.Linear(prev, num_classes))
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
def training_step(self, loader, optimizer, criterion, device):
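        """Run one optimization pass over the loader; return (mean loss, accuracy)."""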
self.train()
total_loss = 0.0
correct = 0
total = 0
for features, labels in loader:
features, labels = features.to(device), labels.to(device)
optimizer.zero_grad()
outputs = self(features)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
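            # loss.item() is the batch-mean loss, so scale by the batch size to
            # make the running sum a per-sample total (averaged on return).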
total_loss += loss.item() * features.size(0)
correct += (outputs.argmax(dim=1) == labels).sum().item()
total += features.size(0)
return total_loss / total, correct / total
@torch.no_grad()
def validation_step(self, loader, criterion, device):
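        """Evaluate on the loader; return (mean loss, accuracy, weighted F1)."""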
self.eval()
total_loss = 0.0
correct = 0
total = 0
all_preds = []
all_labels = []
for features, labels in loader:
features, labels = features.to(device), labels.to(device)
outputs = self(features)
loss = criterion(outputs, labels)
total_loss += loss.item() * features.size(0)
preds = outputs.argmax(dim=1)
correct += (preds == labels).sum().item()
total += features.size(0)
all_preds.extend(preds.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
val_f1 = f1_score(np.array(all_labels), np.array(all_preds), average="weighted")
return total_loss / total, correct / total, val_f1
@torch.no_grad()
def test_step(self, loader, criterion, device):
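        """Evaluate on the loader; return (mean loss, accuracy, probs, preds, labels)."""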
self.eval()
total_loss = 0.0
correct = 0
total = 0
all_preds = []
all_labels = []
all_probs = []
for features, labels in loader:
features, labels = features.to(device), labels.to(device)
outputs = self(features)
loss = criterion(outputs, labels)
total_loss += loss.item() * features.size(0)
preds = outputs.argmax(dim=1)
correct += (preds == labels).sum().item()
total += features.size(0)
probs = torch.softmax(outputs, dim=1)
all_preds.extend(preds.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
all_probs.extend(probs.cpu().numpy())
return total_loss / total, correct / total, np.array(all_probs), np.array(all_preds), np.array(all_labels)
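
# Standalone smoke test for BaseModel (hypothetical shapes, not part of the pipeline):
#   model = BaseModel(num_features=6, num_classes=3)
#   logits = model(torch.randn(8, 6))        # -> tensor of shape (8, 3)
#   probs = torch.softmax(logits, dim=1)     # rows sum to 1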
def main() -> None:
"""Train MLP on face_orientation features, save best checkpoint and scaler to checkpoints/."""
set_seed(CFG["seed"])
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"[TRAIN] Device: {device}")
print(f"[TRAIN] Model: {CFG['model_name']}")
train_loader, val_loader, test_loader, num_features, num_classes, scaler = get_dataloaders(
model_name=CFG["model_name"],
batch_size=CFG["batch_size"],
split_ratios=CFG["split_ratios"],
seed=CFG["seed"],
)
model = BaseModel(num_features, num_classes, hidden_sizes=CFG.get("hidden_sizes")).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=CFG["lr"])
param_count = sum(p.numel() for p in model.parameters())
print(f"[TRAIN] Parameters: {param_count:,}")
ckpt_dir = CFG["checkpoints_dir"]
os.makedirs(ckpt_dir, exist_ok=True)
best_ckpt_path = os.path.join(ckpt_dir, "mlp_best.pt")
history = {
"model_name": CFG["model_name"],
"param_count": param_count,
"epochs": [],
"train_loss": [],
"train_acc": [],
"val_loss": [],
"val_acc": [],
"val_f1": [],
}
    best_val_f1 = -1.0  # below any reachable F1, so the first epoch always saves a checkpoint
best_val_acc = 0.0
print(f"\n{'Epoch':>6} | {'Train Loss':>10} | {'Train Acc':>9} | {'Val Loss':>10} | {'Val Acc':>9} | {'Val F1':>8}")
print("-" * 72)
for epoch in range(1, CFG["epochs"] + 1):
train_loss, train_acc = model.training_step(train_loader, optimizer, criterion, device)
val_loss, val_acc, val_f1 = model.validation_step(val_loader, criterion, device)
history["epochs"].append(epoch)
history["train_loss"].append(round(train_loss, 4))
history["train_acc"].append(round(train_acc, 4))
history["val_loss"].append(round(val_loss, 4))
history["val_acc"].append(round(val_acc, 4))
history["val_f1"].append(round(val_f1, 4))
        current_lr = optimizer.param_groups[0]["lr"]
        if task is not None:
            logger = task.get_logger()
            logger.report_scalar("Loss", "Train", float(train_loss), iteration=epoch)
            logger.report_scalar("Accuracy", "Train", float(train_acc), iteration=epoch)
            logger.report_scalar("Loss", "Val", float(val_loss), iteration=epoch)
            logger.report_scalar("Accuracy", "Val", float(val_acc), iteration=epoch)
            logger.report_scalar("F1", "Val", float(val_f1), iteration=epoch)
            logger.report_scalar("Learning Rate", "LR", float(current_lr), iteration=epoch)
            logger.flush()
marker = ""
if val_f1 > best_val_f1:
best_val_f1 = val_f1
best_val_acc = val_acc
torch.save(model.state_dict(), best_ckpt_path)
marker = " *"
print(
f"{epoch:>6} | {train_loss:>10.4f} | {train_acc:>8.2%} | {val_loss:>10.4f} | "
f"{val_acc:>8.2%} | {val_f1:>8.4f}{marker}"
)
print(f"\nBest validation F1: {best_val_f1:.4f} (accuracy at best F1: {best_val_acc:.2%})")
print(f"Checkpoint saved to: {best_ckpt_path}")
    model.load_state_dict(torch.load(best_ckpt_path, map_location=device, weights_only=True))
test_loss, test_acc, test_probs, test_preds, test_labels = model.test_step(test_loader, criterion, device)
test_labels_np = np.asarray(test_labels)
test_preds_np = np.asarray(test_preds)
test_f1 = f1_score(test_labels_np, test_preds_np, average="weighted")
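    # Multiclass: one-vs-rest ROC-AUC with class-frequency weighting; binary:
    # AUC over the positive-class probability column.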
if num_classes > 2:
test_auc = roc_auc_score(test_labels_np, test_probs, multi_class="ovr", average="weighted")
else:
test_auc = roc_auc_score(test_labels_np, test_probs[:, 1])
print(f"\n[TEST] Loss: {test_loss:.4f} | Accuracy: {test_acc:.2%}")
print(f"[TEST] F1: {test_f1:.4f} | ROC-AUC: {test_auc:.4f}")
history["test_loss"] = round(test_loss, 4)
history["test_acc"] = round(test_acc, 4)
history["test_f1"] = round(test_f1, 4)
history["test_auc"] = round(test_auc, 4)
# Dataset stats for ClearML
train_labels = train_loader.dataset.labels.numpy()
val_labels = val_loader.dataset.labels.numpy()
dataset_stats = {
"train_size": len(train_loader.dataset),
"val_size": len(val_loader.dataset),
"test_size": len(test_loader.dataset),
"train_class_counts": np.bincount(train_labels, minlength=num_classes).tolist(),
"val_class_counts": np.bincount(val_labels, minlength=num_classes).tolist(),
"test_class_counts": np.bincount(test_labels_np, minlength=num_classes).tolist(),
}
history["dataset_stats"] = dataset_stats
logs_dir = CFG["logs_dir"]
os.makedirs(logs_dir, exist_ok=True)
log_path = os.path.join(logs_dir, f"{CFG['model_name']}_training_log.json")
with open(log_path, "w") as f:
json.dump(history, f, indent=2)
print(f"[LOG] Training history saved to: {log_path}")
scaler_path = os.path.join(ckpt_dir, "scaler_mlp.joblib")
joblib.dump(scaler, scaler_path)
meta_path = os.path.join(ckpt_dir, "meta_mlp.npz")
    np.savez(meta_path, feature_names=np.array(SELECTED_FEATURES[CFG["model_name"]]))
print(f"[LOG] Scaler and meta saved to {ckpt_dir}")
cm = confusion_matrix(test_labels_np, test_preds_np)
pred_csv = os.path.join(logs_dir, f"{CFG['model_name']}_test_predictions.csv")
with open(pred_csv, "w", newline="") as f:
w = csv.writer(f)
w.writerow(["y_true", "y_pred"] + [f"prob_{j}" for j in range(num_classes)])
for i in range(len(test_labels_np)):
w.writerow(
[int(test_labels_np[i]), int(test_preds_np[i])]
+ [float(x) for x in test_probs[i]]
)
summary_path = os.path.join(logs_dir, f"{CFG['model_name']}_test_metrics_summary.json")
with open(summary_path, "w", encoding="utf-8") as f:
json.dump(
{
"model": "mlp",
"model_name": CFG["model_name"],
"checkpoint": os.path.basename(best_ckpt_path),
"test_loss": history["test_loss"],
"test_accuracy": history["test_acc"],
"test_f1_weighted": history["test_f1"],
"test_roc_auc": history["test_auc"],
"confusion_matrix": cm.tolist(),
"classification_report": classification_report(
test_labels_np, test_preds_np, digits=4
),
},
f,
indent=2,
)
print(f"[LOG] Test predictions → {pred_csv}")
# ClearML: artifacts, confusion matrix, per-class metrics, registered model
    if task is not None:
        from clearml import OutputModel
        from config.clearml_enrich import attach_output_metrics, task_done_summary

        logger = task.get_logger()
task.upload_artifact(name="mlp_checkpoint", artifact_object=best_ckpt_path)
task.upload_artifact(name="training_log", artifact_object=log_path)
task.upload_artifact(name="test_predictions", artifact_object=pred_csv)
task.upload_artifact(name="test_metrics_summary", artifact_object=summary_path)
task.upload_artifact(name="scaler_mlp", artifact_object=scaler_path)
task.upload_artifact(name="meta_mlp", artifact_object=meta_path)
out_model = OutputModel(
task=task, name=f"MLP_{CFG['model_name']}", framework="PyTorch"
)
out_model.update_weights(
weights_filename=best_ckpt_path, auto_delete_file=False
)
attach_output_metrics(
out_model,
{
"test_accuracy": round(float(test_acc), 6),
"test_f1_weighted": round(float(test_f1), 6),
"test_roc_auc": round(float(test_auc), 6),
},
)
task_done_summary(
task,
f"MLP {CFG['model_name']}: test acc={test_acc:.4f}, F1={test_f1:.4f}, ROC-AUC={test_auc:.4f}",
)
task.logger.report_single_value("test/accuracy", test_acc)
task.logger.report_single_value("test/f1_weighted", test_f1)
task.logger.report_single_value("test/roc_auc", test_auc)
for key, val in dataset_stats.items():
if isinstance(val, list):
for i, v in enumerate(val):
task.logger.report_single_value(f"dataset/{key}/{i}", float(v))
else:
task.logger.report_single_value(f"dataset/{key}", float(val))
prec, rec, f1_per_class, _ = precision_recall_fscore_support(
test_labels_np, test_preds_np, average=None, zero_division=0
)
        for c in range(num_classes):
            logger.report_single_value(f"test/class_{c}_precision", float(prec[c]))
            logger.report_single_value(f"test/class_{c}_recall", float(rec[c]))
            logger.report_single_value(f"test/class_{c}_f1", float(f1_per_class[c]))
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(6, 5))
ax.imshow(cm, cmap="Blues")
ax.set_xticks(range(num_classes))
ax.set_yticks(range(num_classes))
ax.set_xticklabels([f"Class {i}" for i in range(num_classes)])
ax.set_yticklabels([f"Class {i}" for i in range(num_classes)])
for i in range(num_classes):
for j in range(num_classes):
ax.text(j, i, str(cm[i, j]), ha="center", va="center", color="black")
ax.set_xlabel("Predicted")
ax.set_ylabel("True")
ax.set_title("Test set confusion matrix")
fig.tight_layout()
        logger.report_matplotlib_figure(title="Confusion Matrix", series="test", figure=fig, iteration=0)
plt.close(fig)
if num_classes == 2:
fpr, tpr, _ = roc_curve(test_labels_np, test_probs[:, 1])
fig_r, ax_r = plt.subplots(figsize=(6, 5))
ax_r.plot(fpr, tpr, label=f"ROC-AUC = {test_auc:.4f}")
ax_r.plot([0, 1], [0, 1], "k--", lw=1)
ax_r.set_xlabel("False positive rate")
ax_r.set_ylabel("True positive rate")
ax_r.set_title("Test ROC (MLP)")
ax_r.legend(loc="lower right")
fig_r.tight_layout()
            logger.report_matplotlib_figure(
                title="ROC", series="test", figure=fig_r, iteration=0
            )
plt.close(fig_r)
        logger.flush()
if __name__ == "__main__":
main()