"""Export XGBoost sweep trial metrics from ClearML to CSV. This helper is intended for the local Optuna workflow in sweep_local.py, where each trial logs val_loss / val_f1 / val_accuracy as ClearML scalars. """ from __future__ import annotations import argparse import csv from pathlib import Path from typing import Optional from clearml import Task from sklearn.metrics import accuracy_score from xgboost import XGBClassifier from data_preparation.prepare_dataset import get_default_split_config, get_numpy_splits DEFAULT_PROJECT = "FocusGuards Large Group Project" DEFAULT_TAGS = ["xgboost", "optuna_manual"] DEFAULT_OUTPUT = Path("models/xgboost/sweep_results_all_40.csv") DEFAULT_NAME_PREFIX = "XGBoost Sweep Trial #" DEFAULT_LIMIT = 40 DEFAULT_SORT_BY = "val_f1" def _read_metric(metrics: dict, title: str, series: str) -> Optional[float]: raw_value = metrics.get(title, {}).get(series, {}).get("last") if raw_value is None: return None try: return float(raw_value) except (TypeError, ValueError): return None def _to_float(value, default: float = 0.0) -> float: try: return float(value) except (TypeError, ValueError): return default def _task_time_key(task: Task) -> str: # Prefer last update time when available so --limit keeps the latest trials. data = getattr(task, "data", None) if data is not None: for attr in ("last_update", "last_iteration_timestamp", "created"): value = getattr(data, attr, None) if value: return str(value) for attr in ("last_update", "created"): value = getattr(task, attr, None) if value: return str(value) return "" def _is_valid_core_metric(value: Optional[float]) -> bool: return value is not None and value > 0.0 def _sort_metric(value: Optional[float], default: float) -> float: return value if value is not None else default def _compute_missing_val_accuracy(rows: list[dict], seed: int) -> int: need_backfill = [r for r in rows if r["val_accuracy"] is None] if not need_backfill: return 0 split_ratios, _default_seed = get_default_split_config() try: splits, _num_features, _num_classes, _scaler = get_numpy_splits( model_name="face_orientation", split_ratios=split_ratios, seed=seed, scale=False, ) except Exception as exc: print(f"[FETCH] WARNING: Could not backfill val_accuracy (dataset unavailable): {exc}") return 0 X_train, y_train = splits["X_train"], splits["y_train"] X_val, y_val = splits["X_val"], splits["y_val"] computed = 0 for row in need_backfill: try: params = { "n_estimators": int(row["n_estimators"]), "max_depth": int(row["max_depth"]), "learning_rate": float(row["learning_rate"]), "subsample": float(row["subsample"]), "colsample_bytree": float(row["colsample_bytree"]), "reg_alpha": float(row["reg_alpha"]), "reg_lambda": float(row["reg_lambda"]), "eval_metric": "logloss", "random_state": seed, "verbosity": 0, } model = XGBClassifier(**params) model.fit(X_train, y_train) val_preds = model.predict(X_val) row["val_accuracy"] = float(accuracy_score(y_val, val_preds)) computed += 1 except Exception as exc: print(f"[FETCH] WARNING: Failed val_accuracy backfill for task_id={row['task_id']}: {exc}") return computed def _sort_key(row: dict, sort_by: str) -> tuple[float, float, float]: val_loss = _sort_metric(row["val_loss"], float("inf")) val_f1 = _sort_metric(row["val_f1"], float("-inf")) val_accuracy = _sort_metric(row["val_accuracy"], float("-inf")) if sort_by == "val_loss": return (val_loss, -val_f1, -val_accuracy) if sort_by == "val_accuracy": return (-val_accuracy, -val_f1, val_loss) return (-val_f1, val_loss, -val_accuracy) def fetch_rows( project_name: str, tags: 
list[str], name_prefix: str, limit: int, drop_zero_metrics: bool, sort_by: str, compute_missing_val_accuracy: bool, seed: int, ) -> list[dict]: print( f"[FETCH] Project={project_name} Tags={tags} " f"NamePrefix={name_prefix!r} Limit={limit}" ) tasks = Task.get_tasks( project_name=project_name, tags=tags, task_filter={"status": ["completed"]}, ) filtered_tasks = [t for t in tasks if (t.name or "").startswith(name_prefix)] filtered_tasks.sort(key=_task_time_key, reverse=True) if limit > 0: filtered_tasks = filtered_tasks[:limit] print( f"[FETCH] Total completed tagged tasks={len(tasks)} | " f"name-matched={len(filtered_tasks)}" ) rows = [] for task in filtered_tasks: params = task.get_parameters() or {} metrics = task.get_last_scalar_metrics() or {} val_loss = _read_metric(metrics, "Loss", "Val") val_accuracy = _read_metric(metrics, "Summary", "val_accuracy") val_f1 = _read_metric(metrics, "Summary", "val_f1") row = { "task_id": task.id, "val_loss": val_loss, "val_accuracy": val_accuracy, "val_f1": val_f1, "n_estimators": _to_float(params.get("General/n_estimators", params.get("n_estimators"))), "max_depth": _to_float(params.get("General/max_depth", params.get("max_depth"))), "learning_rate": _to_float(params.get("General/learning_rate", params.get("learning_rate"))), "subsample": _to_float(params.get("General/subsample", params.get("subsample"))), "colsample_bytree": _to_float( params.get("General/colsample_bytree", params.get("colsample_bytree")) ), "reg_alpha": _to_float(params.get("General/reg_alpha", params.get("reg_alpha"))), "reg_lambda": _to_float(params.get("General/reg_lambda", params.get("reg_lambda"))), } rows.append(row) if compute_missing_val_accuracy: computed = _compute_missing_val_accuracy(rows, seed=seed) print(f"[FETCH] Backfilled val_accuracy for {computed} rows where it was missing") if drop_zero_metrics: before = len(rows) rows = [ r for r in rows if ( _is_valid_core_metric(r["val_loss"]) and _is_valid_core_metric(r["val_accuracy"]) and _is_valid_core_metric(r["val_f1"]) ) ] print(f"[FETCH] Skipped tasks with missing/zero core metrics: {before - len(rows)}") # Default ranking is by validation F1, then val_loss, then val_accuracy. 
rows.sort(key=lambda r: _sort_key(r, sort_by=sort_by)) return rows def write_csv(rows: list[dict], output_path: Path) -> None: output_path.parent.mkdir(parents=True, exist_ok=True) fieldnames = [ "task_id", "val_loss", "val_accuracy", "val_f1", "n_estimators", "max_depth", "learning_rate", "subsample", "colsample_bytree", "reg_alpha", "reg_lambda", ] with output_path.open("w", newline="", encoding="utf-8") as handle: writer = csv.DictWriter(handle, fieldnames=fieldnames) writer.writeheader() writer.writerows(rows) print(f"[FETCH] Wrote {len(rows)} rows to {output_path}") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--project", default=DEFAULT_PROJECT, help="ClearML project name") parser.add_argument( "--tags", nargs="+", default=DEFAULT_TAGS, help="ClearML task tags to filter (default: xgboost optuna_manual)", ) parser.add_argument( "--output", default=str(DEFAULT_OUTPUT), help="Output CSV path", ) parser.add_argument( "--name-prefix", default=DEFAULT_NAME_PREFIX, help="Only include tasks whose name starts with this prefix", ) parser.add_argument( "--limit", type=int, default=DEFAULT_LIMIT, help="Max number of latest matching tasks to inspect; <=0 means no limit", ) parser.add_argument( "--keep-zero-metrics", action="store_true", help="Keep tasks even when val_loss/val_accuracy/val_f1 are missing or zero", ) parser.add_argument( "--sort-by", choices=["val_f1", "val_loss", "val_accuracy"], default=DEFAULT_SORT_BY, help="Primary ranking metric for exported rows (default: val_f1)", ) parser.add_argument( "--compute-missing-val-accuracy", action="store_true", help="Train per-row models to backfill val_accuracy only when it is missing", ) parser.add_argument( "--seed", type=int, default=42, help="Random seed used when backfilling missing val_accuracy", ) return parser.parse_args() def main() -> None: args = parse_args() rows = fetch_rows( project_name=args.project, tags=args.tags, name_prefix=args.name_prefix, limit=args.limit, drop_zero_metrics=not args.keep_zero_metrics, sort_by=args.sort_by, compute_missing_val_accuracy=args.compute_missing_val_accuracy, seed=args.seed, ) write_csv(rows, output_path=Path(args.output)) if __name__ == "__main__": main()
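
# Example invocation (illustrative sketch; the script path is whatever this file is
# saved as in the repo and is not fixed by the code above):
#
#   python path/to/this_script.py --limit 40 --sort-by val_f1 --compute-missing-val-accuracy
#
# With the defaults this exports the 40 most recently completed "XGBoost Sweep Trial #"
# tasks from the "FocusGuards Large Group Project" project, backfills val_accuracy for
# trials that never logged that scalar, and writes the ranked rows to
# models/xgboost/sweep_results_all_40.csv.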