| """ |
| Gradio app for NSL-KDD binary intrusion detection demo (MVP) |
| Expecting these files in the same repo/root of the Space: |
| - nsl_kdd_tf_model.h5 (optional; if present will be used) |
| - scaler.pkl (optional; sklearn StandardScaler, must match model training) |
| - columns.json (optional; list of feature column names used by the model) |
| |
| If artifacts are missing, the app will instruct you how to add them and offers a quick fallback |
| where you can upload a CSV and the app will train a lightweight sklearn model for demo purposes. |
| """ |
|
|
| import os |
| import json |
| import tempfile |
| import traceback |
| from typing import Tuple, List |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| import gradio as gr |
|
|
| |
| TF_AVAILABLE = True |
| try: |
| import tensorflow as tf |
| except Exception: |
| TF_AVAILABLE = False |
|
|
| from sklearn.preprocessing import StandardScaler |
| from sklearn.linear_model import LogisticRegression |
| import joblib |
|
|
| |
| MODEL_FILE = "nsl_kdd_tf_model.h5" |
| SCALER_FILE = "scaler.pkl" |
| COLUMNS_FILE = "columns.json" |
|
|
| |
| def load_artifacts(): |
| model = None |
| scaler = None |
| columns = None |
| model_type = None |
|
|
| |
| if os.path.exists(COLUMNS_FILE): |
| with open(COLUMNS_FILE, "r", encoding="utf-8") as f: |
| columns = json.load(f) |
|
|
| |
| if os.path.exists(SCALER_FILE): |
| try: |
| scaler = joblib.load(SCALER_FILE) |
| except Exception: |
| try: |
| scaler = joblib.load(open(SCALER_FILE, "rb")) |
| except Exception: |
| scaler = None |
|
|
| |
| if os.path.exists(MODEL_FILE) and TF_AVAILABLE: |
| try: |
| model = tf.keras.models.load_model(MODEL_FILE) |
| model_type = "tensorflow" |
| except Exception: |
| model = None |
|
|
| return model, scaler, columns, model_type |
|
|
| MODEL, SCALER, COLUMNS, MODEL_TYPE = load_artifacts() |
|
|
| def model_available_message() -> str: |
| if MODEL is not None and SCALER is not None and COLUMNS is not None: |
| return "✅ Pretrained TensorFlow model and artifacts loaded. Ready to predict." |
| pieces = [] |
| if MODEL is None: |
| pieces.append(f"Missing `{MODEL_FILE}`") |
| if SCALER is None: |
| pieces.append(f"Missing `{SCALER_FILE}`") |
| if COLUMNS is None: |
| pieces.append(f"Missing `{COLUMNS_FILE}`") |
| msg = "⚠️ Artifacts missing: " + ", ".join(pieces) + ".\n\n" |
| msg += "To run the TF model, add those files to the Space repository (same folder as app.py).\n" |
| msg += "Alternatively, upload a CSV of NSL-KDD records (the app will train a quick sklearn model for demo).\n\n" |
| msg += "columns.json should be a JSON array of feature names that match the model input (same as X_train.columns).\n" |
| return msg |
|
|
| |
| def prepare_X_from_df(df: pd.DataFrame, expected_columns: List[str], scaler_obj) -> np.ndarray: |
| |
| X = df.reindex(columns=expected_columns, fill_value=0) |
| |
| X = X.apply(pd.to_numeric, errors="coerce").fillna(0.0) |
| if scaler_obj is not None: |
| Xs = scaler_obj.transform(X) |
| else: |
| |
| Xs = X.values.astype(np.float32) |
| return Xs |
|
|
| def predict_batch_from_df(df: pd.DataFrame) -> Tuple[pd.DataFrame, str]: |
| """ |
| returns (result_df, status_message) |
| result_df contains prob and predicted class per row |
| """ |
| try: |
| if MODEL is not None and SCALER is not None and COLUMNS is not None and MODEL_TYPE == "tensorflow": |
| Xs = prepare_X_from_df(df, COLUMNS, SCALER) |
| probs = MODEL.predict(Xs).ravel() |
| preds = (probs >= 0.5).astype(int) |
| out = df.copy() |
| out["_pred_prob"] = probs |
| out["_pred_class"] = preds |
| return out, "Predictions from TensorFlow model" |
| else: |
| |
| if 'label' in df.columns or 'label_bin' in df.columns: |
| |
| |
| cats = ['protocol_type', 'service', 'flag'] |
| col_names = df.columns.tolist() |
| |
| num_cols = [c for c in col_names if c not in cats + ['label','label_bin']] |
| X_num = df[num_cols].apply(pd.to_numeric, errors='coerce').fillna(0.0) |
| X_cat = pd.get_dummies(df[cats], drop_first=True) |
| X = pd.concat([X_num, X_cat], axis=1) |
| y = df['label_bin'] if 'label_bin' in df.columns else df['label'].apply(lambda s: 0 if str(s).strip().lower()=="normal" else 1) |
| |
| scaler_local = StandardScaler() |
| Xs = scaler_local.fit_transform(X) |
| clf = LogisticRegression(max_iter=200) |
| clf.fit(Xs, y) |
| probs = clf.predict_proba(Xs)[:,1] |
| preds = (probs >= 0.5).astype(int) |
| out = df.copy() |
| out["_pred_prob"] = probs |
| out["_pred_class"] = preds |
| return out, "Trained temporary LogisticRegression on uploaded CSV (used 'label' or 'label_bin' for training)." |
| else: |
| return pd.DataFrame(), "Cannot fallback: artifacts missing and uploaded CSV does not contain 'label' or 'label_bin' to train a temporary model." |
| except Exception as e: |
| tb = traceback.format_exc() |
| return pd.DataFrame(), f"Prediction error: {e}\n\n{tb}" |
|
|
| def predict_single(sample_text: str) -> str: |
| """ |
| sample_text: CSV row or JSON dict representing one row with same columns as columns.json |
| returns a readable string with probability and class |
| """ |
| try: |
| if not sample_text: |
| return "No input provided." |
| |
| try: |
| d = json.loads(sample_text) |
| if isinstance(d, dict): |
| df = pd.DataFrame([d]) |
| else: |
| return "JSON must represent an object/dict for single sample." |
| except Exception: |
| |
| try: |
| df = pd.read_csv(pd.compat.StringIO(sample_text), header=None) |
| |
| if COLUMNS is not None and df.shape[1] == len(COLUMNS): |
| df.columns = COLUMNS |
| else: |
| return "CSV input detected but header/column count mismatch. Prefer JSON object keyed by column names." |
| except Exception: |
| return "Could not parse input. Paste a JSON object like {\"duration\":0, \"protocol_type\":\"tcp\", ...} or upload a CSV row with header." |
|
|
| |
| if MODEL is not None and SCALER is not None and COLUMNS is not None and MODEL_TYPE == "tensorflow": |
| Xs = prepare_X_from_df(df, COLUMNS, SCALER) |
| prob = float(MODEL.predict(Xs)[0,0]) |
| pred = int(prob >= 0.5) |
| return f"Pred prob: {prob:.4f} — predicted class: {pred} (0=normal, 1=attack)" |
| else: |
| return "Model artifacts not present in Space. Upload `nsl_kdd_tf_model.h5`, `scaler.pkl`, and `columns.json` to use the TensorFlow model. Alternatively upload a labelled CSV to train a quick demo model." |
| except Exception as e: |
| tb = traceback.format_exc() |
| return f"Error: {e}\n\n{tb}" |
|
|
| |
| with gr.Blocks(title="NSL-KDD Intrusion Detection — Demo MVP") as demo: |
| gr.Markdown("# NSL-KDD Intrusion Detection — Demo (MVP)\n" |
| "Upload your artifacts (`nsl_kdd_tf_model.h5`, `scaler.pkl`, `columns.json`) to the Space to use the TensorFlow model.\n" |
| "Or upload a labelled CSV (contains `label` or `label_bin`) and the app will train a quick logistic regression for demo.\n\n" |
| "Columns expected: the original notebook used 41 numeric features with one-hot for `protocol_type`, `service`, `flag`.\n" |
| ) |
| status = gr.Textbox(label="Status / Artifact check", value=model_available_message(), interactive=False) |
| with gr.Row(): |
| with gr.Column(scale=2): |
| file_input = gr.File(label="Upload CSV for batch prediction or for training fallback", file_types=['.csv']) |
| sample_input = gr.Textbox(label="Single-sample input (JSON object)", placeholder='{"duration":0, "protocol_type":"tcp", ...}', lines=6) |
| predict_button = gr.Button("Predict single sample") |
| batch_button = gr.Button("Run batch (on uploaded CSV)") |
|
|
| with gr.Column(scale=1): |
| out_table = gr.Dataframe(label="Batch predictions (if any)") |
|
|
| single_out = gr.Textbox(label="Single sample result", interactive=False) |
|
|
| |
| example_text = json.dumps({ |
| "duration": 0, |
| "protocol_type": "tcp", |
| "service": "http", |
| "flag": "SF", |
| "src_bytes": 181, |
| "dst_bytes": 5450 |
| }, indent=2) |
| gr.Markdown("**Example single-sample JSON (fill in more NSL-KDD fields if you have them):**") |
| gr.Code(example_text, language="json") |
|
|
| |
| def on_predict_single(sample_text): |
| return predict_single(sample_text) |
|
|
| def on_batch_predict(file_obj): |
| if file_obj is None: |
| return pd.DataFrame(), "No file uploaded." |
| try: |
| |
| df = pd.read_csv(file_obj.name) |
| except Exception: |
| try: |
| |
| df = pd.read_csv(file_obj) |
| except Exception as e: |
| return pd.DataFrame(), f"Could not read CSV: {e}" |
|
|
| out_df, msg = predict_batch_from_df(df) |
| if out_df.empty: |
| return pd.DataFrame(), msg |
| |
| display_df = out_df.copy() |
| |
| for c in ["_pred_prob", "_pred_class"]: |
| if c in display_df.columns: |
| cols = [c] + [x for x in display_df.columns if x != c] |
| display_df = display_df[cols] |
| return display_df, msg |
|
|
| predict_button.click(on_predict_single, inputs=[sample_input], outputs=[single_out]) |
| batch_button.click(on_batch_predict, inputs=[file_input], outputs=[out_table, status]) |
|
|
| if __name__ == "__main__": |
| demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860))) |
|
|