| from typing import List, Optional, Tuple, Union, Sequence, Dict |
| from dataclasses import dataclass |
| import inspect |
| from functools import partial |
| import warnings |
|
|
| import math |
| import torch |
| import torchvision |
| import torch.nn as nn |
| from torch import Tensor |
| import torch.nn.functional as F |
| from torch.nn.modules.batchnorm import _BatchNorm, SyncBatchNorm |
|
|
| from transformers.modeling_outputs import ModelOutput |
| from transformers.modeling_utils import PreTrainedModel |
| from transformers.utils import logging |
|
|
| from .configuration_rtmdet import RTMDetConfig |
|
|
|
|
| logger = logging.get_logger(__name__) |
|
|
@dataclass
class DetectionOutput(ModelOutput):
    """
    Output type for object detection models.

    Args:
        boxes (`torch.FloatTensor` of shape `(batch_size, num_boxes, 4)`):
            Detection boxes in format [x1, y1, x2, y2]. Coordinates are in
            model-input space (640×640) by default, or in original image pixel
            space when ``original_size`` was passed to ``forward()``.
        scores (`torch.FloatTensor` of shape `(batch_size, num_boxes)`):
            Detection confidence scores.
        labels (`torch.LongTensor` of shape `(batch_size, num_boxes)`):
            Detection class indices.
        loss (`torch.FloatTensor`, *optional*):
            Loss value if training.
    """

    # Defaults of None let the output be constructed field-by-field;
    # ModelOutput omits None entries from its tuple/dict views.
    boxes: torch.FloatTensor = None
    scores: torch.FloatTensor = None
    labels: torch.LongTensor = None
    loss: Optional[torch.FloatTensor] = None
|
|
|
|
| |
# Registry mapping config-string names to activation-module classes.
# 'Swish' is an alias of nn.SiLU and 'HSigmoid' of nn.Hardsigmoid.
ACTIVATION_LAYERS = {
    'ReLU': nn.ReLU,
    'LeakyReLU': nn.LeakyReLU,
    'PReLU': nn.PReLU,
    'SiLU': nn.SiLU,
    'Sigmoid': nn.Sigmoid,
    'Tanh': nn.Tanh,
    'GELU': nn.GELU,
    'Swish': nn.SiLU,
    'Hardsigmoid': nn.Hardsigmoid,
    'HSigmoid': nn.Hardsigmoid
}


# Lightweight aliases mirroring mmengine-style config typing.
ConfigType = Dict
OptConfigType = Optional[Dict]
OptMultiConfig = Optional[Union[Dict, List[Dict]]]
|
|
def build_activation_layer(cfg: Dict) -> nn.Module:
    """Instantiate an activation layer from a config dict.

    Args:
        cfg (dict): Must hold a ``type`` key naming an entry in
            ``ACTIVATION_LAYERS``; every remaining key is forwarded to
            the layer constructor.

    Returns:
        nn.Module: Created activation layer.

    Raises:
        TypeError: If ``cfg`` is not a dict.
        KeyError: If ``type`` is missing or not a registered activation.
    """
    if not isinstance(cfg, dict):
        raise TypeError('cfg must be a dict')
    if 'type' not in cfg:
        raise KeyError('the cfg dict must contain the key "type"')

    # Work on a copy so the caller's config dict is left untouched.
    kwargs = dict(cfg)
    name = kwargs.pop('type')

    if name not in ACTIVATION_LAYERS:
        raise KeyError(f'Unrecognized activation type {name}')

    return ACTIVATION_LAYERS[name](**kwargs)
|
|
def kaiming_init(module,
                 a=0,
                 mode='fan_out',
                 nonlinearity='relu',
                 bias=0,
                 distribution='normal'):
    """Kaiming-initialize ``module.weight`` and set ``module.bias`` constant.

    Modules whose weight/bias attribute is absent or None are left
    untouched, so this is safe to apply to arbitrary modules.
    """
    assert distribution in ['uniform', 'normal']
    weight = getattr(module, 'weight', None)
    if weight is not None:
        init_fn = (nn.init.kaiming_uniform_
                   if distribution == 'uniform' else nn.init.kaiming_normal_)
        init_fn(weight, a=a, mode=mode, nonlinearity=nonlinearity)
    bias_param = getattr(module, 'bias', None)
    if bias_param is not None:
        nn.init.constant_(bias_param, bias)
|
|
def constant_init(module, val, bias=0):
    """Fill ``module.weight`` with ``val`` and ``module.bias`` with ``bias``.

    Attributes that are absent or None are skipped.
    """
    weight = getattr(module, 'weight', None)
    if weight is not None:
        nn.init.constant_(weight, val)
    bias_param = getattr(module, 'bias', None)
    if bias_param is not None:
        nn.init.constant_(bias_param, bias)
|
|
class _InstanceNorm(nn.modules.instancenorm._InstanceNorm):
    """Instance Normalization Base Class.

    Thin alias of torch's private ``_InstanceNorm`` base class so the rest
    of this file (e.g. ``infer_abbr``, ``ConvModule``) can reference it
    without repeating the private torch import path.
    """
    pass
|
|
| |
| |
|
|
def infer_abbr(class_type):
    """Infer the abbreviated norm-layer name from a class.

    Resolution order: an explicit ``_abbr_`` attribute, then known norm
    base classes, then substring matching on the lowercased class name,
    finally the generic fallback ``'norm_layer'``.
    """
    if not inspect.isclass(class_type):
        raise TypeError(
            f'class_type must be a type, but got {type(class_type)}')
    if hasattr(class_type, '_abbr_'):
        return class_type._abbr_
    for base, abbr in ((_InstanceNorm, 'in'), (_BatchNorm, 'bn'),
                       (nn.GroupNorm, 'gn'), (nn.LayerNorm, 'ln')):
        if issubclass(class_type, base):
            return abbr
    lowered = class_type.__name__.lower()
    for token, abbr in (('batch', 'bn'), ('group', 'gn'), ('layer', 'ln'),
                        ('instance', 'in')):
        if token in lowered:
            return abbr
    return 'norm_layer'
|
|
| |
# Norm-layer registry: config string -> module class.
# Bare 'BN'/'IN' map to the 2d variants.
NORM_LAYERS = {
    'BN': nn.BatchNorm2d,
    'BN1d': nn.BatchNorm1d,
    'BN2d': nn.BatchNorm2d,
    'BN3d': nn.BatchNorm3d,
    'SyncBN': SyncBatchNorm,
    'GN': nn.GroupNorm,
    'LN': nn.LayerNorm,
    'IN': nn.InstanceNorm2d,
    'IN1d': nn.InstanceNorm1d,
    'IN2d': nn.InstanceNorm2d,
    'IN3d': nn.InstanceNorm3d
}


# Conv-layer registry; the generic 'Conv' means Conv2d.
CONV_LAYERS = {
    'Conv1d': nn.Conv1d,
    'Conv2d': nn.Conv2d,
    'Conv3d': nn.Conv3d,
    'Conv': nn.Conv2d
}


# Explicit padding layers, used by ConvModule for padding modes outside
# the conv's own 'zeros'/'circular' handling.
PADDING_LAYERS = {
    'zero': nn.ZeroPad2d,
    'reflect': nn.ReflectionPad2d,
    'replicate': nn.ReplicationPad2d
}
|
|
def build_norm_layer(cfg: Dict,
                     num_features: int,
                     postfix: Union[int, str] = '') -> Tuple[str, nn.Module]:
    """Instantiate a normalization layer from a config dict.

    Args:
        cfg (dict): Must hold ``type`` (a key of ``NORM_LAYERS``); may also
            hold ``requires_grad`` and constructor kwargs. ``eps`` defaults
            to 1e-5 when not given.
        num_features (int): Channel count (passed as ``num_channels`` for
            GroupNorm).
        postfix (int | str): Appended to the abbreviated layer name.

    Returns:
        tuple[str, nn.Module]: The layer name (abbr + postfix) and the
        constructed layer.
    """
    if not isinstance(cfg, dict):
        raise TypeError('cfg must be a dict')
    if 'type' not in cfg:
        raise KeyError('the cfg dict must contain the key "type"')

    # Copy so the caller's dict is not mutated.
    kwargs = dict(cfg)
    layer_type = kwargs.pop('type')

    if layer_type not in NORM_LAYERS:
        raise KeyError(f'Unrecognized norm type {layer_type}')

    norm_cls = NORM_LAYERS[layer_type]
    abbr = infer_abbr(norm_cls)

    assert isinstance(postfix, (int, str))
    name = abbr + str(postfix)

    requires_grad = kwargs.pop('requires_grad', True)
    kwargs.setdefault('eps', 1e-5)

    if norm_cls is nn.GroupNorm:
        assert 'num_groups' in kwargs
        layer = norm_cls(num_channels=num_features, **kwargs)
    else:
        layer = norm_cls(num_features, **kwargs)
        # Legacy hook kept for old SyncBN implementations.
        if layer_type == 'SyncBN' and hasattr(layer, '_specify_ddp_gpu_num'):
            layer._specify_ddp_gpu_num(1)

    for param in layer.parameters():
        param.requires_grad = requires_grad

    return name, layer
|
|
def build_conv_layer(cfg: Optional[Dict], *args, **kwargs) -> nn.Module:
    """Instantiate a convolution layer from a config dict.

    Args:
        cfg (dict, optional): Must hold ``type`` (a key of ``CONV_LAYERS``);
            remaining keys are forwarded to the constructor. ``None`` means
            a plain ``Conv2d``.
        *args, **kwargs: Positional/keyword arguments for the conv class.

    Returns:
        nn.Module: Created convolution layer.
    """
    if cfg is None:
        merged = {'type': 'Conv2d'}
    else:
        if not isinstance(cfg, dict):
            raise TypeError('cfg must be a dict')
        if 'type' not in cfg:
            raise KeyError('the cfg dict must contain the key "type"')
        merged = dict(cfg)

    layer_type = merged.pop('type')

    if layer_type not in CONV_LAYERS:
        raise KeyError(f'Unrecognized conv type {layer_type}')

    return CONV_LAYERS[layer_type](*args, **kwargs, **merged)
|
|
def build_padding_layer(cfg: Dict, *args, **kwargs) -> nn.Module:
    """Instantiate a padding layer from a config dict.

    Args:
        cfg (dict): Must hold ``type`` ('zero', 'reflect' or 'replicate');
            remaining keys are forwarded to the layer constructor.
        *args, **kwargs: Positional/keyword arguments for the padding class.

    Returns:
        nn.Module: Created padding layer.
    """
    if not isinstance(cfg, dict):
        raise TypeError('cfg must be a dict')
    if 'type' not in cfg:
        raise KeyError('the cfg dict must contain the key "type"')

    remaining = dict(cfg)
    padding_type = remaining.pop('type')

    if padding_type not in PADDING_LAYERS:
        raise KeyError(f'Unrecognized padding type {padding_type}')

    return PADDING_LAYERS[padding_type](*args, **kwargs, **remaining)
|
|
def efficient_conv_bn_eval_forward(bn: _BatchNorm,
                                   conv: nn.modules.conv._ConvNd,
                                   x: torch.Tensor):
    """Fused conv+BN forward for a BatchNorm in eval mode.

    Implementation based on https://arxiv.org/abs/2305.11624
    "Tune-Mode ConvBN Blocks For Efficient Transfer Learning".
    It leverages the associative law between convolution and affine
    transform, i.e., normalize (weight conv feature) =
    (normalize weight) conv feature: the frozen BN statistics and affine
    parameters are folded into the conv's weight and bias on the fly, so a
    single convolution replaces conv followed by BN. It works for eval mode
    of ConvBN blocks during validation, can be used for training as well,
    and reduces memory and computation cost.

    Args:
        bn (_BatchNorm): a BatchNorm module.
        conv (nn._ConvNd): a conv module
        x (torch.Tensor): Input feature map.
    """
    # Substitute neutral tensors for any affine parameter that is absent.
    conv_bias = (conv.bias if conv.bias is not None
                 else torch.zeros_like(bn.running_var))
    gamma = (bn.weight if bn.weight is not None
             else torch.ones_like(bn.running_var))
    beta = (bn.bias if bn.bias is not None
            else torch.zeros_like(bn.running_var))

    # Per-channel scale gamma / sqrt(var + eps), reshaped so it broadcasts
    # over the conv kernel dimensions.
    inv_std = torch.rsqrt(bn.running_var + bn.eps)
    scale = (gamma * inv_std).reshape(
        [-1] + [1] * (len(conv.weight.shape) - 1))

    fused_weight = conv.weight * scale
    fused_bias = beta + scale.flatten() * (conv_bias - bn.running_mean)
    return conv._conv_forward(x, fused_weight, fused_bias)
|
|
class ConvModule(nn.Module):
    """A conv block that bundles conv/norm/activation layers.

    The three sub-layers run in ``order`` (default conv -> norm -> act);
    passing ``norm_cfg=None`` / ``act_cfg=None`` disables the respective
    layer.

    Args:
        in_channels (int): Input channels of the convolution.
        out_channels (int): Output channels of the convolution.
        kernel_size (int | tuple[int, int]): Convolution kernel size.
        stride (int | tuple[int, int]): Convolution stride. Defaults to 1.
        padding (int | tuple[int, int]): Convolution padding. Defaults to 0.
        dilation (int | tuple[int, int]): Convolution dilation. Defaults to 1.
        groups (int): Number of blocked connections. Defaults to 1.
        bias (bool | str): Whether the conv has a bias. 'auto' enables the
            bias only when no norm layer follows.
        conv_cfg (dict, optional): Config for the conv layer; None means a
            plain Conv2d.
        norm_cfg (dict, optional): Config for the norm layer; None disables
            normalization.
        act_cfg (dict, optional): Config for the activation layer; None
            disables activation. Defaults to dict(type='ReLU').
        inplace (bool): Request in-place activation for activation types
            that support it. Defaults to True.
        with_spectral_norm (bool): Wrap the conv with spectral norm.
        padding_mode (str): 'zeros'/'circular' are handled by the conv
            itself; any other value inserts an explicit padding layer
            before the conv.
        order (tuple): Permutation of ('conv', 'norm', 'act') giving the
            execution order.
        efficient_conv_bn_eval (bool): Enable the fused conv+BN eval-mode
            forward (see ``efficient_conv_bn_eval_forward``).
    """
    _abbr_ = 'conv_block'

    def __init__(self,
                 in_channels: int,
                 out_channels: int,
                 kernel_size: Union[int, Tuple[int, int]],
                 stride: Union[int, Tuple[int, int]] = 1,
                 padding: Union[int, Tuple[int, int]] = 0,
                 dilation: Union[int, Tuple[int, int]] = 1,
                 groups: int = 1,
                 bias: Union[bool, str] = 'auto',
                 conv_cfg: Optional[Dict] = None,
                 norm_cfg: Optional[Dict] = None,
                 act_cfg: Optional[Dict] = dict(type='ReLU'),
                 inplace: bool = True,
                 with_spectral_norm: bool = False,
                 padding_mode: str = 'zeros',
                 order: tuple = ('conv', 'norm', 'act'),
                 efficient_conv_bn_eval: bool = False):
        super().__init__()
        assert conv_cfg is None or isinstance(conv_cfg, dict)
        assert norm_cfg is None or isinstance(norm_cfg, dict)
        assert act_cfg is None or isinstance(act_cfg, dict)
        official_padding_mode = ['zeros', 'circular']
        self.conv_cfg = conv_cfg
        self.norm_cfg = norm_cfg
        self.act_cfg = act_cfg
        self.inplace = inplace
        self.with_spectral_norm = with_spectral_norm
        self.with_explicit_padding = padding_mode not in official_padding_mode
        self.order = order
        assert isinstance(self.order, tuple) and len(self.order) == 3
        assert set(order) == {'conv', 'norm', 'act'}
        self.with_norm = norm_cfg is not None
        self.with_activation = act_cfg is not None

        # 'auto': only give the conv a bias when there is no norm layer.
        if bias == 'auto':
            bias = not self.with_norm
        self.with_bias = bias

        if self.with_explicit_padding:
            pad_cfg = dict(type=padding_mode)
            self.padding_layer = build_padding_layer(pad_cfg, padding)

        # With an explicit padding layer, the conv itself must not pad.
        conv_padding = 0 if self.with_explicit_padding else padding

        self.conv = build_conv_layer(
            conv_cfg,
            in_channels,
            out_channels,
            kernel_size,
            stride=stride,
            padding=conv_padding,
            dilation=dilation,
            groups=groups,
            bias=bias)

        # Mirror the conv's resolved hyper-parameters for introspection.
        self.in_channels = self.conv.in_channels
        self.out_channels = self.conv.out_channels
        self.kernel_size = self.conv.kernel_size
        self.stride = self.conv.stride
        self.padding = padding
        self.dilation = self.conv.dilation
        self.transposed = self.conv.transposed
        self.output_padding = self.conv.output_padding
        self.groups = self.conv.groups

        if self.with_spectral_norm:
            self.conv = nn.utils.spectral_norm(self.conv)

        if self.with_norm:
            # The norm layer sees out_channels when it follows the conv,
            # in_channels when it precedes it.
            if order.index('norm') > order.index('conv'):
                norm_channels = out_channels
            else:
                norm_channels = in_channels
            self.norm_name, norm = build_norm_layer(
                norm_cfg, norm_channels)
            self.add_module(self.norm_name, norm)
            if self.with_bias:
                if isinstance(norm, (_BatchNorm, _InstanceNorm)):
                    warnings.warn(
                        'Unnecessary conv bias before batch/instance norm')
        else:
            self.norm_name = None

        self.turn_on_efficient_conv_bn_eval(efficient_conv_bn_eval)

        if self.with_activation:
            act_cfg_ = act_cfg.copy()
            # These activation types do not accept an ``inplace`` argument.
            if act_cfg_['type'] not in [
                    'Tanh', 'PReLU', 'Sigmoid', 'HSigmoid', 'Swish', 'GELU'
            ]:
                act_cfg_.setdefault('inplace', inplace)
            self.activate = build_activation_layer(act_cfg_)

        self.init_weights()

    @property
    def norm(self):
        # Resolve the dynamically named norm submodule (None when absent).
        if self.norm_name:
            return getattr(self, self.norm_name)
        else:
            return None

    def init_weights(self):
        """Kaiming-init the conv and constant-init the norm.

        Conv layers that define their own ``init_weights`` are assumed to
        initialize themselves and are skipped here.
        """
        if not hasattr(self.conv, 'init_weights'):
            if self.with_activation and self.act_cfg['type'] == 'LeakyReLU':
                nonlinearity = 'leaky_relu'
                a = self.act_cfg.get('negative_slope', 0.01)
            else:
                nonlinearity = 'relu'
                a = 0
            kaiming_init(self.conv, a=a, nonlinearity=nonlinearity)
        if self.with_norm:
            constant_init(self.norm, 1, bias=0)

    def forward(self,
                x: torch.Tensor,
                activate: bool = True,
                norm: bool = True) -> torch.Tensor:
        layer_index = 0
        while layer_index < len(self.order):
            layer = self.order[layer_index]
            if layer == 'conv':
                if self.with_explicit_padding:
                    x = self.padding_layer(x)
                # When conv is immediately followed by a BN that is in eval
                # mode and the efficient path is enabled, temporarily patch
                # self.conv.forward with the fused conv+BN forward and
                # consume both steps in one go.
                if layer_index + 1 < len(self.order) and \
                        self.order[layer_index + 1] == 'norm' and norm and \
                        self.with_norm and not self.norm.training and \
                        self.efficient_conv_bn_eval_forward is not None:
                    self.conv.forward = partial(
                        self.efficient_conv_bn_eval_forward, self.norm,
                        self.conv)
                    layer_index += 1
                    x = self.conv(x)
                    # Remove the instance attribute to restore the conv's
                    # regular (class-level) forward.
                    del self.conv.forward
                else:
                    x = self.conv(x)
            elif layer == 'norm' and norm and self.with_norm:
                x = self.norm(x)
            elif layer == 'act' and activate and self.with_activation:
                x = self.activate(x)
            layer_index += 1
        return x

    def turn_on_efficient_conv_bn_eval(self, efficient_conv_bn_eval=True):
        """Enable/disable the fused conv+BN eval-mode forward path."""
        # Only valid when the norm is a BatchNorm with running statistics.
        if efficient_conv_bn_eval and self.norm \
                and isinstance(self.norm, _BatchNorm) \
                and self.norm.track_running_stats:
            self.efficient_conv_bn_eval_forward = efficient_conv_bn_eval_forward
        else:
            self.efficient_conv_bn_eval_forward = None

    @staticmethod
    def create_from_conv_bn(conv: torch.nn.modules.conv._ConvNd,
                            bn: torch.nn.modules.batchnorm._BatchNorm,
                            efficient_conv_bn_eval=True) -> 'ConvModule':
        """Create a ConvModule from a conv and a bn module."""
        # Bypass __init__ with __new__ and wire the given modules directly.
        self = ConvModule.__new__(ConvModule)
        super(ConvModule, self).__init__()
        self.conv_cfg = None
        self.norm_cfg = None
        self.act_cfg = None
        self.inplace = False
        self.with_spectral_norm = False
        self.with_explicit_padding = False
        self.order = ('conv', 'norm', 'act')
        self.with_norm = True
        self.with_activation = False
        self.with_bias = conv.bias is not None

        self.conv = conv

        self.in_channels = self.conv.in_channels
        self.out_channels = self.conv.out_channels
        self.kernel_size = self.conv.kernel_size
        self.stride = self.conv.stride
        self.padding = self.conv.padding
        self.dilation = self.conv.dilation
        self.transposed = self.conv.transposed
        self.output_padding = self.conv.output_padding
        self.groups = self.conv.groups

        self.norm_name, norm = 'bn', bn
        self.add_module(self.norm_name, norm)
        self.turn_on_efficient_conv_bn_eval(efficient_conv_bn_eval)
        return self
|
|
class DepthwiseSeparableConvModule(nn.Module):
    """Depthwise separable convolution module.

    A depthwise ConvModule (``groups == in_channels``) followed by a 1x1
    pointwise ConvModule. The ``dw_*``/``pw_*`` config arguments override
    the shared ``norm_cfg``/``act_cfg`` per branch; leaving them as the
    string 'default' inherits the shared configs.
    """

    def __init__(self,
                 in_channels: int,
                 out_channels: int,
                 kernel_size: Union[int, Tuple[int, int]],
                 stride: Union[int, Tuple[int, int]] = 1,
                 padding: Union[int, Tuple[int, int]] = 0,
                 dilation: Union[int, Tuple[int, int]] = 1,
                 norm_cfg: Optional[Dict] = None,
                 act_cfg: Dict = dict(type='ReLU'),
                 dw_norm_cfg: Union[Dict, str] = 'default',
                 dw_act_cfg: Union[Dict, str] = 'default',
                 pw_norm_cfg: Union[Dict, str] = 'default',
                 pw_act_cfg: Union[Dict, str] = 'default',
                 **kwargs):
        super().__init__()
        assert 'groups' not in kwargs, 'groups should not be specified'

        def _resolve(specific, shared):
            # 'default' is a sentinel meaning "inherit the shared cfg".
            return shared if specific == 'default' else specific

        # Depthwise: one filter per input channel.
        self.depthwise_conv = ConvModule(
            in_channels,
            in_channels,
            kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            groups=in_channels,
            norm_cfg=_resolve(dw_norm_cfg, norm_cfg),
            act_cfg=_resolve(dw_act_cfg, act_cfg),
            **kwargs)

        # Pointwise: 1x1 conv mixing channels.
        self.pointwise_conv = ConvModule(
            in_channels,
            out_channels,
            1,
            norm_cfg=_resolve(pw_norm_cfg, norm_cfg),
            act_cfg=_resolve(pw_act_cfg, act_cfg),
            **kwargs)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.pointwise_conv(self.depthwise_conv(x))
|
|
class SPPBottleneck(nn.Module):
    """Spatial pyramid pooling layer used in YOLOv3-SPP.

    1x1 reduce conv -> parallel stride-1 max-pools of several kernel
    sizes -> concat (input included) -> 1x1 fuse conv.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_sizes=(5, 9, 13),
                 conv_cfg=None,
                 norm_cfg=dict(type='BN', momentum=0.03, eps=0.001),
                 act_cfg=dict(type='Swish'),
                 init_cfg=None):
        super().__init__()
        hidden_channels = in_channels // 2
        self.conv1 = ConvModule(
            in_channels,
            hidden_channels,
            1,
            stride=1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        # stride 1 + "same" padding keeps the spatial size, so all pooled
        # branches can be concatenated with the input.
        self.poolings = nn.ModuleList(
            nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2)
            for k in kernel_sizes)
        self.conv2 = ConvModule(
            hidden_channels * (len(kernel_sizes) + 1),
            out_channels,
            1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)

    def forward(self, x):
        x = self.conv1(x)
        # Pooling and concat run with autocast disabled.
        with torch.amp.autocast(enabled=False, device_type=x.device.type):
            branches = [x] + [pool(x) for pool in self.poolings]
            x = torch.cat(branches, dim=1)
        x = self.conv2(x)
        return x
|
|
class DarknetBottleneck(nn.Module):
    """The basic bottleneck block used in Darknet.

    A 1x1 conv squeezes to ``out_channels * expansion`` hidden channels,
    a 3x3 conv expands back to ``out_channels``; a residual connection is
    added when requested and the channel counts match.
    """

    def __init__(self,
                 in_channels: int,
                 out_channels: int,
                 expansion: float = 0.5,
                 add_identity: bool = True,
                 use_depthwise: bool = False,
                 conv_cfg: OptConfigType = None,
                 norm_cfg: ConfigType = dict(
                     type='BN', momentum=0.03, eps=0.001),
                 act_cfg: ConfigType = dict(type='Swish'),
                 init_cfg: OptMultiConfig = None) -> None:
        super().__init__()
        hidden_channels = int(out_channels * expansion)
        second_conv_cls = (DepthwiseSeparableConvModule
                           if use_depthwise else ConvModule)
        self.conv1 = ConvModule(
            in_channels,
            hidden_channels,
            1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        self.conv2 = second_conv_cls(
            hidden_channels,
            out_channels,
            3,
            stride=1,
            padding=1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        # Residual only when channels line up.
        self.add_identity = add_identity and in_channels == out_channels

    def forward(self, x: Tensor) -> Tensor:
        """Forward function."""
        out = self.conv2(self.conv1(x))
        return out + x if self.add_identity else out
|
|
class CSPNeXtBlock(nn.Module):
    """The basic bottleneck block used in CSPNeXt.

    A 3x3 conv (optionally depthwise-separable) into hidden channels,
    followed by a large-kernel depthwise-separable conv back to
    ``out_channels``, with an optional residual connection.
    """

    def __init__(self,
                 in_channels: int,
                 out_channels: int,
                 expansion: float = 0.5,
                 add_identity: bool = True,
                 use_depthwise: bool = False,
                 kernel_size: int = 5,
                 conv_cfg: OptConfigType = None,
                 norm_cfg: ConfigType = dict(
                     type='BN', momentum=0.03, eps=0.001),
                 act_cfg: ConfigType = dict(type='SiLU'),
                 init_cfg: OptMultiConfig = None) -> None:
        super().__init__()
        hidden_channels = int(out_channels * expansion)
        first_conv_cls = (DepthwiseSeparableConvModule
                          if use_depthwise else ConvModule)
        self.conv1 = first_conv_cls(
            in_channels,
            hidden_channels,
            3,
            stride=1,
            padding=1,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        # The second conv is always depthwise-separable here, regardless
        # of ``use_depthwise``.
        self.conv2 = DepthwiseSeparableConvModule(
            hidden_channels,
            out_channels,
            kernel_size,
            stride=1,
            padding=kernel_size // 2,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        # Residual only when channels line up.
        self.add_identity = add_identity and in_channels == out_channels

    def forward(self, x: Tensor) -> Tensor:
        """Forward function."""
        out = self.conv2(self.conv1(x))
        return out + x if self.add_identity else out
|
|
class ChannelAttention(nn.Module):
    """Channel attention Module.

    Squeeze (global average pool) -> 1x1 conv -> hard sigmoid, producing a
    per-channel gate that scales the input.
    """

    def __init__(self, channels: int, init_cfg: OptMultiConfig = None) -> None:
        super().__init__()
        self.global_avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Conv2d(channels, channels, 1, 1, 0, bias=True)
        self.act = nn.Hardsigmoid(inplace=True)

    def forward(self, x: Tensor) -> Tensor:
        """Forward function for ChannelAttention."""
        # The gate is computed with autocast disabled.
        with torch.amp.autocast(enabled=False, device_type=x.device.type):
            gate = self.act(self.fc(self.global_avgpool(x)))
        return x * gate
|
|
class CSPLayer(nn.Module):
    """Cross Stage Partial Layer.

    The input is projected into two 1x1 branches; the main branch runs a
    stack of bottleneck blocks, the short branch passes through, and the
    concatenation (optionally channel-attended) is fused by a final 1x1
    conv.

    Args:
        in_channels (int): The input channels of the CSP layer.
        out_channels (int): The output channels of the CSP layer.
        expand_ratio (float): Ratio to adjust the number of channels of the
            hidden layer. Defaults to 0.5.
        num_blocks (int): Number of blocks. Defaults to 1.
        add_identity (bool): Whether to add identity in blocks.
            Defaults to True.
        use_depthwise (bool): Whether to use depthwise separable convolution
            in blocks. Defaults to False.
        use_cspnext_block (bool): Whether to use CSPNeXt block.
            Defaults to False.
        channel_attention (bool): Whether to add channel attention in each
            stage. Defaults to False.
        conv_cfg (dict, optional): Config dict for convolution layer.
            Defaults to None, which means using conv2d.
        norm_cfg (dict): Config dict for normalization layer.
            Defaults to dict(type='BN')
        act_cfg (dict): Config dict for activation layer.
            Defaults to dict(type='Swish')
    """

    def __init__(self,
                 in_channels: int,
                 out_channels: int,
                 expand_ratio: float = 0.5,
                 num_blocks: int = 1,
                 add_identity: bool = True,
                 use_depthwise: bool = False,
                 use_cspnext_block: bool = False,
                 channel_attention: bool = False,
                 conv_cfg: OptConfigType = None,
                 norm_cfg: ConfigType = dict(
                     type='BN', momentum=0.03, eps=0.001),
                 act_cfg: ConfigType = dict(type='Swish'),
                 init_cfg: OptMultiConfig = None) -> None:
        super().__init__()
        block_cls = CSPNeXtBlock if use_cspnext_block else DarknetBottleneck
        mid_channels = int(out_channels * expand_ratio)
        self.channel_attention = channel_attention

        # 1x1 projections for the two branches.
        self.main_conv = ConvModule(
            in_channels,
            mid_channels,
            1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        self.short_conv = ConvModule(
            in_channels,
            mid_channels,
            1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        # 1x1 fusion over the concatenated branches.
        self.final_conv = ConvModule(
            2 * mid_channels,
            out_channels,
            1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)

        self.blocks = nn.Sequential(*(
            block_cls(
                mid_channels,
                mid_channels,
                1.0,
                add_identity,
                use_depthwise,
                conv_cfg=conv_cfg,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg) for _ in range(num_blocks)))

        if channel_attention:
            self.attention = ChannelAttention(2 * mid_channels)

    def forward(self, x: Tensor) -> Tensor:
        """Forward function."""
        shortcut = self.short_conv(x)
        main = self.blocks(self.main_conv(x))
        fused = torch.cat((main, shortcut), dim=1)

        if self.channel_attention:
            fused = self.attention(fused)

        return self.final_conv(fused)
| |
|
|
class CSPNeXt(nn.Module):
    """CSPNeXt backbone used in RTMDet.
    This is a standalone implementation without requiring the mmdet registry.

    Args:
        arch (str): Architecture of CSPNeXt, from {P5, P6}.
            Defaults to P5.
        expand_ratio (float): Ratio to adjust the number of channels of the
            hidden layer. Defaults to 0.5.
        deepen_factor (float): Depth multiplier, multiply number of
            blocks in CSP layer by this amount. Defaults to 1.0.
        widen_factor (float): Width multiplier, multiply number of
            channels in each layer by this amount. Defaults to 1.0.
        out_indices (Sequence[int]): Output from which stages.
            Defaults to (2, 3, 4).
        frozen_stages (int): Stages to be frozen (stop grad and set eval
            mode). -1 means not freezing any parameters. Defaults to -1.
        use_depthwise (bool): Whether to use depthwise separable convolution.
            Defaults to False.
        arch_ovewrite (list): Overwrite default arch settings.
            Defaults to None.
        spp_kernel_sizes: (tuple[int]): Sequential of kernel sizes of SPP
            layers. Defaults to (5, 9, 13).
        channel_attention (bool): Whether to add channel attention in each
            stage. Defaults to True.
        conv_cfg (:obj:`ConfigDict` or dict, optional): Config dict for
            convolution layer. Defaults to None.
        norm_cfg (:obj:`ConfigDict` or dict): Dictionary to construct and
            config norm layer. Defaults to dict(type='BN', requires_grad=True).
        act_cfg (:obj:`ConfigDict` or dict): Config dict for activation layer.
            Defaults to dict(type='SiLU').
        norm_eval (bool): Whether to set norm layers to eval mode, namely,
            freeze running stats (mean and var). Note: Effect on Batch Norm
            and its variants only.
    """

    # Per-stage settings (before width/depth scaling):
    # [in_channels, out_channels, num_blocks, add_identity, use_spp].
    arch_settings = {
        'P5': [[64, 128, 3, True, False], [128, 256, 6, True, False],
               [256, 512, 6, True, False], [512, 1024, 3, False, True]],
        'P6': [[64, 128, 3, True, False], [128, 256, 6, True, False],
               [256, 512, 6, True, False], [512, 768, 3, True, False],
               [768, 1024, 3, False, True]]
    }

    def __init__(
        self,
        arch: str = 'P5',
        deepen_factor: float = 1.0,
        widen_factor: float = 1.0,
        out_indices: Sequence[int] = (2, 3, 4),
        frozen_stages: int = -1,
        use_depthwise: bool = False,
        expand_ratio: float = 0.5,
        arch_ovewrite: dict = None,
        spp_kernel_sizes: Sequence[int] = (5, 9, 13),
        channel_attention: bool = True,
        conv_cfg: OptConfigType = None,
        norm_cfg: ConfigType = dict(type='BN', momentum=0.03, eps=0.001),
        act_cfg: ConfigType = dict(type='SiLU'),
        norm_eval: bool = False,
        init_cfg: OptMultiConfig = dict(
            type='Kaiming',
            layer='Conv2d',
            a=math.sqrt(5),
            distribution='uniform',
            mode='fan_in',
            nonlinearity='leaky_relu')
    ) -> None:
        super().__init__()
        # NOTE: init_cfg is accepted for config compatibility but is not
        # consumed anywhere in this standalone implementation.
        arch_setting = self.arch_settings[arch]
        if arch_ovewrite:
            arch_setting = arch_ovewrite
        # Index 0 refers to the stem, 1..N to the stages below.
        assert set(out_indices).issubset(
            i for i in range(len(arch_setting) + 1))
        if frozen_stages not in range(-1, len(arch_setting) + 1):
            raise ValueError('frozen_stages must be in range(-1, '
                             'len(arch_setting) + 1). But received '
                             f'{frozen_stages}')

        self.out_indices = out_indices
        self.frozen_stages = frozen_stages
        self.use_depthwise = use_depthwise
        self.norm_eval = norm_eval

        conv = DepthwiseSeparableConvModule if use_depthwise else ConvModule

        # Stem: three 3x3 convs; the first downsamples by 2 and the width
        # ramps from half of the stage-0 input width to the full width.
        self.stem = nn.Sequential(
            ConvModule(
                3,
                int(arch_setting[0][0] * widen_factor // 2),
                3,
                padding=1,
                stride=2,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg),
            ConvModule(
                int(arch_setting[0][0] * widen_factor // 2),
                int(arch_setting[0][0] * widen_factor // 2),
                3,
                padding=1,
                stride=1,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg),
            ConvModule(
                int(arch_setting[0][0] * widen_factor // 2),
                int(arch_setting[0][0] * widen_factor),
                3,
                padding=1,
                stride=1,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg))

        # Sub-module names in forward order; indexed by out_indices.
        self.layers = ['stem']

        for i, (in_channels, out_channels, num_blocks, add_identity,
                use_spp) in enumerate(arch_setting):
            # Apply width/depth multipliers (at least one block per stage).
            in_channels = int(in_channels * widen_factor)
            out_channels = int(out_channels * widen_factor)
            num_blocks = max(round(num_blocks * deepen_factor), 1)
            stage = []

            # Each stage starts with a stride-2 conv that downsamples.
            conv_layer = conv(
                in_channels,
                out_channels,
                3,
                stride=2,
                padding=1,
                conv_cfg=conv_cfg,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg)
            stage.append(conv_layer)

            if use_spp:
                spp = SPPBottleneck(
                    out_channels,
                    out_channels,
                    kernel_sizes=spp_kernel_sizes,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg)
                stage.append(spp)

            csp_layer = CSPLayer(
                out_channels,
                out_channels,
                num_blocks=num_blocks,
                add_identity=add_identity,
                use_depthwise=use_depthwise,
                use_cspnext_block=True,
                expand_ratio=expand_ratio,
                channel_attention=channel_attention,
                conv_cfg=conv_cfg,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg)
            stage.append(csp_layer)

            self.add_module(f'stage{i + 1}', nn.Sequential(*stage))
            self.layers.append(f'stage{i + 1}')

    def freeze_stages(self) -> None:
        """Freeze stages parameters."""
        if self.frozen_stages >= 0:
            # Index 0 is the stem; freeze it plus the first
            # ``frozen_stages`` stages.
            for i in range(self.frozen_stages + 1):
                m = getattr(self, self.layers[i])
                m.eval()
                for param in m.parameters():
                    param.requires_grad = False

    def train(self, mode=True) -> None:
        """Convert the model into training mode while keeping normalization layer
        frozen."""
        super().train(mode)
        self.freeze_stages()
        if mode and self.norm_eval:
            # Keep BN running statistics frozen even in train mode.
            for m in self.modules():
                if isinstance(m, _BatchNorm):
                    m.eval()

    def forward(self, x: Tuple[Tensor, ...]) -> Tuple[Tensor, ...]:
        # Run stem + stages sequentially, collecting features at
        # the requested out_indices.
        outs = []
        for i, layer_name in enumerate(self.layers):
            layer = getattr(self, layer_name)
            x = layer(x)
            if i in self.out_indices:
                outs.append(x)
        return tuple(outs)
| |
|
|
| class CSPNeXtPAFPN(nn.Module): |
| """Path Aggregation Network with CSPNeXt blocks. |
| This is a standalone implementation that works with the CSPNeXt backbone. |
| |
| Args: |
| in_channels (Sequence[int]): Number of input channels per scale. |
| out_channels (int): Number of output channels (used at each scale) |
| out_indices (Sequence[int]): Output from which stages. |
| num_csp_blocks (int): Number of bottlenecks in CSPLayer. |
| Defaults to 3. |
| use_depthwise (bool): Whether to use depthwise separable convolution in |
| blocks. Defaults to False. |
| expand_ratio (float): Ratio to adjust the number of channels of the |
| hidden layer. Default: 0.5 |
| upsample_cfg (dict): Config dict for interpolate layer. |
| Default: `dict(scale_factor=2, mode='nearest')` |
| conv_cfg (dict, optional): Config dict for convolution layer. |
| Default: None, which means using conv2d. |
| norm_cfg (dict): Config dict for normalization layer. |
| Default: dict(type='BN') |
| act_cfg (dict): Config dict for activation layer. |
| Default: dict(type='Swish') |
| """ |
|
|
    def __init__(
        self,
        in_channels: Sequence[int],
        out_channels: int,
        out_indices=(0, 1, 2),
        num_csp_blocks: int = 3,
        use_depthwise: bool = False,
        expand_ratio: float = 0.5,
        upsample_cfg: ConfigType = dict(scale_factor=2, mode='nearest'),
        conv_cfg: OptConfigType = None,
        norm_cfg: ConfigType = dict(type='BN', momentum=0.03, eps=0.001),
        act_cfg: ConfigType = dict(type='Swish'),
        init_cfg: OptMultiConfig = dict(
            type='Kaiming',
            layer='Conv2d',
            a=math.sqrt(5),
            distribution='uniform',
            mode='fan_in',
            nonlinearity='leaky_relu')
    ) -> None:
        super().__init__()
        # NOTE: init_cfg is accepted for config compatibility but is not
        # consumed in this standalone implementation.
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.out_indices = out_indices

        conv = DepthwiseSeparableConvModule if use_depthwise else ConvModule

        # Top-down path (highest stride first): 1x1 reduce the coarser
        # level, upsample, then fuse with the finer level via a CSP layer
        # on the concatenated features.
        self.upsample = nn.Upsample(**upsample_cfg)
        self.reduce_layers = nn.ModuleList()
        self.top_down_blocks = nn.ModuleList()
        for idx in range(len(in_channels) - 1, 0, -1):
            self.reduce_layers.append(
                ConvModule(
                    in_channels[idx],
                    in_channels[idx - 1],
                    1,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg))
            self.top_down_blocks.append(
                CSPLayer(
                    in_channels[idx - 1] * 2,
                    in_channels[idx - 1],
                    num_blocks=num_csp_blocks,
                    add_identity=False,
                    use_depthwise=use_depthwise,
                    use_cspnext_block=True,
                    expand_ratio=expand_ratio,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg))

        # Bottom-up path: stride-2 conv downsamples the finer level, then
        # a CSP layer fuses it with the next coarser level.
        self.downsamples = nn.ModuleList()
        self.bottom_up_blocks = nn.ModuleList()
        for idx in range(len(in_channels) - 1):
            self.downsamples.append(
                conv(
                    in_channels[idx],
                    in_channels[idx],
                    3,
                    stride=2,
                    padding=1,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg))
            self.bottom_up_blocks.append(
                CSPLayer(
                    in_channels[idx] * 2,
                    in_channels[idx + 1],
                    num_blocks=num_csp_blocks,
                    add_identity=False,
                    use_depthwise=use_depthwise,
                    use_cspnext_block=True,
                    expand_ratio=expand_ratio,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg))

        # Optional 3x3 output convs projecting every pyramid level to a
        # common ``out_channels``.
        if self.out_channels is not None:
            self.out_convs = nn.ModuleList()
            for i in range(len(in_channels)):
                self.out_convs.append(
                    conv(
                        in_channels[i],
                        out_channels,
                        3,
                        padding=1,
                        conv_cfg=conv_cfg,
                        norm_cfg=norm_cfg,
                        act_cfg=act_cfg))
|
|
| def forward(self, inputs: Tuple[Tensor, ...]) -> Tuple[Tensor, ...]: |
| """ |
| Args: |
| inputs (tuple[Tensor]): input features. |
| |
| Returns: |
| tuple[Tensor]: YOLOXPAFPN features. |
| """ |
| assert len(inputs) == len(self.in_channels) |
|
|
| |
| inner_outs = [inputs[-1]] |
| for idx in range(len(self.in_channels) - 1, 0, -1): |
| feat_high = inner_outs[0] |
| feat_low = inputs[idx - 1] |
| feat_high = self.reduce_layers[len(self.in_channels) - 1 - idx]( |
| feat_high) |
| inner_outs[0] = feat_high |
|
|
| upsample_feat = self.upsample(feat_high) |
|
|
| inner_out = self.top_down_blocks[len(self.in_channels) - 1 - idx]( |
| torch.cat([upsample_feat, feat_low], 1)) |
| inner_outs.insert(0, inner_out) |
|
|
| |
| outs = [inner_outs[0]] |
| for idx in range(len(self.in_channels) - 1): |
| feat_low = outs[-1] |
| feat_high = inner_outs[idx + 1] |
| downsample_feat = self.downsamples[idx](feat_low) |
| out = self.bottom_up_blocks[idx]( |
| torch.cat([downsample_feat, feat_high], 1)) |
| outs.append(out) |
|
|
| if self.out_channels is not None: |
| |
| for idx in range(len(outs)): |
| outs[idx] = self.out_convs[idx](outs[idx]) |
|
|
| return tuple([outs[i] for i in self.out_indices]) |
|
|
|
|
class MlvlPointGenerator:
    """Standard point (anchor-free prior) generator for multi-level feature maps.

    Args:
        strides (int | list[int] | tuple[int]): Stride of each feature level
            with respect to the input image. A scalar is promoted to a
            one-element list.
        offset (float): Sub-cell offset applied to every grid location, in
            units of the stride. 0.5 places priors at cell centers.
    """

    def __init__(
        self,
        strides,
        offset: float = 0.5
    ) -> None:
        if not isinstance(strides, (list, tuple)):
            strides = [strides]

        self.strides = strides
        self.offset = offset

    def grid_priors(
        self,
        featmap_sizes,
        dtype=torch.float32,
        device='cuda',
        with_stride=False
    ):
        """Generate grid points of multiple feature levels.

        Args:
            featmap_sizes (list[tuple[int, int]]): (height, width) per level;
                must align with ``self.strides``.
            dtype (torch.dtype): Dtype of the returned priors.
            device: Target device. NOTE: defaults to 'cuda'; pass 'cpu'
                explicitly on CPU-only hosts.
            with_stride (bool): If True, each prior row is
                (x, y, stride, stride) instead of (x, y).

        Returns:
            list[Tensor]: One prior tensor per level.
        """
        return [
            self.single_level_grid_priors(
                featmap_sizes[i],
                level_idx=i,
                dtype=dtype,
                device=device,
                with_stride=with_stride) for i in range(len(featmap_sizes))
        ]

    def single_level_grid_priors(
        self,
        featmap_size,
        level_idx,
        dtype=torch.float32,
        device='cuda',
        with_stride=False
    ):
        """Generate grid points for a single feature level.

        Returns a tensor of shape (H*W, 2), or (H*W, 4) when
        ``with_stride=True``.
        """
        feat_h, feat_w = featmap_size
        stride = self.strides[level_idx]

        # Map grid indices back to input-image coordinates.
        shift_x = (torch.arange(0, feat_w, device=device) + self.offset) * stride
        shift_y = (torch.arange(0, feat_h, device=device) + self.offset) * stride

        shift_x = shift_x.to(dtype)
        shift_y = shift_y.to(dtype)

        shift_yy, shift_xx = torch.meshgrid(shift_y, shift_x, indexing="ij")
        shift_xx = shift_xx.reshape(-1)
        shift_yy = shift_yy.reshape(-1)

        if not with_stride:
            shifts = torch.stack([shift_xx, shift_yy], dim=-1)
        else:
            # `full_like` documents a *scalar* fill value; pass the stride
            # number directly instead of wrapping it in a 0-dim tensor first
            # (the tensor round-trip is needless and rejected by some torch
            # versions).
            stride_xx = torch.full_like(shift_xx, stride)
            stride_yy = torch.full_like(shift_yy, stride)
            shifts = torch.stack([shift_xx, shift_yy, stride_xx, stride_yy], dim=-1)

        return shifts
|
|
|
|
| |
def sigmoid_geometric_mean(x, y):
    """Element-wise geometric mean of ``sigmoid(x)`` and ``sigmoid(y)``,
    i.e. ``sqrt(sigmoid(x) * sigmoid(y))``."""
    return (x.sigmoid() * y.sigmoid()).sqrt()
|
|
|
|
def inverse_sigmoid(x, eps=1e-5):
    """Logit function: the inverse of ``torch.sigmoid``.

    The input is first clamped to [0, 1]; both the numerator and the
    denominator are floored at ``eps`` so the log stays finite at 0 and 1.
    """
    x = x.clamp(0, 1)
    numerator = x.clamp_min(eps)
    denominator = (1 - x).clamp_min(eps)
    return (numerator / denominator).log()
|
|
|
|
| class RTMDetSepBNHead(nn.Module): |
| """RTMDetHead with separated BN layers and shared conv layers.""" |
|
|
| def __init__( |
| self, |
| num_classes: int, |
| in_channels: int, |
| share_conv: bool = True, |
| use_depthwise: bool = False, |
| pred_kernel_size: int = 1, |
| stacked_convs: int = 2, |
| feat_channels: int = 256, |
| strides: List[int] = [8, 16, 32], |
| with_objectness: bool = False, |
| exp_on_reg: bool = False, |
| ) -> None: |
| super().__init__() |
| self.num_classes = num_classes |
| self.cls_out_channels = num_classes |
| self.in_channels = in_channels |
| self.feat_channels = feat_channels |
| self.stacked_convs = stacked_convs |
| self.share_conv = share_conv |
| self.use_depthwise = use_depthwise |
| self.pred_kernel_size = pred_kernel_size |
| self.with_objectness = with_objectness |
| self.exp_on_reg = exp_on_reg |
| self.strides = strides |
| |
| |
| self.num_base_priors = 1 |
| |
| self._init_layers() |
|
|
| def _init_layers(self) -> None: |
| """Initialize layers of the head.""" |
| self.cls_convs = nn.ModuleList() |
| self.reg_convs = nn.ModuleList() |
|
|
| self.rtm_cls = nn.ModuleList() |
| self.rtm_reg = nn.ModuleList() |
| if self.with_objectness: |
| self.rtm_obj = nn.ModuleList() |
| |
| for n in range(len(self.strides)): |
| cls_convs = nn.ModuleList() |
| reg_convs = nn.ModuleList() |
| for i in range(self.stacked_convs): |
| chn = self.in_channels if i == 0 else self.feat_channels |
| |
| if self.use_depthwise: |
| cls_conv = DepthwiseSeparableConvModule( |
| chn, |
| self.feat_channels, |
| 3, |
| stride=1, |
| padding=1, |
| bias=False, |
| act_cfg=dict(type='SiLU'), |
| norm_cfg=dict(type='BN', momentum=0.03, eps=0.001) |
| ) |
| reg_conv = DepthwiseSeparableConvModule( |
| chn, |
| self.feat_channels, |
| 3, |
| stride=1, |
| padding=1, |
| bias=False, |
| act_cfg=dict(type='SiLU'), |
| norm_cfg=dict(type='BN', momentum=0.03, eps=0.001) |
| ) |
| else: |
| cls_conv = ConvModule( |
| chn, |
| self.feat_channels, |
| 3, |
| stride=1, |
| padding=1, |
| bias=False, |
| act_cfg=dict(type='SiLU'), |
| norm_cfg=dict(type='BN', momentum=0.03, eps=0.001)) |
| reg_conv = ConvModule( |
| chn, |
| self.feat_channels, |
| 3, |
| stride=1, |
| padding=1, |
| bias=False, |
| act_cfg=dict(type='SiLU'), |
| norm_cfg=dict(type='BN', momentum=0.03, eps=0.001)) |
| |
| cls_convs.append(cls_conv) |
| reg_convs.append(reg_conv) |
| |
| self.cls_convs.append(cls_convs) |
| self.reg_convs.append(reg_convs) |
|
|
| self.rtm_cls.append( |
| nn.Conv2d( |
| self.feat_channels, |
| self.num_base_priors * self.cls_out_channels, |
| self.pred_kernel_size, |
| padding=self.pred_kernel_size // 2)) |
| self.rtm_reg.append( |
| nn.Conv2d( |
| self.feat_channels, |
| self.num_base_priors * 4, |
| self.pred_kernel_size, |
| padding=self.pred_kernel_size // 2)) |
| if self.with_objectness: |
| self.rtm_obj.append( |
| nn.Conv2d( |
| self.feat_channels, |
| 1, |
| self.pred_kernel_size, |
| padding=self.pred_kernel_size // 2)) |
|
|
| if self.share_conv: |
| for n in range(1, len(self.strides)): |
| for i in range(self.stacked_convs): |
| self.cls_convs[n][i] = self.cls_convs[0][i] |
| self.reg_convs[n][i] = self.reg_convs[0][i] |
| |
| |
| self.prior_generator = MlvlPointGenerator(self.strides, offset=0.0) |
| |
| def init_weights(self): |
| """Initialize weights of the head.""" |
| |
| for m in self.modules(): |
| if isinstance(m, nn.Conv2d): |
| nn.init.normal_(m.weight, mean=0, std=0.01) |
| if m.bias is not None: |
| nn.init.constant_(m.bias, 0) |
| if isinstance(m, nn.BatchNorm2d): |
| nn.init.constant_(m.weight, 1) |
| nn.init.constant_(m.bias, 0) |
| |
| |
| bias_init = -torch.log(torch.tensor((1 - 0.01) / 0.01)) |
| for rtm_cls in self.rtm_cls: |
| nn.init.normal_(rtm_cls.weight, mean=0, std=0.01) |
| nn.init.constant_(rtm_cls.bias, bias_init) |
| |
| for rtm_reg in self.rtm_reg: |
| nn.init.normal_(rtm_reg.weight, mean=0, std=0.01) |
| nn.init.constant_(rtm_reg.bias, 0) |
| |
| if self.with_objectness: |
| for rtm_obj in self.rtm_obj: |
| nn.init.normal_(rtm_obj.weight, mean=0, std=0.01) |
| nn.init.constant_(rtm_obj.bias, bias_init) |
|
|
| def forward(self, feats): |
| """Forward features from the upstream network. |
| |
| Args: |
| feats (tuple[Tensor]): Features from the upstream network, each is |
| a 4D-tensor. |
| |
| Returns: |
| tuple: Usually a tuple of classification scores and bbox prediction |
| - cls_scores (list[Tensor]): Classification scores for all scale |
| levels, each is a 4D-tensor. |
| - bbox_preds (list[Tensor]): Box energies / deltas for all scale |
| levels, each is a 4D-tensor. |
| """ |
| cls_scores = [] |
| bbox_preds = [] |
| for idx, (x, stride) in enumerate( |
| zip(feats, self.strides)): |
| cls_feat = x |
| reg_feat = x |
|
|
| for cls_layer in self.cls_convs[idx]: |
| cls_feat = cls_layer(cls_feat) |
| cls_score = self.rtm_cls[idx](cls_feat) |
|
|
| for reg_layer in self.reg_convs[idx]: |
| reg_feat = reg_layer(reg_feat) |
|
|
| if self.with_objectness: |
| objectness = self.rtm_obj[idx](reg_feat) |
| cls_score = inverse_sigmoid( |
| sigmoid_geometric_mean(cls_score, objectness)) |
| |
| if self.exp_on_reg: |
| |
| reg_dist = self.rtm_reg[idx](reg_feat).exp() * stride |
| else: |
| reg_dist = self.rtm_reg[idx](reg_feat) * stride |
| |
| cls_scores.append(cls_score) |
| bbox_preds.append(reg_dist) |
| |
| return tuple(cls_scores), tuple(bbox_preds) |
| |
| def predict(self, cls_scores, bbox_preds, batch_img_metas=None, cfg=None, |
| rescale=False, with_nms=True, score_thr=0.05, |
| nms_iou_threshold=0.6, max_per_img=100): |
| """Transform network outputs into bbox predictions. |
| |
| This is a simplified version for inference only. |
| """ |
| assert len(cls_scores) == len(bbox_preds) |
| num_levels = len(cls_scores) |
| device = cls_scores[0].device |
| batch_size = cls_scores[0].shape[0] |
| |
| |
| if batch_img_metas is None: |
| |
| featmap_sizes = [cls_scores[i].shape[-2:] for i in range(num_levels)] |
| strides = self.strides |
| |
| |
| |
| upscaled_sizes = [] |
| for i, featmap_size in enumerate(featmap_sizes): |
| h, w = featmap_size |
| upscaled_sizes.append((h * strides[i], w * strides[i])) |
| |
| |
| img_h = max(s[0] for s in upscaled_sizes) |
| img_w = max(s[1] for s in upscaled_sizes) |
| |
| batch_img_metas = [{ |
| 'img_shape': (img_h, img_w, 3), |
| 'scale_factor': [1.0, 1.0, 1.0, 1.0] |
| } for _ in range(batch_size)] |
| |
| |
| featmap_sizes = [cls_scores[i].shape[-2:] for i in range(num_levels)] |
| |
| |
| mlvl_priors = self.prior_generator.grid_priors( |
| featmap_sizes, |
| dtype=cls_scores[0].dtype, |
| device=device, |
| with_stride=True) |
|
|
| result_list = [] |
| for img_id in range(batch_size): |
| img_meta = batch_img_metas[img_id] |
| cls_score_list = [ |
| cls_scores[i][img_id].detach() for i in range(num_levels) |
| ] |
| bbox_pred_list = [ |
| bbox_preds[i][img_id].detach() for i in range(num_levels) |
| ] |
| |
| results = self._predict_by_feat_single( |
| cls_score_list, |
| bbox_pred_list, |
| mlvl_priors, |
| img_meta, |
| score_thr=score_thr, |
| nms_iou_threshold=nms_iou_threshold, |
| max_per_img=max_per_img, |
| rescale=rescale, |
| with_nms=with_nms |
| ) |
| result_list.append(results) |
| |
| |
| boxes_batch = [] |
| scores_batch = [] |
| labels_batch = [] |
| |
| for result in result_list: |
| boxes = result['bboxes'] |
| scores = result.get('scores', boxes[:, -1]) |
| labels = result['labels'] |
| |
| |
| if boxes.shape[1] > 4: |
| boxes = boxes[:, :4] |
| |
| boxes_batch.append(boxes) |
| scores_batch.append(scores) |
| labels_batch.append(labels) |
| |
| |
| if all(len(boxes) > 0 for boxes in boxes_batch): |
| return DetectionOutput( |
| boxes=torch.stack(boxes_batch), |
| scores=torch.stack(scores_batch), |
| labels=torch.stack(labels_batch) |
| ) |
| |
| |
| max_num = max(len(boxes) for boxes in boxes_batch) |
| if max_num == 0: |
| |
| dummy = torch.zeros((batch_size, 0, 4), device=device) |
| return DetectionOutput( |
| boxes=dummy, |
| scores=torch.zeros((batch_size, 0), device=device), |
| labels=torch.zeros((batch_size, 0), dtype=torch.long, device=device) |
| ) |
| |
| |
| padded_boxes = [] |
| padded_scores = [] |
| padded_labels = [] |
| |
| for boxes, scores, labels in zip(boxes_batch, scores_batch, labels_batch): |
| num_dets = len(boxes) |
| if num_dets == 0: |
| padded_boxes.append(torch.zeros((max_num, 4), device=device)) |
| padded_scores.append(torch.zeros(max_num, device=device)) |
| padded_labels.append(torch.zeros(max_num, dtype=torch.long, device=device)) |
| else: |
| padding = torch.zeros((max_num - num_dets, 4), device=device) |
| padded_boxes.append(torch.cat([boxes, padding], dim=0)) |
| |
| padding = torch.zeros(max_num - num_dets, device=device) |
| padded_scores.append(torch.cat([scores, padding], dim=0)) |
| |
| padding = torch.zeros(max_num - num_dets, dtype=torch.long, device=device) |
| padded_labels.append(torch.cat([labels, padding], dim=0)) |
| |
| return DetectionOutput( |
| boxes=torch.stack(padded_boxes), |
| scores=torch.stack(padded_scores), |
| labels=torch.stack(padded_labels) |
| ) |
|
|
| def _predict_by_feat_single(self, cls_score_list, bbox_pred_list, mlvl_priors, |
| img_meta, score_thr=0.05, nms_iou_threshold=0.6, |
| max_per_img=100, rescale=False, with_nms=True): |
| """Transform outputs of a single image into bbox predictions. |
| |
| This is a simplified version for inference only. |
| """ |
| |
| mlvl_bboxes = [] |
| mlvl_scores = [] |
| |
| for level_idx, (cls_score, bbox_pred, priors) in enumerate( |
| zip(cls_score_list, bbox_pred_list, mlvl_priors)): |
| assert cls_score.size()[-2:] == bbox_pred.size()[-2:] |
| |
| |
| cls_score = cls_score.permute(1, 2, 0).reshape(-1, self.cls_out_channels) |
| bbox_pred = bbox_pred.permute(1, 2, 0).reshape(-1, 4) |
| |
| |
| scores = torch.sigmoid(cls_score) |
| |
| |
| max_scores, _ = scores.max(dim=1) |
| keep_mask = max_scores > score_thr |
| scores = scores[keep_mask] |
| bbox_pred = bbox_pred[keep_mask] |
| priors = priors[keep_mask] |
| |
| |
| if scores.numel() == 0: |
| continue |
| |
| |
| bboxes = self._decode_bboxes(priors, bbox_pred, img_meta.get('img_shape')) |
| |
| mlvl_bboxes.append(bboxes) |
| mlvl_scores.append(scores) |
| |
| |
| if len(mlvl_bboxes) == 0: |
| |
| return { |
| 'bboxes': torch.zeros((0, 4), device=cls_score_list[0].device), |
| 'scores': torch.zeros((0,), device=cls_score_list[0].device), |
| 'labels': torch.zeros((0,), device=cls_score_list[0].device, dtype=torch.long) |
| } |
| |
| bboxes = torch.cat(mlvl_bboxes) |
| scores = torch.cat(mlvl_scores) |
| |
| |
| if rescale and 'scale_factor' in img_meta: |
| bboxes /= bboxes.new_tensor(img_meta['scale_factor']).repeat((1, 2)) |
| |
| |
| if with_nms: |
| det_bboxes, det_labels = self._nms(bboxes, scores, |
| nms_iou_threshold, |
| max_per_img) |
| else: |
| |
| scores_flattened = scores.flatten() |
| if scores_flattened.size(0) > max_per_img: |
| top_scores, indices = scores_flattened.topk(max_per_img) |
| scores_top_k = scores.view(-1, self.num_classes).index_select(0, indices) |
| bboxes_top_k = bboxes.index_select(0, indices) |
| labels_top_k = indices % self.num_classes |
| det_bboxes = torch.cat([bboxes_top_k, top_scores.unsqueeze(-1)], dim=1) |
| det_labels = labels_top_k |
| else: |
| |
| num_bboxes = bboxes.size(0) |
| max_scores, labels = scores.max(dim=1) |
| det_bboxes = torch.cat([bboxes, max_scores.unsqueeze(-1)], dim=1) |
| det_labels = labels |
| |
| return { |
| 'bboxes': det_bboxes, |
| 'scores': det_bboxes[:, -1], |
| 'labels': det_labels |
| } |
| |
| def _decode_bboxes(self, priors, distance, max_shape=None): |
| """Decode distance predictions to bounding box coordinates.""" |
| |
| xy = priors[..., :2] |
| |
| |
| |
| |
| |
| x1 = xy[..., 0] - distance[..., 0] |
| y1 = xy[..., 1] - distance[..., 1] |
| x2 = xy[..., 0] + distance[..., 2] |
| y2 = xy[..., 1] + distance[..., 3] |
| |
| bboxes = torch.stack([x1, y1, x2, y2], -1) |
| |
| |
| if max_shape is not None: |
| bboxes[..., 0].clamp_(min=0, max=max_shape[1]) |
| bboxes[..., 1].clamp_(min=0, max=max_shape[0]) |
| bboxes[..., 2].clamp_(min=0, max=max_shape[1]) |
| bboxes[..., 3].clamp_(min=0, max=max_shape[0]) |
| |
| return bboxes |
| |
| def _nms(self, bboxes, scores, iou_threshold, max_per_img): |
| """Apply NMS to detection results.""" |
| |
| num_classes = scores.shape[1] |
| det_bboxes = [] |
| det_labels = [] |
| |
| for cls_idx in range(num_classes): |
| cls_scores = scores[:, cls_idx] |
| keep_idx = cls_scores > 0.05 |
| |
| if not keep_idx.any(): |
| continue |
| |
| cls_bboxes = bboxes[keep_idx] |
| cls_scores = cls_scores[keep_idx] |
| |
| |
| keep = self._batched_nms(cls_bboxes, cls_scores, iou_threshold) |
| keep = keep[:max_per_img] |
| |
| det_bboxes.append(torch.cat([cls_bboxes[keep], cls_scores[keep].unsqueeze(-1)], dim=1)) |
| det_labels.append(cls_bboxes.new_full((keep.size(0),), cls_idx, dtype=torch.long)) |
| |
| if len(det_bboxes) > 0: |
| det_bboxes = torch.cat(det_bboxes, dim=0) |
| det_labels = torch.cat(det_labels, dim=0) |
| |
| |
| _, indices = det_bboxes[:, -1].sort(descending=True) |
| det_bboxes = det_bboxes[indices] |
| det_labels = det_labels[indices] |
| |
| |
| det_bboxes = det_bboxes[:max_per_img] |
| det_labels = det_labels[:max_per_img] |
| else: |
| |
| det_bboxes = bboxes.new_zeros((0, 5)) |
| det_labels = bboxes.new_zeros((0,), dtype=torch.long) |
| |
| return det_bboxes, det_labels |
| |
| def _batched_nms(self, boxes, scores, iou_threshold): |
| """Performs non-maximum suppression on a batch of boxes.""" |
| if boxes.shape[0] == 0: |
| return boxes.new_zeros(0, dtype=torch.long) |
| |
| try: |
| |
| return torchvision.ops.nms(boxes, scores, iou_threshold) |
| except: |
| |
| x1 = boxes[:, 0] |
| y1 = boxes[:, 1] |
| x2 = boxes[:, 2] |
| y2 = boxes[:, 3] |
| areas = (x2 - x1) * (y2 - y1) |
| _, order = scores.sort(descending=True) |
| |
| keep = [] |
| while order.size(0) > 0: |
| i = order[0].item() |
| keep.append(i) |
| |
| if order.size(0) == 1: |
| break |
| |
| xx1 = torch.max(x1[order[1:]], x1[i]) |
| yy1 = torch.max(y1[order[1:]], y1[i]) |
| xx2 = torch.min(x2[order[1:]], x2[i]) |
| yy2 = torch.min(y2[order[1:]], y2[i]) |
| |
| w = torch.clamp(xx2 - xx1, min=0) |
| h = torch.clamp(yy2 - yy1, min=0) |
| inter = w * h |
| |
| iou = inter / (areas[i] + areas[order[1:]] - inter) |
| |
| inds = torch.where(iou <= iou_threshold)[0] |
| order = order[inds + 1] |
| |
| return torch.tensor(keep, dtype=torch.long, device=boxes.device) |
|
|
|
|
class RTMDetModel(PreTrainedModel):
    """
    RTMDet object detection model compatible with Hugging Face transformers.
    Updated implementation using PyTorch only with no NumPy or OpenCV dependencies.

    This model consists of a backbone, neck, and detection head for object detection.
    """

    config_class = RTMDetConfig
    base_model_prefix = "rtmdet"
    main_input_name = "pixel_values"

    # This architecture ties no weights.
    _tied_weights_keys = None

    def mark_tied_weights_as_initialized(self):
        # No tied weights exist, so there is nothing to mark.
        pass

    def __init__(self, config):
        super().__init__(config)

        # CSPNeXt backbone: extracts multi-scale features from the image.
        self.backbone = CSPNeXt(
            arch=config.backbone_arch,
            deepen_factor=config.backbone_deepen_factor,
            widen_factor=config.backbone_widen_factor,
            expand_ratio=config.backbone_expand_ratio,
            channel_attention=config.backbone_channel_attention,
            use_depthwise=False,
        )

        # PAFPN neck: fuses the backbone's feature levels.
        self.neck = CSPNeXtPAFPN(
            in_channels=config.neck_in_channels,
            out_channels=config.neck_out_channels,
            num_csp_blocks=config.neck_num_csp_blocks,
            expand_ratio=config.neck_expand_ratio,
            use_depthwise=False,
        )

        # Detection head: per-level classification and box regression.
        self.bbox_head = RTMDetSepBNHead(
            num_classes=config.num_classes,
            in_channels=config.head_in_channels,
            stacked_convs=config.head_stacked_convs,
            feat_channels=config.head_feat_channels,
            with_objectness=config.head_with_objectness,
            exp_on_reg=config.head_exp_on_reg,
            share_conv=config.head_share_conv,
            pred_kernel_size=config.head_pred_kernel_size,
            strides=config.strides,
            use_depthwise=False
        )

        self.init_weights()

        self.post_init()

    def init_weights(self):
        """Initialize the weights of the model."""
        # Only the head applies its custom initialization scheme here;
        # backbone and neck keep whatever initialization their own modules
        # performed at construction time.
        self.bbox_head.init_weights()

    def forward(
        self,
        pixel_values=None,
        original_size=None,
        labels=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        """
        Forward pass of the model.

        Args:
            pixel_values (`torch.FloatTensor` of shape `(batch_size, channels, height, width)`):
                Pixel values resized to 640×640 by the image processor.
            original_size (`Tuple[int, int]`, *optional*):
                ``(height, width)`` of the **original** image before preprocessing.
                When supplied, the returned boxes are automatically scaled from
                640×640 model-input space to original image pixel coordinates so
                the caller never needs to compute ``sx = orig_w / 640`` manually.
                All images in the batch are assumed to share the same original size.
            labels (`List[Dict]`, *optional*):
                Labels for computing the detection loss.
            output_hidden_states (`bool`, *optional*):
                Whether or not to return the hidden states of all layers.
                NOTE: accepted for API compatibility but currently unused.
            return_dict (`bool`, *optional*):
                Whether or not to return a ModelOutput instead of a plain tuple.

        Returns:
            `DetectionOutput` or `tuple`:
                Boxes are in 640×640 space by default, or in original image space
                when ``original_size`` is provided.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if pixel_values is None:
            raise ValueError("You have to specify pixel_values")

        batch_size, channels, height, width = pixel_values.shape

        # backbone -> neck -> head: multi-scale features to per-level
        # classification logits and stride-scaled box distances.
        backbone_features = self.backbone(pixel_values)

        neck_features = self.neck(backbone_features)

        cls_scores, bbox_preds = self.bbox_head(neck_features)

        if labels is not None:
            # Training is not implemented: this returns a zero placeholder
            # loss rather than a real detection loss.
            loss = torch.tensor(0.0, device=pixel_values.device)
            if return_dict:
                return DetectionOutput(loss=loss)
            else:
                return (loss,)

        # Inference path: boxes are produced in model-input coordinates
        # (height x width of `pixel_values`), so scale_factor is identity.
        batch_img_metas = [{
            'img_shape': (height, width, 3),
            'scale_factor': [1.0, 1.0, 1.0, 1.0]
        } for _ in range(batch_size)]

        results = self.bbox_head.predict(
            cls_scores=cls_scores,
            bbox_preds=bbox_preds,
            batch_img_metas=batch_img_metas,
            rescale=False,
            with_nms=True,
            score_thr=self.config.score_threshold,
            nms_iou_threshold=self.config.nms_threshold,
            max_per_img=self.config.max_detections
        )

        # Optionally map boxes from model-input space back to the original
        # image's pixel space (x by width ratio, y by height ratio).
        if original_size is not None:
            orig_h, orig_w = original_size
            sx = orig_w / width
            sy = orig_h / height
            scaled_boxes = results.boxes.clone()
            scaled_boxes[..., 0] *= sx
            scaled_boxes[..., 2] *= sx
            scaled_boxes[..., 1] *= sy
            scaled_boxes[..., 3] *= sy
            results = DetectionOutput(
                boxes=scaled_boxes,
                scores=results.scores,
                labels=results.labels,
            )

        if return_dict:
            return results
        else:
            # Plain-tuple variant for callers that disabled return_dict.
            return (results.boxes, results.scores, results.labels)
|
|
|
|