| |
| import copy |
| import math |
| from functools import partial |
|
|
| import torch |
| import torch.nn as nn |
| import torch.utils.checkpoint as cp |
| from mmcv.cnn.bricks import ConvModule, DropPath |
| from mmengine.model import BaseModule, Sequential |
|
|
| from mmdet.registry import MODELS |
| from ..layers import InvertedResidual, SELayer |
| from ..utils import make_divisible |
|
|
|
|
| class EdgeResidual(BaseModule): |
| """Edge Residual Block. |
| |
| Args: |
| in_channels (int): The input channels of this module. |
| out_channels (int): The output channels of this module. |
| mid_channels (int): The input channels of the second convolution. |
| kernel_size (int): The kernel size of the first convolution. |
| Defaults to 3. |
| stride (int): The stride of the first convolution. Defaults to 1. |
| se_cfg (dict, optional): Config dict for se layer. Defaults to None, |
| which means no se layer. |
| with_residual (bool): Use residual connection. Defaults to True. |
| conv_cfg (dict, optional): Config dict for convolution layer. |
| Defaults to None, which means using conv2d. |
| norm_cfg (dict): Config dict for normalization layer. |
| Defaults to ``dict(type='BN')``. |
| act_cfg (dict): Config dict for activation layer. |
| Defaults to ``dict(type='ReLU')``. |
| drop_path_rate (float): stochastic depth rate. Defaults to 0. |
| with_cp (bool): Use checkpoint or not. Using checkpoint will save some |
| memory while slowing down the training speed. Defaults to False. |
| init_cfg (dict | list[dict], optional): Initialization config dict. |
| """ |
|
|
| def __init__(self, |
| in_channels, |
| out_channels, |
| mid_channels, |
| kernel_size=3, |
| stride=1, |
| se_cfg=None, |
| with_residual=True, |
| conv_cfg=None, |
| norm_cfg=dict(type='BN'), |
| act_cfg=dict(type='ReLU'), |
| drop_path_rate=0., |
| with_cp=False, |
| init_cfg=None, |
| **kwargs): |
| super(EdgeResidual, self).__init__(init_cfg=init_cfg) |
| assert stride in [1, 2] |
| self.with_cp = with_cp |
| self.drop_path = DropPath( |
| drop_path_rate) if drop_path_rate > 0 else nn.Identity() |
| self.with_se = se_cfg is not None |
| self.with_residual = ( |
| stride == 1 and in_channels == out_channels and with_residual) |
|
|
| if self.with_se: |
| assert isinstance(se_cfg, dict) |
|
|
| self.conv1 = ConvModule( |
| in_channels=in_channels, |
| out_channels=mid_channels, |
| kernel_size=kernel_size, |
| stride=1, |
| padding=kernel_size // 2, |
| conv_cfg=conv_cfg, |
| norm_cfg=norm_cfg, |
| act_cfg=act_cfg) |
|
|
| if self.with_se: |
| self.se = SELayer(**se_cfg) |
|
|
| self.conv2 = ConvModule( |
| in_channels=mid_channels, |
| out_channels=out_channels, |
| kernel_size=1, |
| stride=stride, |
| padding=0, |
| conv_cfg=conv_cfg, |
| norm_cfg=norm_cfg, |
| act_cfg=None) |
|
|
| def forward(self, x): |
|
|
| def _inner_forward(x): |
| out = x |
| out = self.conv1(out) |
|
|
| if self.with_se: |
| out = self.se(out) |
|
|
| out = self.conv2(out) |
|
|
| if self.with_residual: |
| return x + self.drop_path(out) |
| else: |
| return out |
|
|
| if self.with_cp and x.requires_grad: |
| out = cp.checkpoint(_inner_forward, x) |
| else: |
| out = _inner_forward(x) |
|
|
| return out |
|
|
|
|
| def model_scaling(layer_setting, arch_setting): |
| """Scaling operation to the layer's parameters according to the |
| arch_setting.""" |
| |
| new_layer_setting = copy.deepcopy(layer_setting) |
| for layer_cfg in new_layer_setting: |
| for block_cfg in layer_cfg: |
| block_cfg[1] = make_divisible(block_cfg[1] * arch_setting[0], 8) |
|
|
| |
| split_layer_setting = [new_layer_setting[0]] |
| for layer_cfg in new_layer_setting[1:-1]: |
| tmp_index = [0] |
| for i in range(len(layer_cfg) - 1): |
| if layer_cfg[i + 1][1] != layer_cfg[i][1]: |
| tmp_index.append(i + 1) |
| tmp_index.append(len(layer_cfg)) |
| for i in range(len(tmp_index) - 1): |
| split_layer_setting.append(layer_cfg[tmp_index[i]:tmp_index[i + |
| 1]]) |
| split_layer_setting.append(new_layer_setting[-1]) |
|
|
| num_of_layers = [len(layer_cfg) for layer_cfg in split_layer_setting[1:-1]] |
| new_layers = [ |
| int(math.ceil(arch_setting[1] * num)) for num in num_of_layers |
| ] |
|
|
| merge_layer_setting = [split_layer_setting[0]] |
| for i, layer_cfg in enumerate(split_layer_setting[1:-1]): |
| if new_layers[i] <= num_of_layers[i]: |
| tmp_layer_cfg = layer_cfg[:new_layers[i]] |
| else: |
| tmp_layer_cfg = copy.deepcopy(layer_cfg) + [layer_cfg[-1]] * ( |
| new_layers[i] - num_of_layers[i]) |
| if tmp_layer_cfg[0][3] == 1 and i != 0: |
| merge_layer_setting[-1] += tmp_layer_cfg.copy() |
| else: |
| merge_layer_setting.append(tmp_layer_cfg.copy()) |
| merge_layer_setting.append(split_layer_setting[-1]) |
|
|
| return merge_layer_setting |
|
|
|
|
| @MODELS.register_module() |
| class EfficientNet(BaseModule): |
| """EfficientNet backbone. |
| |
| Args: |
| arch (str): Architecture of efficientnet. Defaults to b0. |
| out_indices (Sequence[int]): Output from which stages. |
| Defaults to (6, ). |
| frozen_stages (int): Stages to be frozen (all param fixed). |
| Defaults to 0, which means not freezing any parameters. |
| conv_cfg (dict): Config dict for convolution layer. |
| Defaults to None, which means using conv2d. |
| norm_cfg (dict): Config dict for normalization layer. |
| Defaults to dict(type='BN'). |
| act_cfg (dict): Config dict for activation layer. |
| Defaults to dict(type='Swish'). |
| norm_eval (bool): Whether to set norm layers to eval mode, namely, |
| freeze running stats (mean and var). Note: Effect on Batch Norm |
| and its variants only. Defaults to False. |
| with_cp (bool): Use checkpoint or not. Using checkpoint will save some |
| memory while slowing down the training speed. Defaults to False. |
| """ |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| layer_settings = { |
| 'b': [[[3, 32, 0, 2, 0, -1]], |
| [[3, 16, 4, 1, 1, 0]], |
| [[3, 24, 4, 2, 6, 0], |
| [3, 24, 4, 1, 6, 0]], |
| [[5, 40, 4, 2, 6, 0], |
| [5, 40, 4, 1, 6, 0]], |
| [[3, 80, 4, 2, 6, 0], |
| [3, 80, 4, 1, 6, 0], |
| [3, 80, 4, 1, 6, 0], |
| [5, 112, 4, 1, 6, 0], |
| [5, 112, 4, 1, 6, 0], |
| [5, 112, 4, 1, 6, 0]], |
| [[5, 192, 4, 2, 6, 0], |
| [5, 192, 4, 1, 6, 0], |
| [5, 192, 4, 1, 6, 0], |
| [5, 192, 4, 1, 6, 0], |
| [3, 320, 4, 1, 6, 0]], |
| [[1, 1280, 0, 1, 0, -1]] |
| ], |
| 'e': [[[3, 32, 0, 2, 0, -1]], |
| [[3, 24, 0, 1, 3, 1]], |
| [[3, 32, 0, 2, 8, 1], |
| [3, 32, 0, 1, 8, 1]], |
| [[3, 48, 0, 2, 8, 1], |
| [3, 48, 0, 1, 8, 1], |
| [3, 48, 0, 1, 8, 1], |
| [3, 48, 0, 1, 8, 1]], |
| [[5, 96, 0, 2, 8, 0], |
| [5, 96, 0, 1, 8, 0], |
| [5, 96, 0, 1, 8, 0], |
| [5, 96, 0, 1, 8, 0], |
| [5, 96, 0, 1, 8, 0], |
| [5, 144, 0, 1, 8, 0], |
| [5, 144, 0, 1, 8, 0], |
| [5, 144, 0, 1, 8, 0], |
| [5, 144, 0, 1, 8, 0]], |
| [[5, 192, 0, 2, 8, 0], |
| [5, 192, 0, 1, 8, 0]], |
| [[1, 1280, 0, 1, 0, -1]] |
| ] |
| } |
|
|
| |
| |
| |
| arch_settings = { |
| 'b0': (1.0, 1.0, 224), |
| 'b1': (1.0, 1.1, 240), |
| 'b2': (1.1, 1.2, 260), |
| 'b3': (1.2, 1.4, 300), |
| 'b4': (1.4, 1.8, 380), |
| 'b5': (1.6, 2.2, 456), |
| 'b6': (1.8, 2.6, 528), |
| 'b7': (2.0, 3.1, 600), |
| 'b8': (2.2, 3.6, 672), |
| 'es': (1.0, 1.0, 224), |
| 'em': (1.0, 1.1, 240), |
| 'el': (1.2, 1.4, 300) |
| } |
|
|
| def __init__(self, |
| arch='b0', |
| drop_path_rate=0., |
| out_indices=(6, ), |
| frozen_stages=0, |
| conv_cfg=dict(type='Conv2dAdaptivePadding'), |
| norm_cfg=dict(type='BN', eps=1e-3), |
| act_cfg=dict(type='Swish'), |
| norm_eval=False, |
| with_cp=False, |
| init_cfg=[ |
| dict(type='Kaiming', layer='Conv2d'), |
| dict( |
| type='Constant', |
| layer=['_BatchNorm', 'GroupNorm'], |
| val=1) |
| ]): |
| super(EfficientNet, self).__init__(init_cfg) |
| assert arch in self.arch_settings, \ |
| f'"{arch}" is not one of the arch_settings ' \ |
| f'({", ".join(self.arch_settings.keys())})' |
| self.arch_setting = self.arch_settings[arch] |
| self.layer_setting = self.layer_settings[arch[:1]] |
| for index in out_indices: |
| if index not in range(0, len(self.layer_setting)): |
| raise ValueError('the item in out_indices must in ' |
| f'range(0, {len(self.layer_setting)}). ' |
| f'But received {index}') |
|
|
| if frozen_stages not in range(len(self.layer_setting) + 1): |
| raise ValueError('frozen_stages must be in range(0, ' |
| f'{len(self.layer_setting) + 1}). ' |
| f'But received {frozen_stages}') |
| self.drop_path_rate = drop_path_rate |
| self.out_indices = out_indices |
| self.frozen_stages = frozen_stages |
| self.conv_cfg = conv_cfg |
| self.norm_cfg = norm_cfg |
| self.act_cfg = act_cfg |
| self.norm_eval = norm_eval |
| self.with_cp = with_cp |
|
|
| self.layer_setting = model_scaling(self.layer_setting, |
| self.arch_setting) |
| block_cfg_0 = self.layer_setting[0][0] |
| block_cfg_last = self.layer_setting[-1][0] |
| self.in_channels = make_divisible(block_cfg_0[1], 8) |
| self.out_channels = block_cfg_last[1] |
| self.layers = nn.ModuleList() |
| self.layers.append( |
| ConvModule( |
| in_channels=3, |
| out_channels=self.in_channels, |
| kernel_size=block_cfg_0[0], |
| stride=block_cfg_0[3], |
| padding=block_cfg_0[0] // 2, |
| conv_cfg=self.conv_cfg, |
| norm_cfg=self.norm_cfg, |
| act_cfg=self.act_cfg)) |
| self.make_layer() |
| |
| if len(self.layers) < max(self.out_indices) + 1: |
| self.layers.append( |
| ConvModule( |
| in_channels=self.in_channels, |
| out_channels=self.out_channels, |
| kernel_size=block_cfg_last[0], |
| stride=block_cfg_last[3], |
| padding=block_cfg_last[0] // 2, |
| conv_cfg=self.conv_cfg, |
| norm_cfg=self.norm_cfg, |
| act_cfg=self.act_cfg)) |
|
|
| def make_layer(self): |
| |
| layer_setting = self.layer_setting[1:-1] |
|
|
| total_num_blocks = sum([len(x) for x in layer_setting]) |
| block_idx = 0 |
| dpr = [ |
| x.item() |
| for x in torch.linspace(0, self.drop_path_rate, total_num_blocks) |
| ] |
|
|
| for i, layer_cfg in enumerate(layer_setting): |
| |
| if i > max(self.out_indices) - 1: |
| break |
| layer = [] |
| for i, block_cfg in enumerate(layer_cfg): |
| (kernel_size, out_channels, se_ratio, stride, expand_ratio, |
| block_type) = block_cfg |
|
|
| mid_channels = int(self.in_channels * expand_ratio) |
| out_channels = make_divisible(out_channels, 8) |
| if se_ratio <= 0: |
| se_cfg = None |
| else: |
| |
| |
| se_cfg = dict( |
| channels=mid_channels, |
| ratio=expand_ratio * se_ratio, |
| act_cfg=(self.act_cfg, dict(type='Sigmoid'))) |
| if block_type == 1: |
| if i > 0 and expand_ratio == 3: |
| with_residual = False |
| expand_ratio = 4 |
| else: |
| with_residual = True |
| mid_channels = int(self.in_channels * expand_ratio) |
| if se_cfg is not None: |
| |
| |
| se_cfg = dict( |
| channels=mid_channels, |
| ratio=se_ratio * expand_ratio, |
| act_cfg=(self.act_cfg, dict(type='Sigmoid'))) |
| block = partial(EdgeResidual, with_residual=with_residual) |
| else: |
| block = InvertedResidual |
| layer.append( |
| block( |
| in_channels=self.in_channels, |
| out_channels=out_channels, |
| mid_channels=mid_channels, |
| kernel_size=kernel_size, |
| stride=stride, |
| se_cfg=se_cfg, |
| conv_cfg=self.conv_cfg, |
| norm_cfg=self.norm_cfg, |
| act_cfg=self.act_cfg, |
| drop_path_rate=dpr[block_idx], |
| with_cp=self.with_cp, |
| |
| |
| with_expand_conv=(mid_channels != self.in_channels))) |
| self.in_channels = out_channels |
| block_idx += 1 |
| self.layers.append(Sequential(*layer)) |
|
|
| def forward(self, x): |
| outs = [] |
| for i, layer in enumerate(self.layers): |
| x = layer(x) |
| if i in self.out_indices: |
| outs.append(x) |
|
|
| return tuple(outs) |
|
|
| def _freeze_stages(self): |
| for i in range(self.frozen_stages): |
| m = self.layers[i] |
| m.eval() |
| for param in m.parameters(): |
| param.requires_grad = False |
|
|
| def train(self, mode=True): |
| super(EfficientNet, self).train(mode) |
| self._freeze_stages() |
| if mode and self.norm_eval: |
| for m in self.modules(): |
| if isinstance(m, nn.BatchNorm2d): |
| m.eval() |
|
|