# Import all the required modules
import os
# Set before torch is imported, preserving the original statement order.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
import math
from collections import OrderedDict
import sys
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch_lr_finder import LRFinder
import lightning as L
from pytorch_grad_cam import GradCAM
# NOTE(review): Cifar10SearchDataset, DataLoader, random_split and BATCH_SIZE
# are used below but not imported explicitly; presumably they are re-exported
# by `utils` -- confirm.
from utils import *
import torchmetrics


class BasicBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm stages plus a shortcut.

    The shortcut is the identity unless the block changes resolution
    (``stride != 1``) or channel count, in which case a 1x1 convolution
    followed by batch-norm projects the input to the output shape.
    """

    # Multiplier from ``planes`` to the number of output channels.
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        """Build the block.

        Args:
            in_planes: Number of input channels.
            planes: Number of channels produced by each 3x3 convolution.
            stride: Stride of the first convolution (and of the projection
                shortcut, when one is needed).
        """
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(planes)

        # An empty Sequential acts as the identity mapping.
        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(
                    in_planes,
                    self.expansion * planes,
                    kernel_size=1,
                    stride=stride,
                    bias=False,
                ),
                nn.BatchNorm2d(self.expansion * planes),
            )

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))``."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class ResNet18Model(L.LightningModule):
    """ResNet-style classifier bundled with its data pipeline.

    The module owns the network (a 3x3 stem, four residual stages and a
    linear head), the albumentations train/test transforms, the dataset
    setup and the dataloaders.
    """

    def __init__(
        self,
        data_dir="./data",
        block=BasicBlock,
        num_blocks=(2, 2, 2, 2),
        num_classes=10,
    ):
        """Build transforms, network layers and the accuracy metric.

        Args:
            data_dir: Directory handed to ``Cifar10SearchDataset``.
            block: Residual block class; must expose ``expansion`` and accept
                ``(in_planes, planes, stride)``.
            num_blocks: Number of blocks in each of the four stages. Only
                indexed, so any sequence of four ints works.
            num_classes: Size of the classifier output and of the accuracy
                metric.
        """
        super().__init__()
        self.data_dir = data_dir
        self.num_classes = num_classes

        means = [0.4914, 0.4822, 0.4465]
        stds = [0.2470, 0.2435, 0.2616]

        # NOTE(review): CoarseDropout runs after Normalize but fills with the
        # un-normalized channel means -- confirm this is intended.
        self.train_transforms = A.Compose(
            [
                A.Normalize(mean=means, std=stds, always_apply=True),
                A.PadIfNeeded(min_height=36, min_width=36, always_apply=True),
                A.RandomCrop(height=32, width=32, always_apply=True),
                A.HorizontalFlip(),
                A.CoarseDropout(
                    max_holes=1,
                    max_height=16,
                    max_width=16,
                    min_holes=1,
                    min_height=8,
                    min_width=8,
                    fill_value=means,
                ),
                ToTensorV2(),
            ]
        )
        self.test_transforms = A.Compose(
            [
                A.Normalize(mean=means, std=stds, always_apply=True),
                ToTensorV2(),
            ]
        )

        # Running channel count; updated by _make_layer as stages are built.
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)

        # Follow the constructor argument rather than a hard-coded 10 so the
        # metric stays consistent with the classifier head.
        self.accuracy = torchmetrics.classification.Accuracy(
            task="multiclass", num_classes=num_classes
        )

    def _make_layer(self, block, planes, num_blocks, stride):
        """Stack ``num_blocks`` blocks; only the first one uses ``stride``.

        Updates ``self.in_planes`` so the next stage sees the right input
        width.
        """
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for block_stride in strides:
            layers.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits for a batch of images."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        # A fixed 4x4 window: with three stride-2 stages this collapses the
        # feature map to 1x1 for 32x32 inputs, which the linear head requires.
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

    def training_step(self, batch, batch_idx):
        """Return the cross-entropy loss for one training batch."""
        x, y = batch
        loss = F.cross_entropy(self(x), y)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log loss and accuracy for one validation batch."""
        x, y = batch
        logits = self(x)
        # forward() returns raw logits, so use cross_entropy (log_softmax +
        # nll) to match training_step; nll_loss alone expects log-probabilities.
        loss = F.cross_entropy(logits, y)
        preds = torch.argmax(logits, dim=1)
        self.accuracy(preds, y)

        # Calling self.log will surface up scalars for you in TensorBoard
        self.log("val_loss", loss, prog_bar=True)
        self.log("val_acc", self.accuracy, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx):
        """Evaluate one test batch (logged under the ``val_*`` keys)."""
        # Here we just reuse the validation_step for testing
        return self.validation_step(batch, batch_idx)

    def configure_optimizers(self):
        """Return an SGD optimizer with momentum and weight decay."""
        LEARNING_RATE = 0.03
        WEIGHT_DECAY = 1e-4

        # Disabled LR-finder + OneCycleLR setup, kept for reference:
        # # Loss Function
        # criterion = nn.CrossEntropyLoss()
        # optimizer = optim.SGD(self.parameters(), lr=LEARNING_RATE, momentum=0.9, weight_decay=WEIGHT_DECAY)
        # lr_finder2 = LRFinder(self, optimizer, criterion, device='cuda')
        # lr_finder2.range_test(train_loader, end_lr=10, num_iter=200, step_mode="exp")
        # lr_finder2.plot()
        # suggested_lr = lr_finder2.suggest_lr()
        # lr_finder2.reset()
        # EPOCHS = 20
        # STEPS_PER_EPOCH = 2000
        # scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer,
        #     max_lr=suggested_lr,
        #     steps_per_epoch=STEPS_PER_EPOCH,
        #     epochs=EPOCHS,
        #     pct_start=int(0.3*EPOCHS)/EPOCHS if EPOCHS != 1 else 0.5,  # 30% of total number of Epochs
        #     div_factor=100,
        #     three_phase=False,
        #     final_div_factor=100,
        #     anneal_strategy="linear"
        # )

        return torch.optim.SGD(
            self.parameters(),
            lr=LEARNING_RATE,
            momentum=0.9,
            weight_decay=WEIGHT_DECAY,
        )
        # return scheduler

    def prepare_data(self):
        """Instantiate both dataset splits with ``download=True``."""
        # download
        Cifar10SearchDataset(self.data_dir, train=True, download=True)
        Cifar10SearchDataset(self.data_dir, train=False, download=True)

    def setup(self, stage=None):
        """Create the datasets needed for ``stage`` (``None`` means all)."""
        # Assign train/val datasets for use in dataloaders
        if stage == "fit" or stage is None:
            cifar_full = Cifar10SearchDataset(
                self.data_dir, train=True, transform=self.train_transforms
            )
            # NOTE: both subsets wrap cifar_full, so the validation split is
            # also passed through the training augmentations.
            self.cifar_train, self.cifar_val = random_split(cifar_full, [45000, 5000])

        # Assign test dataset for use in dataloader(s)
        if stage == "test" or stage is None:
            self.cifar_test = Cifar10SearchDataset(
                self.data_dir, train=False, transform=self.test_transforms
            )

    def train_dataloader(self):
        """Return the training DataLoader."""
        # NOTE(review): no shuffle=True here -- confirm whether per-epoch
        # shuffling of the training split is wanted.
        # os.cpu_count() may return None; fall back to in-process loading.
        return DataLoader(
            self.cifar_train, batch_size=BATCH_SIZE, num_workers=os.cpu_count() or 0
        )

    def val_dataloader(self):
        """Return the validation DataLoader."""
        return DataLoader(
            self.cifar_val, batch_size=BATCH_SIZE, num_workers=os.cpu_count() or 0
        )

    def test_dataloader(self):
        """Return the test DataLoader."""
        return DataLoader(
            self.cifar_test, batch_size=BATCH_SIZE, num_workers=os.cpu_count() or 0
        )