"""The Sampling class serves as a helper module for retrieving subject model data."""
| from abc import ABC, abstractmethod |
|
|
| import os |
| import gc |
| import time |
|
|
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torch.optim as optim |
| from singleVis.utils import * |
| |
| |
| |
| |
| |
|
|
| from scipy.special import softmax |
| import torch |
| from torch import nn |
| from torch.nn import functional as F |
|
|
class VAE(nn.Module):
    """Variational autoencoder with one hidden layer in the encoder and
    one in the decoder.

    Parameters
    ----------
    input_dim : int
        Dimensionality of the (flattened) input vectors.
    hidden_dim : int
        Size of the hidden layer.
    latent_dim : int
        Size of the latent (bottleneck) space.
    """

    def __init__(self, input_dim, hidden_dim, latent_dim):
        super(VAE, self).__init__()
        # Remember the input width so forward() can flatten generically
        # instead of hard-coding 512 (the original broke for any other
        # input_dim).
        self.input_dim = input_dim
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc21 = nn.Linear(hidden_dim, latent_dim)  # mean head
        self.fc22 = nn.Linear(hidden_dim, latent_dim)  # log-variance head
        self.fc3 = nn.Linear(latent_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, input_dim)

    def encode(self, x):
        """Map input ``x`` to the latent Gaussian parameters (mu, logvar)."""
        h1 = F.relu(self.fc1(x))
        return self.fc21(h1), self.fc22(h1)

    def reparameterize(self, mu, logvar):
        """Sample z ~ N(mu, sigma^2) via the reparameterization trick."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        """Decode latent ``z`` back to input space, squashed into [0, 1]."""
        h3 = F.relu(self.fc3(z))
        return torch.sigmoid(self.fc4(h3))

    def forward(self, x):
        """Run the full encode / sample / decode pass.

        Returns
        -------
        tuple(Tensor, Tensor, Tensor)
            Reconstruction, latent mean, latent log-variance.
        """
        # Flatten to (batch, input_dim); previously hard-coded to 512.
        mu, logvar = self.encode(x.view(-1, self.input_dim))
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar
|
|
|
|
"""
DataContainer module
1. calculate information entropy for a single sample and for a subset
2. sample an informative subset
"""
class DataGenerationAbstractClass(ABC):
    """Abstract base for data-generation helpers.

    Holds the data provider and the subject-model epoch that a generator
    targets; concrete subclasses implement the actual sample generation.
    """

    def __init__(self, data_provider, epoch):
        self.data_provider = data_provider
        self.epoch = epoch
        # Tag identifying this as the abstract base implementation.
        self.mode = "abstract"
| |
| |
| |
| |
|
|
class DataGeneration(DataGenerationAbstractClass):
    """Generate extra samples around a subject model's decision boundary.

    Strategies offered for a fixed training epoch:

    - FGSM-style adversarial examples (:meth:`gen`,
      :meth:`gen_specific_class_adv`)
    - VAE-decoded latent-grid samples (:meth:`generate_by_VAE`)
    - confidence-binned cross-class interpolations (:meth:`inter_gen`)
    - boundary points (:meth:`gen_more_boundary_mixed_up`,
      :meth:`get_near_epoch_border`)

    NOTE(review): ``batch_run`` and ``get_border_points`` come from the
    ``singleVis.utils`` star import at the top of the file.

    Parameters
    ----------
    model : torch.nn.Module
        Subject classification model.
    data_provider : object
        Project data provider; must expose ``content_path``,
        ``train_labels``, ``train_representation``, ``get_pred`` and
        (for the VAE path) ``DEVICE``.
    epoch : int
        Epoch of the subject model to operate on.
    device : torch.device or str
        Device used for model evaluation.
    """

    def __init__(self, model, data_provider, epoch, device):
        self.data_provider = data_provider
        self.model = model
        self.epoch = epoch
        self.DEVICE = device

    def generate_adversarial_example(self, input_data, target, epsilon):
        """Create FGSM adversarial examples from ``input_data``.

        Fast gradient sign method: perturb the input in the direction of
        the sign of the loss gradient toward ``target``.

        Parameters
        ----------
        input_data : torch.Tensor
            Batch of inputs (expected on ``self.DEVICE``); gradient
            tracking is enabled on it in place.
        target : torch.Tensor
            Single-element tensor with the target class; expanded to the
            batch size.
        epsilon : float
            Perturbation magnitude.

        Returns
        -------
        torch.Tensor
            ``input_data + epsilon * sign(grad)``.
        """
        self.model.to(self.DEVICE)
        self.model.eval()

        input_data.requires_grad = True
        target = target.to(self.DEVICE)

        output = self.model(input_data)
        loss_function = nn.CrossEntropyLoss()

        # Repeat the single target label for every sample in the batch.
        target = target.expand(input_data.size(0))
        loss = loss_function(output, target)

        # Gradient of the loss w.r.t. the *input*, not the weights.
        self.model.zero_grad()
        loss.backward()
        gradient = input_data.grad.data

        adversarial_example = input_data + epsilon * gradient.sign()
        return adversarial_example

    def gen(self, epsilon=0.2, sample_ratio=0.1):
        """Generate adversarial samples for every source/target class pair.

        For each class, samples ``sample_ratio`` of its training points and
        perturbs them toward each of the other classes. The feature
        representations are saved to ``adv_representation.npy`` under the
        epoch's model directory.

        Returns
        -------
        tuple(list, np.ndarray)
            The adversarial samples and their feature representations.
        """
        labels = self.data_provider.train_labels(self.epoch)

        training_data_path = os.path.join(self.data_provider.content_path, "Training_data")
        training_data = torch.load(os.path.join(training_data_path, "training_dataset_data.pth"),
                                   map_location="cpu")
        training_data = training_data.to(self.DEVICE)

        adversarial_samples = []

        # assumes a 10-class problem -- TODO confirm against the dataset
        for label in range(10):
            indices = np.where(labels == label)[0]
            sample_size = int(len(indices) * sample_ratio)
            sampled_indices = np.random.choice(indices, size=sample_size, replace=False)
            sampled_data = torch.Tensor(training_data[sampled_indices])
            print("sampeled data:{}".format(len(sampled_data)))
            for i in range(10):
                if i == label:
                    continue  # skip the source class itself
                target = torch.tensor([i])
                adversarial_example = self.generate_adversarial_example(sampled_data, target, epsilon)
                print("generating class {} 's adversary sampes for target{}, num of adv{}".format(label, i, len(adversarial_example)))
                adversarial_samples.extend(adversarial_example)

        # Re-embed all adversarial samples with this epoch's feature function.
        repr_model = self.feature_function(self.epoch)
        adversarial_samples_torch = torch.stack(adversarial_samples)
        print("adversarial_samples_torch", adversarial_samples_torch.shape)
        data_representation = batch_run(repr_model, adversarial_samples_torch)

        np.save(os.path.join(self.data_provider.content_path, "Model", "Epoch_{}".format(self.epoch), "adv_representation.npy"), data_representation)
        return adversarial_samples, data_representation

    def gen_specific_class_adv(self, epsilon=0.2, sample_ratio=0.1, from_label=1, target_label=2):
        """Generate adversarial samples from one class toward one target.

        Same procedure as :meth:`gen`, restricted to a single
        ``from_label`` -> ``target_label`` pair; nothing is written to disk.

        Returns
        -------
        tuple(list, np.ndarray)
            The adversarial samples and their feature representations.
        """
        labels = self.data_provider.train_labels(self.epoch)

        training_data_path = os.path.join(self.data_provider.content_path, "Training_data")
        training_data = torch.load(os.path.join(training_data_path, "training_dataset_data.pth"),
                                   map_location="cpu")
        training_data = training_data.to(self.DEVICE)

        adversarial_samples = []

        indices = np.where(labels == from_label)[0]
        sample_size = int(len(indices) * sample_ratio)
        sampled_indices = np.random.choice(indices, size=sample_size, replace=False)
        sampled_data = torch.Tensor(training_data[sampled_indices])
        print("sampeled data:{}".format(len(sampled_data)))

        target = torch.tensor([target_label])
        adversarial_example = self.generate_adversarial_example(sampled_data, target, epsilon)
        print("generating class {} 's adversary sampes for target{}, num of adv{}".format(from_label, target_label, len(adversarial_example)))
        adversarial_samples.extend(adversarial_example)

        repr_model = self.feature_function(self.epoch)
        adversarial_samples_torch = torch.stack(adversarial_samples)
        print("adversarial_samples_torch", adversarial_samples_torch.shape)
        data_representation = batch_run(repr_model, adversarial_samples_torch)
        return adversarial_samples, data_representation

    def feature_function(self, epoch):
        """Load the subject-model checkpoint for ``epoch`` and return its
        feature-extraction callable (``model.feature``).

        NOTE: reloads the checkpoint into ``self.model`` as a side effect.
        """
        model_path = os.path.join(self.data_provider.content_path, "Model")
        model_location = os.path.join(model_path, "{}_{:d}".format('Epoch', epoch), "subject_model.pth")
        self.model.load_state_dict(torch.load(model_location, map_location=torch.device("cpu")))
        self.model = self.model.to(self.DEVICE)
        self.model.eval()

        fea_fn = self.model.feature
        return fea_fn

    def vae_loss(self, recon_x, x, mu, logvar):
        """Standard VAE loss: reconstruction BCE plus the KL divergence of
        the latent Gaussian from the standard normal prior."""
        # Flatten x to the reconstruction's width (the original hard-coded
        # 512 here; identical for 512-dim inputs, correct for any width).
        BCE = F.binary_cross_entropy(recon_x, x.view(-1, recon_x.size(-1)), reduction='sum')
        KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        return BCE + KLD

    def generate_by_VAE(self):
        """Train a small VAE on this epoch's representations and decode a
        uniform 2-D latent grid into synthetic representation samples.

        Returns
        -------
        torch.Tensor
            Decoded samples for a 100x100 grid over the latent extent.
        """
        train_data = self.data_provider.train_representation(self.epoch)
        data_loader = torch.utils.data.DataLoader(train_data, batch_size=32, shuffle=True)
        # assumes 512-dim representations -- matches the VAE input width
        vae = VAE(512, 256, 2).to(self.data_provider.DEVICE)
        optimizer = optim.Adam(vae.parameters())

        vae.train()
        num_epochs = 20
        for epoch in range(num_epochs):
            for i, data in enumerate(data_loader):
                data = data.to(self.data_provider.DEVICE)
                optimizer.zero_grad()
                recon_batch, mu, logvar = vae(data)
                loss = self.vae_loss(recon_batch, data, mu, logvar)
                loss.backward()
                optimizer.step()
            # Loss of the last batch of the epoch.
            print(f'Epoch {epoch}, Loss: {loss.item()}')

        # Embed the full training set to find the latent-space extent.
        with torch.no_grad():
            mu, _ = vae.encode(torch.Tensor(train_data).to(self.data_provider.DEVICE))
            mu = mu.cpu().numpy()

        ebd_min = np.min(mu, axis=0)
        ebd_max = np.max(mu, axis=0)
        ebd_extent = ebd_max - ebd_min
        x_min, y_min = ebd_min - 0.02 * ebd_extent
        x_max, y_max = ebd_max + 0.02 * ebd_extent
        # Force a square extent: both axes end up sharing the global
        # min and the global max.
        x_min = min(x_min, y_min)
        y_min = min(x_min, y_min)
        x_max = max(x_max, y_max)
        y_max = max(x_max, y_max)

        # Decode a regular 100x100 grid over the latent square.
        num_points = 100
        x_values = np.linspace(x_min, x_max, num_points)
        y_values = np.linspace(y_min, y_max, num_points)
        x_grid, y_grid = np.meshgrid(x_values, y_values)
        z_grid = np.column_stack([x_grid.flat, y_grid.flat])

        with torch.no_grad():
            z = torch.tensor(z_grid).to(self.data_provider.DEVICE).float()
            samples = vae.decode(z)

        return samples

    def interpolate_samples(self, sample1, sample2, t):
        """Linear interpolation: ``t * sample1 + (1 - t) * sample2``."""
        return t * sample1 + (1 - t) * sample2

    def select_samples_from_different_classes(self, X, labels):
        """Pick one random sample pair for every unordered pair of classes.

        Returns
        -------
        list[tuple]
            One ``(sample_i, sample_j)`` pair per distinct class pair.
        """
        classes = np.unique(labels)
        selected_samples = []
        for i in range(len(classes) - 1):
            for j in range(i + 1, len(classes)):
                samples_class_i = X[labels == classes[i]]
                samples_class_j = X[labels == classes[j]]
                sample1 = samples_class_i[np.random.choice(samples_class_i.shape[0])]
                sample2 = samples_class_j[np.random.choice(samples_class_j.shape[0])]
                selected_samples.append((sample1, sample2))
        return selected_samples

    def get_conf(self, epoch, interpolated_X):
        """Return the max softmax confidence of the model's prediction for
        each row of ``interpolated_X`` at ``epoch``."""
        predictions = self.data_provider.get_pred(epoch, interpolated_X)
        scores = np.amax(softmax(predictions, axis=1), axis=1)
        return scores

    def generate_interpolated_samples(self, X, labels, get_conf, num_interpolations_per_bin):
        """Generate cross-class interpolated samples bucketed by confidence.

        Repeatedly interpolates random cross-class pairs and assigns each
        interpolated point to the first confidence bin its prediction
        confidence falls under, until every bin holds at least
        ``num_interpolations_per_bin`` samples.

        Parameters
        ----------
        X : np.ndarray
            Sample matrix, one row per sample.
        labels : np.ndarray
            Class label per row of ``X``.
        get_conf : callable
            ``get_conf(epoch, batch) -> np.ndarray`` of max softmax scores.
        num_interpolations_per_bin : int
            Minimum number of samples required in every bin.

        Returns
        -------
        dict
            Maps each bin upper bound (0.6, 0.7, 0.8, 0.9) to its samples.
        """
        selected_samples = self.select_samples_from_different_classes(X, labels)

        # Bin upper bounds 0.6, 0.7, 0.8, 0.9 (drop the 0.5 and 1.0 ends).
        confidence_bins = np.linspace(0.5, 1, 6)[1:-1]
        interpolated_X = {bin: [] for bin in confidence_bins}

        while min([len(samples) for samples in interpolated_X.values()]) < num_interpolations_per_bin:
            batch_samples = []
            for _ in range(100):
                sample1, sample2 = selected_samples[np.random.choice(len(selected_samples))]
                t = np.random.rand()
                interpolated_sample = self.interpolate_samples(sample1, sample2, t)
                batch_samples.append(interpolated_sample)

            # BUG FIX: the original read ``self.iteration``, which is never
            # assigned on this class (AttributeError); the epoch attribute
            # is ``self.epoch``.
            confidences = get_conf(self.epoch, np.array(batch_samples))
            for i, confidence in enumerate(confidences):
                for bin in confidence_bins:
                    if confidence < bin:
                        # Assign to the lowest bin whose bound exceeds it;
                        # confidences >= 0.9 are discarded.
                        interpolated_X[bin].append(batch_samples[i])
                        break

        return interpolated_X

    def inter_gen(self, num_pairs=2000):
        """Generate and persist confidence-binned interpolated samples.

        Saves the concatenated samples to ``interpolated_X.npy`` under the
        epoch's model directory and returns them.

        Parameters
        ----------
        num_pairs : int
            Minimum number of samples per confidence bin.

        Returns
        -------
        np.ndarray
            All interpolated samples, concatenated across bins.
        """
        # BUG FIX: the original passed the *bound methods* themselves
        # instead of calling them with ``self.epoch`` like every other
        # method in this class.
        train_data = self.data_provider.train_representation(self.epoch)
        labels = self.data_provider.train_labels(self.epoch)
        interpolated_X_div = self.generate_interpolated_samples(train_data, labels, self.get_conf, num_pairs)
        confidence_bins = np.linspace(0.5, 1, 6)[1:-1]
        interpolated_X = np.concatenate([np.array(interpolated_X_div[bin]) for bin in confidence_bins])

        # BUG FIX: ``self.iteration`` -> ``self.epoch`` (undefined attribute).
        np.save(os.path.join(self.data_provider.content_path, "Model", "Epoch_{}".format(self.epoch), "interpolated_X.npy"), interpolated_X)
        return interpolated_X

    def gen_more_boundary_mixed_up(self, l_bound=0.6, num_adv_eg=6000, name='border_centers_1.npy'):
        """Generate boundary points and save their feature representations.

        Parameters
        ----------
        l_bound : float
            Lower confidence bound forwarded to ``get_border_points``.
        num_adv_eg : int
            Number of boundary examples to generate.
        name : str
            File name for the saved representations.

        Returns
        -------
        np.ndarray
            Representations of the generated boundary points.
        """
        training_data_path = os.path.join(self.data_provider.content_path, "Training_data")
        training_data = torch.load(os.path.join(training_data_path, "training_dataset_data.pth"),
                                   map_location="cpu")
        training_data = training_data.to(self.DEVICE)

        self.model = self.model.to(self.DEVICE)
        confs = batch_run(self.model, training_data)
        preds = np.argmax(confs, axis=1).squeeze()

        # NOTE: feature_function reloads the epoch checkpoint into
        # self.model, so it must run after the confidences are computed.
        repr_model = self.feature_function(self.epoch)
        print("border_points generating...")
        border_points, _, _ = get_border_points(model=self.model, input_x=training_data, confs=confs, predictions=preds, device=self.DEVICE, l_bound=l_bound, num_adv_eg=num_adv_eg, lambd=0.05, verbose=0)

        border_points = border_points.to(self.DEVICE)
        border_centers = batch_run(repr_model, border_points)
        model_path = os.path.join(self.data_provider.content_path, "Model")
        location = os.path.join(model_path, "Epoch_{:d}".format(self.epoch), name)
        print("border_points saving...")
        np.save(location, border_centers)
        return border_centers

    def get_near_epoch_border(self, n_epoch):
        """Load another epoch's saved boundary points and re-embed them
        with this epoch's feature function.

        Parameters
        ----------
        n_epoch : int
            Epoch whose saved ``ori_border_centers.npy`` is loaded.

        Returns
        -------
        np.ndarray
            Representations of the loaded boundary points.
        """
        model_path = os.path.join(self.data_provider.content_path, "Model")
        location = os.path.join(model_path, "Epoch_{:d}".format(n_epoch), "ori_border_centers.npy")
        border_points = np.load(location)
        border_points = torch.Tensor(border_points)
        border_points = border_points.to(self.DEVICE)
        repr_model = self.feature_function(self.epoch)
        border_centers = batch_run(repr_model, border_points)
        return border_centers
|
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
|
|
|
|