import json
import os
from typing import List, Tuple

import fire
import joblib
import numpy as np
import pandas as pd
import pywt
from sklearn import metrics
from sklearn.ensemble import RandomForestRegressor
from sklearn.utils import shuffle
from tqdm import tqdm

from utils import calculate_metrics, get_classes, CLASSES

TRAIN_SIZE = 1732
TEST_SIZE = 1154
TRAIN_DIR = "train_data_simulated/"
TEST_DIR = "test_data_simulated/"


def load_data() -> Tuple[List[str], pd.DataFrame, List[str], pd.DataFrame]:
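    """Collect the .npz sample paths and load the ground-truth tables."""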
    X_train = [os.path.join(TRAIN_DIR, f"{i}.npz") for i in range(TRAIN_SIZE)]
    X_test = [os.path.join(TEST_DIR, f"{i}.npz") for i in range(TEST_SIZE)]
    y_train = pd.read_csv("train_gt.csv")
    y_test = pd.read_csv("test_gt.csv")
    return X_train, y_train, X_test, y_test


class SpectralCurveFiltering:
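    """Collapse a (bands, height, width) data cube into a single spectral
    curve by merging over the two spatial axes (mean by default)."""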
    def __init__(self, merge_function=np.mean):
        self.merge_function = merge_function

    def __call__(self, sample: np.ndarray) -> np.ndarray:
        return self.merge_function(sample, axis=(1, 2))


class BaselineRegressor:
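    """Reference model that predicts the per-target training mean for
    every sample."""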
    def __init__(self):
        self.mean = 0

    def fit(self, X_train: np.ndarray, y_train: np.ndarray):
        self.mean = np.mean(y_train, axis=0)
        self.classes_count = y_train.shape[1]
        return self

    def predict(self, X_test: np.ndarray) -> np.ndarray:
        # np.full broadcasts the mean vector across all rows.
        return np.full((len(X_test), self.classes_count), self.mean)


def preprocess(
    samples_lst: List[str], features: List[str]
) -> Tuple[np.ndarray, List[List[str]]]:
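    """Load each sample, extract the requested feature groups, and stack
    them into a single (n_samples, n_features) matrix. Also returns the
    per-sample feature-group names for traceability."""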
    def _shape_pad(data: np.ndarray) -> np.ndarray:
        """
        Pad each band to a square spatial shape. This is not mandatory,
        but it eliminates the risk of a calculation error in the singular
        value decomposition, and wrap padding also improves performance
        slightly.
        """
        max_edge = np.max(data.shape[1:])
        shape = (max_edge, max_edge)
        padded = np.pad(
            data,
            ((0, 0), (0, shape[0] - data.shape[1]), (0, shape[1] - data.shape[2])),
            "wrap",
        )
        return padded

    filtering = SpectralCurveFiltering()
    w1 = pywt.Wavelet("sym3")
    w2 = pywt.Wavelet("dmey")

    all_feature_names = []

    for sample_index, sample_path in tqdm(
        enumerate(samples_lst), total=len(samples_lst)
    ):
        with np.load(sample_path) as npz:
            data = np.ma.MaskedArray(**npz)
            data = _shape_pad(data)
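        # Spatial features: singular values of each (padded) band.
        # np.linalg.svd batches over the leading axis, so for data of
        # shape (bands, H, W) the result s has shape (bands, min(H, W)).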
        s = np.linalg.svd(data, full_matrices=False, compute_uv=False)
        s0 = s[:, 0]
        s1 = s[:, 1]
        s2 = s[:, 2]
        s3 = s[:, 3]
        s4 = s[:, 4]
        dXds1 = s0 / (s1 + np.finfo(float).eps)  # eps guards against division by zero
        ffts = np.fft.fft(s0)
        reals = np.real(ffts)
        imags = np.imag(ffts)

        # Collapse the cube to its mean spectral curve for the 1-D features.
        data = filtering(data)
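        # Cascaded single-level DWTs over hand-picked coefficient windows
        # ("dmey" wavelet). The detail coefficients (cDw2, cDw1) are
        # computed but not included in any feature group below.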
        cA0, cD0 = pywt.dwt(data, wavelet=w2, mode="constant")
        cAx, cDx = pywt.dwt(cA0[12:92], wavelet=w2, mode="constant")
        cAy, cDy = pywt.dwt(cAx[15:55], wavelet=w2, mode="constant")
        cAz, cDz = pywt.dwt(cAy[15:35], wavelet=w2, mode="constant")
        cAw2 = np.concatenate((cA0[12:92], cAx[15:55], cAy[15:35], cAz[15:25]), -1)
        cDw2 = np.concatenate((cD0[12:92], cDx[15:55], cDy[15:35], cDz[15:25]), -1)

        # Same cascade with the "sym3" wavelet, trimming one boundary
        # coefficient per level instead of using fixed index windows.
        cA0, cD0 = pywt.dwt(data, wavelet=w1, mode="constant")
        cAx, cDx = pywt.dwt(cA0[1:-1], wavelet=w1, mode="constant")
        cAy, cDy = pywt.dwt(cAx[1:-1], wavelet=w1, mode="constant")
        cAz, cDz = pywt.dwt(cAy[1:-1], wavelet=w1, mode="constant")
        cAw1 = np.concatenate((cA0, cAx, cAy, cAz), -1)
        cDw1 = np.concatenate((cD0, cDx, cDy, cDz), -1)

        # First, second, and third derivatives of the spectral curve.
        dXdl = np.gradient(data, axis=0)
        d2Xdl2 = np.gradient(dXdl, axis=0)
        d3Xdl3 = np.gradient(d2Xdl2, axis=0)

        # FFT of the mean spectral curve, kept as real and imaginary parts.
        fft = np.fft.fft(data)
        real = np.real(fft)
        imag = np.imag(fft)

        features_to_select = {
            "spatial": (dXds1, s0, s1, s2, s3, s4, reals, imags),
            "fft": (real, imag),
            "gradient": (dXdl, d2Xdl2, d3Xdl3),
            "mean": (data,),
            "dwt": (cAw1, cAw2),
        }

        sample_features = []
        sample_feature_names = []
        for feature_name in features:
            sample_features.extend(features_to_select[feature_name])
            sample_feature_names.extend(
                [feature_name]
                * len(np.concatenate(features_to_select[feature_name]))
            )

        # Overwrite the path entry in-place with the sample's feature vector.
        sample_features = np.concatenate(sample_features, -1)
        samples_lst[sample_index] = sample_features
        all_feature_names.append(sample_feature_names)

    return np.vstack(samples_lst), all_feature_names


def runner(features: List[str] = "spatial,fft,dwt,gradient,mean".split(",")):
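    """Train a RandomForestRegressor on the selected feature groups and
    score it against the mean-prediction baseline.

    Exposed via fire, so the feature set can be overridden from the
    command line, e.g. (assuming this script is saved as train.py):

        python train.py --features '["spatial","dwt"]'
    """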
    X_train, y_train, X_test, y_test = load_data()

    X_train, train_feature_names = preprocess(X_train, features)
    X_test, test_feature_names = preprocess(X_test, features)

    X_train, y_train = shuffle(X_train, y_train, random_state=2023)

    model = RandomForestRegressor(random_state=2023)
    print(f"Training model on {X_train.shape} features...")
    model = model.fit(X_train, y_train[CLASSES].values)

    joblib.dump(model, f"RF_model_{'-'.join(features)}.joblib")

    submission_df = pd.DataFrame(data=model.predict(X_test), columns=CLASSES)
    submission_df.to_csv(",".join(features) + ".csv", index_label="sample_index")

    baseline_reg = BaselineRegressor()
    baseline_reg = baseline_reg.fit(X_train, y_train[CLASSES].values)
    baselines_mse = np.mean(
        (y_test[CLASSES].values - baseline_reg.predict(X_test)) ** 2, axis=0
    )
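    # Custom score: per-target model MSE divided by the baseline MSE,
    # averaged over targets; values below 1.0 beat the baseline.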
|
|
| mse = np.mean((y_test[CLASSES].values - submission_df[CLASSES].values) ** 2, axis=0) |
| scores = mse / baselines_mse |
| final_score = np.mean(scores) |
|
|
| r2 = metrics.r2_score( |
| y_true=y_test[CLASSES].values, |
| y_pred=submission_df[CLASSES].values, |
| multioutput="raw_values", |
| ) |
| mse = metrics.mean_squared_error( |
| y_true=y_test[CLASSES].values, |
| y_pred=submission_df[CLASSES].values, |
| multioutput="raw_values", |
| ) |
| mae = metrics.mean_absolute_error( |
| y_true=y_test[CLASSES].values, |
| y_pred=submission_df[CLASSES].values, |
| multioutput="raw_values", |
| ) |
| all_metrics = calculate_metrics( |
| y_pred=get_classes(submission_df[CLASSES]), |
| y_true=get_classes(y_test[CLASSES]), |
| ) |
| mse = {k + "_mse": v for k, v in zip(["P", "K", "Mg", "pH"], mse.tolist())} |
| r2 = {k + "_r2": v for k, v in zip(["P", "K", "Mg", "pH"], r2.tolist())} |
| mae = {k + "_mae": v for k, v in zip(["P", "K", "Mg", "pH"], mae.tolist())} |
|
|
| all_metrics["custom"] = final_score |
| all_metrics = pd.DataFrame.from_dict({**all_metrics, **r2, **mse, **mae}) |
| all_metrics.to_csv(f"all_metrics.csv", index=False) |
|
|
| with open("all_metrics.json", "w", encoding="utf-8") as f: |
| json.dump(all_metrics.to_dict(), f, ensure_ascii=True, indent=4) |
|
|
| print(f"Custom score: {final_score}") |
| return final_score |
|
|
|
|
| if __name__ == "__main__": |
| fire.Fire(runner) |
| model = joblib.load( |
| f"RF_model_{'-'.join('spatial,fft,dwt,gradient,mean'.split(','))}.joblib" |
| ) |
| import sklearn |
|
|
| assert isinstance(model, sklearn.ensemble._forest.RandomForestRegressor) |
|
|