| import pandas as pd
|
| import numpy as np
|
| from sklearn.preprocessing import StandardScaler, OneHotEncoder
|
| from sklearn.compose import ColumnTransformer
|
| from sklearn.pipeline import Pipeline
|
| from imblearn.over_sampling import SMOTE
|
| from sklearn.model_selection import train_test_split
|
| from sklearn import __version__ as sklearn_version
|
| from packaging import version
|
|
|
| class DataProcessor:
|
| def __init__(self):
|
| self.scaler = StandardScaler()
|
|
|
|
|
| if version.parse(sklearn_version) >= version.parse('1.2.0'):
|
| self.encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
|
| else:
|
| self.encoder = OneHotEncoder(sparse=False, handle_unknown='ignore')
|
|
|
| def load_data(self, file_path):
|
| """Load the dataset from a CSV file"""
|
| try:
|
| df = pd.read_csv(file_path)
|
| return df
|
| except Exception as e:
|
| print(f"Error loading data: {e}")
|
| return None
|
|
|
| def preprocess_data(self, df, target_col='Class'):
|
| """Preprocess the data for model training"""
|
|
|
| df = df.fillna(df.mean())
|
|
|
|
|
| X = df.drop(columns=[target_col])
|
| y = df[target_col]
|
|
|
|
|
| X_train, X_test, y_train, y_test = train_test_split(
|
| X, y, test_size=0.2, random_state=42, stratify=y
|
| )
|
|
|
|
|
| num_features = X.select_dtypes(include=['int64', 'float64']).columns
|
|
|
|
|
| cat_features = X.select_dtypes(include=['object', 'category']).columns
|
|
|
|
|
| if version.parse(sklearn_version) >= version.parse('1.2.0'):
|
| preprocessor = ColumnTransformer(
|
| transformers=[
|
| ('num', StandardScaler(), num_features),
|
| ('cat', OneHotEncoder(sparse_output=False, handle_unknown='ignore'), cat_features)
|
| ] if len(cat_features) > 0 else [
|
| ('num', StandardScaler(), num_features)
|
| ]
|
| )
|
| else:
|
| preprocessor = ColumnTransformer(
|
| transformers=[
|
| ('num', StandardScaler(), num_features),
|
| ('cat', OneHotEncoder(sparse=False, handle_unknown='ignore'), cat_features)
|
| ] if len(cat_features) > 0 else [
|
| ('num', StandardScaler(), num_features)
|
| ]
|
| )
|
|
|
|
|
| X_train_processed = preprocessor.fit_transform(X_train)
|
| X_test_processed = preprocessor.transform(X_test)
|
|
|
|
|
| smote = SMOTE(random_state=42)
|
| X_train_resampled, y_train_resampled = smote.fit_resample(X_train_processed, y_train)
|
|
|
| return X_train_resampled, X_test_processed, y_train_resampled, y_test, preprocessor
|
|
|
| def engineer_features(self, df):
|
| """Create new features for fraud detection"""
|
|
|
| df_new = df.copy()
|
|
|
|
|
| if 'Time' in df_new.columns:
|
|
|
| df_new['Hour'] = (df_new['Time'] / 3600) % 24
|
|
|
|
|
| df_new['Odd_Hour'] = ((df_new['Hour'] >= 0) & (df_new['Hour'] < 5)).astype(int)
|
|
|
|
|
| if 'Amount' in df_new.columns:
|
|
|
| df_new['Log_Amount'] = np.log1p(df_new['Amount'])
|
|
|
|
|
| threshold = df_new['Amount'].quantile(0.95)
|
| df_new['High_Value'] = (df_new['Amount'] > threshold).astype(int)
|
|
|
|
|
| if 'card_id' in df_new.columns:
|
|
|
| tx_count = df_new.groupby('card_id').size().reset_index(name='Tx_Count')
|
| df_new = df_new.merge(tx_count, on='card_id', how='left')
|
|
|
|
|
| avg_amount = df_new.groupby('card_id')['Amount'].mean().reset_index(name='Avg_Amount')
|
| df_new = df_new.merge(avg_amount, on='card_id', how='left')
|
|
|
|
|
| df_new['Amount_Deviation'] = df_new['Amount'] - df_new['Avg_Amount']
|
|
|
| return df_new |