| |
| import numpy as np |
| import pandas as pd |
|
|
| import torch |
| from transformers import BertTokenizer |
| from sklearn.preprocessing import LabelEncoder |
| from sklearn.model_selection import train_test_split |
| import re |
|
|
| |
def read_data(path):
    """Load a CSV file into a DataFrame, reporting failures instead of raising.

    Args:
        path: Location of the CSV file to read.

    Returns:
        The loaded ``pandas.DataFrame``, or ``None`` when the file is missing,
        contains no data rows, or cannot be read. A message describing the
        problem is printed in every failure case.
    """
    try:
        df = pd.read_csv(path)
    except FileNotFoundError:
        print(f"File not found at: {path}")
        return None
    except pd.errors.EmptyDataError:
        # A file with no columns at all (e.g. zero bytes) makes pandas raise
        # instead of returning an empty frame; report it the same way as a
        # header-only file.
        print("The file is empty.")
        return None
    except Exception as e:
        # Deliberate catch-all: callers test for None rather than catching.
        print(f"An error occurred: {e}")
        return None

    if df.empty:
        print("The file is empty.")
        return None
    return df
|
|
| |
# Location of the transaction dataset; the raw string keeps the Windows
# backslashes literal.
data_path = r"E:\transactify\transactify\Dataset\transaction_data.csv"

# Load the dataset up front: everything below operates on `data`.
data = read_data(data_path)
if data is None:
    print("Data loading failed. Exiting...")
    # Raise SystemExit rather than calling exit(): exit() is injected by the
    # `site` module for interactive sessions (absent under `python -S`), and
    # calling it with no argument reports success (status 0) to the shell.
    raise SystemExit(1)

print("Data loaded successfully:")
print(data.head(15))
|
|
| |
def clean_text(text):
    """Normalize a free-text description for tokenization.

    The text is lower-cased, every run of digits and every character that is
    neither a word character nor whitespace is replaced by a single space,
    and leading/trailing whitespace is removed. Interior runs of spaces are
    left as they are.

    Args:
        text: The raw description. ``None`` and NaN (what pandas yields for a
            missing cell) are treated as missing; any other non-string value
            is converted with ``str()`` first.

    Returns:
        The cleaned string, or ``""`` when the input is missing or nothing
        but digits, punctuation and whitespace.
    """
    # NaN is the only float that is not equal to itself; checking it this way
    # avoids needing any import just to detect a missing cell.
    if text is None or (isinstance(text, float) and text != text):
        return ""

    text = str(text).lower()
    text = re.sub(r"\d+", " ", text)
    text = re.sub(r"[^\w\s]", " ", text)
    return text.strip()
|
|
| |
def preprocessing_data(df, max_length=20):
    """Tokenize transaction descriptions and encode their categories.

    Each value in the "Transaction Description" column is cleaned with
    ``clean_text``. Rows whose description is missing or cleans down to an
    empty string are skipped (a message is printed for each), and the labels
    are built from the "Category" values of the remaining rows only, so that
    row ``i`` of every returned tensor refers to the same transaction.

    Args:
        df: DataFrame containing the "Transaction Description" and
            "Category" columns.
        max_length: Number of token ids per description; shorter sequences
            are padded and longer ones truncated to this length.

    Returns:
        A tuple ``(input_ids, attention_masks, labels, labelencoder)``:
        ``input_ids`` and ``attention_masks`` are tensors with one row of
        ``max_length`` entries per kept description, ``labels`` is a
        ``torch.long`` tensor with one encoded category per kept description,
        and ``labelencoder`` is the fitted ``LabelEncoder``.

    Raises:
        ValueError: If a required column is missing, or if no description
            survives cleaning.
    """
    # Validate before loading the tokenizer so bad input fails fast.
    if "Transaction Description" not in df.columns or "Category" not in df.columns:
        raise ValueError("The required columns 'Transaction Description' and 'Category' are missing from the dataset.")

    cleaned_texts = []
    kept_positions = []  # positional index of every row that is kept
    for position, description in enumerate(df["Transaction Description"]):
        # Missing cells arrive as NaN; treat them as empty instead of letting
        # the cleaner fail on a non-string value.
        if pd.isna(description):
            cleaned_text = ""
        else:
            cleaned_text = clean_text(str(description))

        if cleaned_text:
            cleaned_texts.append(cleaned_text)
            kept_positions.append(position)
        else:
            print("Cleaned text is empty, skipping...")

    print(f"Total input_ids collected: {len(cleaned_texts)}")
    print(f"Total attention_masks collected: {len(cleaned_texts)}")

    if not cleaned_texts:
        raise ValueError("No input_ids were collected. Check the cleaning process.")

    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

    # Encode the whole batch in one call. padding="max_length" replaces the
    # deprecated pad_to_max_length=True flag.
    encoded = tokenizer(
        cleaned_texts,
        add_special_tokens=True,
        max_length=max_length,
        padding="max_length",
        truncation=True,
        return_attention_mask=True,
        return_tensors="pt",
    )
    input_ids = encoded["input_ids"]
    attention_masks = encoded["attention_mask"]

    # Encode only the categories of the rows that were kept; using every row
    # would leave labels longer than, and misaligned with, the token tensors.
    labelencoder = LabelEncoder()
    kept_categories = df["Category"].iloc[kept_positions]
    labels = labelencoder.fit_transform(kept_categories)
    labels = torch.tensor(labels, dtype=torch.long)

    return input_ids, attention_masks, labels, labelencoder
|
|
| |
def split_data(input_ids, attention_masks, labels, test_size=0.2, random_state=42):
    """Split token ids, attention masks and labels into train and test sets.

    All three inputs are split in a single ``train_test_split`` call so they
    are partitioned with the same shuffled indices. Splitting them in
    separate calls only stays aligned while both calls share a fixed integer
    seed, and silently misaligns the masks when ``random_state`` is ``None``.

    Args:
        input_ids: Token ids, one entry per sample.
        attention_masks: Attention masks, one entry per sample.
        labels: Encoded labels, one entry per sample.
        test_size: Passed through to ``train_test_split``.
        random_state: Passed through to ``train_test_split``.

    Returns:
        A tuple ``(X_train_ids, X_test_ids, X_train_masks, X_test_masks,
        y_train, y_test)``.
    """
    (
        X_train_ids,
        X_test_ids,
        X_train_masks,
        X_test_masks,
        y_train,
        y_test,
    ) = train_test_split(
        input_ids,
        attention_masks,
        labels,
        test_size=test_size,
        random_state=random_state,
    )

    return X_train_ids, X_test_ids, X_train_masks, X_test_masks, y_train, y_test
|
|
| |
# Tokenize the descriptions and encode the category labels of the loaded data.
(input_ids, attention_masks, labels, labelencoder) = preprocessing_data(data)

# Partition the encoded tensors into training and test portions.
(
    X_train_ids,
    X_test_ids,
    X_train_masks,
    X_test_masks,
    y_train,
    y_test,
) = split_data(input_ids, attention_masks, labels)

# Report how many samples ended up in each portion.
print(f"Training set size: {X_train_ids.shape[0]}")
print(f"Test set size: {X_test_ids.shape[0]}")
|
|