# rtmdet-tiny / modeling_rtmdet.py
# (author: akore)
# feat: add original_size param to forward() — boxes auto-scaled to image space
# commit 54e05fc (verified)
from typing import List, Optional, Tuple, Union, Sequence, Dict
from dataclasses import dataclass
import inspect
from functools import partial
import warnings
import math
import torch
import torchvision
import torch.nn as nn
from torch import Tensor
import torch.nn.functional as F
from torch.nn.modules.batchnorm import _BatchNorm, SyncBatchNorm
from transformers.modeling_outputs import ModelOutput
from transformers.modeling_utils import PreTrainedModel
from transformers.utils import logging
from .configuration_rtmdet import RTMDetConfig
logger = logging.get_logger(__name__)
@dataclass
class DetectionOutput(ModelOutput):
    """
    Output type for object detection models.
    Args:
        boxes (`torch.FloatTensor` of shape `(batch_size, num_boxes, 4)`):
            Detection boxes in format [x1, y1, x2, y2]. Coordinates are in
            model-input space (640×640) by default, or in original image pixel
            space when ``original_size`` was passed to ``forward()``.
        scores (`torch.FloatTensor` of shape `(batch_size, num_boxes)`):
            Detection confidence scores.
        labels (`torch.LongTensor` of shape `(batch_size, num_boxes)`):
            Detection class indices.
        loss (`torch.FloatTensor`, *optional*):
            Loss value if training.
    """
    # Defaults are None so instances can be built field-by-field; ModelOutput
    # drops None-valued fields from its tuple/dict views.
    boxes: torch.FloatTensor = None
    scores: torch.FloatTensor = None
    labels: torch.LongTensor = None
    loss: Optional[torch.FloatTensor] = None
# Replace MODELS registry with direct class mappings
# Maps config 'type' strings (mmcv-style names) to torch.nn activation classes.
ACTIVATION_LAYERS = {
    'ReLU': nn.ReLU,
    'LeakyReLU': nn.LeakyReLU,
    'PReLU': nn.PReLU,
    'SiLU': nn.SiLU,
    'Sigmoid': nn.Sigmoid,
    'Tanh': nn.Tanh,
    'GELU': nn.GELU,
    'Swish': nn.SiLU,  # Swish is equivalent to SiLU
    'Hardsigmoid': nn.Hardsigmoid,
    'HSigmoid': nn.Hardsigmoid
}
# Simple Config Type replacement (stands in for mmengine's ConfigDict aliases)
ConfigType = Dict
OptConfigType = Optional[Dict]
OptMultiConfig = Optional[Union[Dict, List[Dict]]]
def build_activation_layer(cfg: Dict) -> nn.Module:
    """Instantiate an activation layer from a config dict.

    Args:
        cfg (dict): Must contain the key ``'type'`` naming an entry of
            ``ACTIVATION_LAYERS``; every other item is forwarded to the
            layer constructor.

    Returns:
        nn.Module: The constructed activation layer.

    Raises:
        TypeError: If ``cfg`` is not a dict.
        KeyError: If ``'type'`` is missing or not a known activation name.
    """
    if not isinstance(cfg, dict):
        raise TypeError('cfg must be a dict')
    if 'type' not in cfg:
        raise KeyError('the cfg dict must contain the key "type"')
    kwargs = dict(cfg)
    layer_name = kwargs.pop('type')
    if layer_name not in ACTIVATION_LAYERS:
        raise KeyError(f'Unrecognized activation type {layer_name}')
    return ACTIVATION_LAYERS[layer_name](**kwargs)
def kaiming_init(module,
                 a=0,
                 mode='fan_out',
                 nonlinearity='relu',
                 bias=0,
                 distribution='normal'):
    """Kaiming-initialize ``module.weight`` and constant-fill ``module.bias``.

    Args:
        module: Any module; weight/bias are skipped when absent or None.
        a: Negative slope passed to the Kaiming initializer.
        mode: ``'fan_in'`` or ``'fan_out'``.
        nonlinearity: Nonlinearity name for gain computation.
        bias: Constant value written into the bias, if present.
        distribution: ``'uniform'`` or ``'normal'`` Kaiming variant.
    """
    assert distribution in ['uniform', 'normal']
    weight = getattr(module, 'weight', None)
    if weight is not None:
        init_fn = (nn.init.kaiming_uniform_
                   if distribution == 'uniform' else nn.init.kaiming_normal_)
        init_fn(weight, a=a, mode=mode, nonlinearity=nonlinearity)
    bias_param = getattr(module, 'bias', None)
    if bias_param is not None:
        nn.init.constant_(bias_param, bias)
def constant_init(module, val, bias=0):
    """Fill ``module.weight`` with ``val`` and ``module.bias`` with ``bias``.

    Attributes that are absent or None are silently skipped, so this is safe
    to call on any module.
    """
    weight = getattr(module, 'weight', None)
    if weight is not None:
        nn.init.constant_(weight, val)
    bias_param = getattr(module, 'bias', None)
    if bias_param is not None:
        nn.init.constant_(bias_param, bias)
class _InstanceNorm(nn.modules.instancenorm._InstanceNorm):
    """Instance Normalization Base Class."""
    pass


def infer_abbr(class_type):
    """Infer the short name ('bn', 'gn', 'ln', 'in', ...) for a norm class.

    Resolution order: an explicit ``_abbr_`` attribute wins, then subclass
    checks (instance norm tested before batch norm), then a substring match
    on the lowercased class name, finally the generic 'norm_layer'.
    """
    if not inspect.isclass(class_type):
        raise TypeError(
            f'class_type must be a type, but got {type(class_type)}')
    if hasattr(class_type, '_abbr_'):
        return class_type._abbr_
    # Subclass-based resolution; IN is checked before BN on purpose.
    for base, abbr in ((_InstanceNorm, 'in'), (_BatchNorm, 'bn'),
                       (nn.GroupNorm, 'gn'), (nn.LayerNorm, 'ln')):
        if issubclass(class_type, base):
            return abbr
    # Fall back to matching tokens in the class name.
    lowered = class_type.__name__.lower()
    for token, abbr in (('batch', 'bn'), ('group', 'gn'), ('layer', 'ln'),
                        ('instance', 'in')):
        if token in lowered:
            return abbr
    return 'norm_layer'
# Create mapping from strings to layer classes
# Normalization layers: config 'type' -> torch.nn class ('BN'/'IN' default to 2d).
NORM_LAYERS = {
    'BN': nn.BatchNorm2d,
    'BN1d': nn.BatchNorm1d,
    'BN2d': nn.BatchNorm2d,
    'BN3d': nn.BatchNorm3d,
    'SyncBN': SyncBatchNorm,
    'GN': nn.GroupNorm,
    'LN': nn.LayerNorm,
    'IN': nn.InstanceNorm2d,
    'IN1d': nn.InstanceNorm1d,
    'IN2d': nn.InstanceNorm2d,
    'IN3d': nn.InstanceNorm3d
}
# Convolution layers: 'Conv' defaults to 2d.
CONV_LAYERS = {
    'Conv1d': nn.Conv1d,
    'Conv2d': nn.Conv2d,
    'Conv3d': nn.Conv3d,
    'Conv': nn.Conv2d
}
# Explicit padding layers used for padding modes torch Conv lacks natively.
PADDING_LAYERS = {
    'zero': nn.ZeroPad2d,
    'reflect': nn.ReflectionPad2d,
    'replicate': nn.ReplicationPad2d
}
def build_norm_layer(cfg: Dict,
                     num_features: int,
                     postfix: Union[int, str] = '') -> Tuple[str, nn.Module]:
    """Build a normalization layer from a config dict.

    Args:
        cfg (dict): Must contain ``'type'`` (a key of ``NORM_LAYERS``).
            Optional items: ``requires_grad`` (default True) toggles
            parameter gradients; remaining items go to the constructor
            (``eps`` defaults to 1e-5).
        num_features (int): Channel count of the normalized tensor.
        postfix (int | str): Appended to the inferred abbreviation to form
            the module name.

    Returns:
        tuple[str, nn.Module]: ``(name, layer)``, where ``name`` is e.g.
        ``'bn1'`` and suitable for ``add_module``.
    """
    if not isinstance(cfg, dict):
        raise TypeError('cfg must be a dict')
    if 'type' not in cfg:
        raise KeyError('the cfg dict must contain the key "type"')
    kwargs = dict(cfg)
    layer_type = kwargs.pop('type')
    if layer_type not in NORM_LAYERS:
        raise KeyError(f'Unrecognized norm type {layer_type}')
    norm_cls = NORM_LAYERS[layer_type]

    name = infer_abbr(norm_cls)
    assert isinstance(postfix, (int, str))
    name = name + str(postfix)

    requires_grad = kwargs.pop('requires_grad', True)
    kwargs.setdefault('eps', 1e-5)
    if norm_cls is nn.GroupNorm:
        # GroupNorm takes channels by keyword and requires num_groups.
        assert 'num_groups' in kwargs
        layer = norm_cls(num_channels=num_features, **kwargs)
    else:
        layer = norm_cls(num_features, **kwargs)
        if layer_type == 'SyncBN' and hasattr(layer, '_specify_ddp_gpu_num'):
            layer._specify_ddp_gpu_num(1)
    for param in layer.parameters():
        param.requires_grad = requires_grad
    return name, layer
def build_conv_layer(cfg: Optional[Dict], *args, **kwargs) -> nn.Module:
    """Build a convolution layer from a config dict.

    Args:
        cfg (dict | None): ``None`` falls back to a plain ``Conv2d``;
            otherwise must contain ``'type'`` (a key of ``CONV_LAYERS``),
            with remaining items forwarded to the constructor.
        *args, **kwargs: Positional/keyword arguments for the conv class
            (channels, kernel size, stride, ...).

    Returns:
        nn.Module: The constructed convolution layer.
    """
    if cfg is None:
        cfg_ = dict(type='Conv2d')
    elif not isinstance(cfg, dict):
        raise TypeError('cfg must be a dict')
    elif 'type' not in cfg:
        raise KeyError('the cfg dict must contain the key "type"')
    else:
        cfg_ = dict(cfg)
    layer_type = cfg_.pop('type')
    if layer_type not in CONV_LAYERS:
        raise KeyError(f'Unrecognized conv type {layer_type}')
    return CONV_LAYERS[layer_type](*args, **kwargs, **cfg_)
def build_padding_layer(cfg: Dict, *args, **kwargs) -> nn.Module:
    """Build an explicit padding layer (zero/reflect/replicate) from ``cfg``.

    Args:
        cfg (dict): Must contain ``'type'`` (a key of ``PADDING_LAYERS``);
            remaining items go to the padding-layer constructor.
        *args, **kwargs: Constructor arguments (typically the padding size).

    Returns:
        nn.Module: The constructed padding layer.
    """
    if not isinstance(cfg, dict):
        raise TypeError('cfg must be a dict')
    if 'type' not in cfg:
        raise KeyError('the cfg dict must contain the key "type"')
    remaining = dict(cfg)
    padding_type = remaining.pop('type')
    if padding_type not in PADDING_LAYERS:
        raise KeyError(f'Unrecognized padding type {padding_type}')
    return PADDING_LAYERS[padding_type](*args, **kwargs, **remaining)
def efficient_conv_bn_eval_forward(bn: _BatchNorm,
                                   conv: nn.modules.conv._ConvNd,
                                   x: torch.Tensor):
    """Fused Conv+BN forward for eval-mode BatchNorm.

    Implementation based on https://arxiv.org/abs/2305.11624
    "Tune-Mode ConvBN Blocks For Efficient Transfer Learning".
    Because normalization is affine, it can be folded into the convolution
    weights: normalize(weight conv feature) = (normalize weight) conv
    feature. Valid whenever the BN layer uses its running statistics (eval
    mode); reduces memory and computation.

    Args:
        bn (_BatchNorm): BatchNorm module supplying running stats/affine.
        conv (nn._ConvNd): Convolution module.
        x (torch.Tensor): Input feature map.
    """
    # Substitute identity values for missing bias/affine terms so the
    # algebra below covers every Conv/BN configuration.
    conv_bias = (conv.bias if conv.bias is not None
                 else torch.zeros_like(bn.running_var))
    gamma = (bn.weight if bn.weight is not None
             else torch.ones_like(bn.running_var))
    beta = (bn.bias if bn.bias is not None
            else torch.zeros_like(bn.running_var))

    # 1 / sqrt(var + eps), reshaped to broadcast over the conv weight:
    # shape [C_out, 1, 1, 1] for Conv2d.
    inv_std = torch.rsqrt(bn.running_var + bn.eps).reshape(
        [-1] + [1] * (len(conv.weight.shape) - 1))
    scale = gamma.view_as(inv_std) * inv_std

    # Fold the BN affine transform into weight and bias.
    fused_weight = conv.weight * scale
    fused_bias = beta + scale.flatten() * (conv_bias - bn.running_mean)
    return conv._conv_forward(x, fused_weight, fused_bias)
class ConvModule(nn.Module):
    """A conv block that bundles conv/norm/activation layers.

    The three stages run in the order given by ``order`` (default
    ``('conv', 'norm', 'act')``). With ``bias='auto'`` the conv bias is
    dropped whenever a norm layer is configured. When
    ``efficient_conv_bn_eval`` is enabled and the norm is an eval-mode
    BatchNorm, conv and BN are fused on the fly inside ``forward`` (see
    ``efficient_conv_bn_eval_forward``).
    """
    _abbr_ = 'conv_block'

    def __init__(self,
                 in_channels: int,
                 out_channels: int,
                 kernel_size: Union[int, Tuple[int, int]],
                 stride: Union[int, Tuple[int, int]] = 1,
                 padding: Union[int, Tuple[int, int]] = 0,
                 dilation: Union[int, Tuple[int, int]] = 1,
                 groups: int = 1,
                 bias: Union[bool, str] = 'auto',
                 conv_cfg: Optional[Dict] = None,
                 norm_cfg: Optional[Dict] = None,
                 act_cfg: Optional[Dict] = dict(type='ReLU'),
                 inplace: bool = True,
                 with_spectral_norm: bool = False,
                 padding_mode: str = 'zeros',
                 order: tuple = ('conv', 'norm', 'act'),
                 efficient_conv_bn_eval: bool = False):
        super().__init__()
        assert conv_cfg is None or isinstance(conv_cfg, dict)
        assert norm_cfg is None or isinstance(norm_cfg, dict)
        assert act_cfg is None or isinstance(act_cfg, dict)
        official_padding_mode = ['zeros', 'circular']
        self.conv_cfg = conv_cfg
        self.norm_cfg = norm_cfg
        self.act_cfg = act_cfg
        self.inplace = inplace
        self.with_spectral_norm = with_spectral_norm
        # Padding modes torch's Conv does not support natively ('reflect',
        # 'replicate', 'zero' via PADDING_LAYERS) get an explicit padding
        # layer in front of the conv instead.
        self.with_explicit_padding = padding_mode not in official_padding_mode
        self.order = order
        assert isinstance(self.order, tuple) and len(self.order) == 3
        assert set(order) == {'conv', 'norm', 'act'}
        self.with_norm = norm_cfg is not None
        self.with_activation = act_cfg is not None
        # if the conv layer is before a norm layer, bias is unnecessary.
        if bias == 'auto':
            bias = not self.with_norm
        self.with_bias = bias
        if self.with_explicit_padding:
            pad_cfg = dict(type=padding_mode)
            self.padding_layer = build_padding_layer(pad_cfg, padding)
        # reset padding to 0 for conv module
        conv_padding = 0 if self.with_explicit_padding else padding
        # build convolution layer
        self.conv = build_conv_layer(
            conv_cfg,
            in_channels,
            out_channels,
            kernel_size,
            stride=stride,
            padding=conv_padding,
            dilation=dilation,
            groups=groups,
            bias=bias)
        # export the attributes of self.conv to a higher level for convenience
        self.in_channels = self.conv.in_channels
        self.out_channels = self.conv.out_channels
        self.kernel_size = self.conv.kernel_size
        self.stride = self.conv.stride
        self.padding = padding
        self.dilation = self.conv.dilation
        self.transposed = self.conv.transposed
        self.output_padding = self.conv.output_padding
        self.groups = self.conv.groups
        if self.with_spectral_norm:
            self.conv = nn.utils.spectral_norm(self.conv)
        # build normalization layers
        if self.with_norm:
            # norm layer is after conv layer
            if order.index('norm') > order.index('conv'):
                norm_channels = out_channels
            else:
                norm_channels = in_channels
            self.norm_name, norm = build_norm_layer(
                norm_cfg, norm_channels)  # type: ignore
            self.add_module(self.norm_name, norm)
            if self.with_bias:
                if isinstance(norm, (_BatchNorm, _InstanceNorm)):
                    warnings.warn(
                        'Unnecessary conv bias before batch/instance norm')
        else:
            self.norm_name = None  # type: ignore
        self.turn_on_efficient_conv_bn_eval(efficient_conv_bn_eval)
        # build activation layer
        if self.with_activation:
            act_cfg_ = act_cfg.copy()  # type: ignore
            # nn.Tanh has no 'inplace' argument
            if act_cfg_['type'] not in [
                    'Tanh', 'PReLU', 'Sigmoid', 'HSigmoid', 'Swish', 'GELU'
            ]:
                act_cfg_.setdefault('inplace', inplace)
            self.activate = build_activation_layer(act_cfg_)
        # Use msra init by default
        self.init_weights()

    @property
    def norm(self):
        # Resolve the norm submodule by its generated name ('bn', 'gn1', ...);
        # None when no norm layer was configured.
        if self.norm_name:
            return getattr(self, self.norm_name)
        else:
            return None

    def init_weights(self):
        # Only initialize here when the conv has no init of its own; a conv
        # class providing `init_weights` is assumed to handle itself.
        if not hasattr(self.conv, 'init_weights'):
            if self.with_activation and self.act_cfg['type'] == 'LeakyReLU':
                nonlinearity = 'leaky_relu'
                a = self.act_cfg.get('negative_slope', 0.01)
            else:
                nonlinearity = 'relu'
                a = 0
            kaiming_init(self.conv, a=a, nonlinearity=nonlinearity)
        if self.with_norm:
            constant_init(self.norm, 1, bias=0)

    def forward(self,
                x: torch.Tensor,
                activate: bool = True,
                norm: bool = True) -> torch.Tensor:
        """Apply the configured stages in ``self.order``.

        ``activate``/``norm`` let callers skip those stages for this call
        only.
        """
        layer_index = 0
        while layer_index < len(self.order):
            layer = self.order[layer_index]
            if layer == 'conv':
                if self.with_explicit_padding:
                    x = self.padding_layer(x)
                # if the next operation is norm and we have a norm layer in
                # eval mode and we have enabled `efficient_conv_bn_eval` for
                # the conv operator, then activate the optimized forward and
                # skip the next norm operator since it has been fused
                if layer_index + 1 < len(self.order) and \
                        self.order[layer_index + 1] == 'norm' and norm and \
                        self.with_norm and not self.norm.training and \
                        self.efficient_conv_bn_eval_forward is not None:
                    # Temporarily shadow the bound conv.forward with the
                    # fused implementation; the `del` below restores the
                    # class-level forward.
                    self.conv.forward = partial(
                        self.efficient_conv_bn_eval_forward, self.norm,
                        self.conv)
                    layer_index += 1
                    x = self.conv(x)
                    del self.conv.forward
                else:
                    x = self.conv(x)
            elif layer == 'norm' and norm and self.with_norm:
                x = self.norm(x)
            elif layer == 'act' and activate and self.with_activation:
                x = self.activate(x)
            layer_index += 1
        return x

    def turn_on_efficient_conv_bn_eval(self, efficient_conv_bn_eval=True):
        # efficient_conv_bn_eval works for conv + bn
        # with `track_running_stats` option
        if efficient_conv_bn_eval and self.norm \
                and isinstance(self.norm, _BatchNorm) \
                and self.norm.track_running_stats:
            self.efficient_conv_bn_eval_forward = efficient_conv_bn_eval_forward  # noqa: E501
        else:
            self.efficient_conv_bn_eval_forward = None  # type: ignore

    @staticmethod
    def create_from_conv_bn(conv: torch.nn.modules.conv._ConvNd,
                            bn: torch.nn.modules.batchnorm._BatchNorm,
                            efficient_conv_bn_eval=True) -> 'ConvModule':
        """Create a ConvModule from a conv and a bn module.

        Bypasses ``__init__`` via ``__new__`` and wires the given modules in
        directly (no activation layer, no re-initialization of weights).
        """
        self = ConvModule.__new__(ConvModule)
        super(ConvModule, self).__init__()
        self.conv_cfg = None
        self.norm_cfg = None
        self.act_cfg = None
        self.inplace = False
        self.with_spectral_norm = False
        self.with_explicit_padding = False
        self.order = ('conv', 'norm', 'act')
        self.with_norm = True
        self.with_activation = False
        self.with_bias = conv.bias is not None
        # build convolution layer
        self.conv = conv
        # export the attributes of self.conv to a higher level for convenience
        self.in_channels = self.conv.in_channels
        self.out_channels = self.conv.out_channels
        self.kernel_size = self.conv.kernel_size
        self.stride = self.conv.stride
        self.padding = self.conv.padding
        self.dilation = self.conv.dilation
        self.transposed = self.conv.transposed
        self.output_padding = self.conv.output_padding
        self.groups = self.conv.groups
        # build normalization layers
        self.norm_name, norm = 'bn', bn
        self.add_module(self.norm_name, norm)
        self.turn_on_efficient_conv_bn_eval(efficient_conv_bn_eval)
        return self
class DepthwiseSeparableConvModule(nn.Module):
    """Depthwise separable convolution module.

    A depthwise conv (``groups=in_channels``) followed by a 1x1 pointwise
    conv, each wrapped in a ``ConvModule``. Per-stage norm/act configs
    (``dw_*``/``pw_*``) default to the shared ``norm_cfg``/``act_cfg``.
    """

    def __init__(self,
                 in_channels: int,
                 out_channels: int,
                 kernel_size: Union[int, Tuple[int, int]],
                 stride: Union[int, Tuple[int, int]] = 1,
                 padding: Union[int, Tuple[int, int]] = 0,
                 dilation: Union[int, Tuple[int, int]] = 1,
                 norm_cfg: Optional[Dict] = None,
                 act_cfg: Dict = dict(type='ReLU'),
                 dw_norm_cfg: Union[Dict, str] = 'default',
                 dw_act_cfg: Union[Dict, str] = 'default',
                 pw_norm_cfg: Union[Dict, str] = 'default',
                 pw_act_cfg: Union[Dict, str] = 'default',
                 **kwargs):
        super().__init__()
        assert 'groups' not in kwargs, 'groups should not be specified'

        # The sentinel string 'default' means: inherit the shared config.
        def _resolve(cfg, shared):
            return shared if cfg == 'default' else cfg

        self.depthwise_conv = ConvModule(
            in_channels,
            in_channels,
            kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            groups=in_channels,
            norm_cfg=_resolve(dw_norm_cfg, norm_cfg),  # type: ignore
            act_cfg=_resolve(dw_act_cfg, act_cfg),  # type: ignore
            **kwargs)
        self.pointwise_conv = ConvModule(
            in_channels,
            out_channels,
            1,
            norm_cfg=_resolve(pw_norm_cfg, norm_cfg),  # type: ignore
            act_cfg=_resolve(pw_act_cfg, act_cfg),  # type: ignore
            **kwargs)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Depthwise then pointwise convolution."""
        return self.pointwise_conv(self.depthwise_conv(x))
class SPPBottleneck(nn.Module):
    """Spatial pyramid pooling layer used in YOLOv3-SPP.

    Halves the channels with a 1x1 conv, concatenates the result with
    max-pooled copies at several kernel sizes, then projects back to
    ``out_channels`` with another 1x1 conv.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_sizes=(5, 9, 13),
                 conv_cfg=None,
                 norm_cfg=dict(type='BN', momentum=0.03, eps=0.001),
                 act_cfg=dict(type='Swish'),
                 init_cfg=None):
        super().__init__()
        hidden_channels = in_channels // 2
        self.conv1 = ConvModule(
            in_channels,
            hidden_channels,
            1,
            stride=1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        # Same-resolution max pools: stride 1 with "same" padding.
        self.poolings = nn.ModuleList(
            nn.MaxPool2d(kernel_size=size, stride=1, padding=size // 2)
            for size in kernel_sizes)
        self.conv2 = ConvModule(
            hidden_channels * (len(kernel_sizes) + 1),
            out_channels,
            1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)

    def forward(self, x):
        x = self.conv1(x)
        # Concatenate identity + pooled copies in fp32 (autocast disabled).
        with torch.amp.autocast(enabled=False, device_type=x.device.type):
            x = torch.cat([x] + [pool(x) for pool in self.poolings], dim=1)
        x = self.conv2(x)
        return x
class DarknetBottleneck(nn.Module):
    """The basic bottleneck block used in Darknet.

    A 1x1 reduction conv followed by a 3x3 conv, with an optional residual
    connection added when input and output channel counts match.
    """

    def __init__(self,
                 in_channels: int,
                 out_channels: int,
                 expansion: float = 0.5,
                 add_identity: bool = True,
                 use_depthwise: bool = False,
                 conv_cfg: OptConfigType = None,
                 norm_cfg: ConfigType = dict(
                     type='BN', momentum=0.03, eps=0.001),
                 act_cfg: ConfigType = dict(type='Swish'),
                 init_cfg: OptMultiConfig = None) -> None:
        super().__init__()
        hidden_channels = int(out_channels * expansion)
        second_conv_cls = (DepthwiseSeparableConvModule
                           if use_depthwise else ConvModule)
        self.conv1 = ConvModule(
            in_channels,
            hidden_channels,
            1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        self.conv2 = second_conv_cls(
            hidden_channels,
            out_channels,
            3,
            stride=1,
            padding=1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        # Residual add is only valid when shapes match element-wise.
        self.add_identity = add_identity and in_channels == out_channels

    def forward(self, x: Tensor) -> Tensor:
        """Forward function."""
        out = self.conv2(self.conv1(x))
        return out + x if self.add_identity else out
class CSPNeXtBlock(nn.Module):
    """The basic bottleneck block used in CSPNeXt.

    A 3x3 conv (depthwise-separable when ``use_depthwise``) followed by a
    large-kernel depthwise-separable conv, with an optional residual
    connection when input and output channels match.
    """

    def __init__(self,
                 in_channels: int,
                 out_channels: int,
                 expansion: float = 0.5,
                 add_identity: bool = True,
                 use_depthwise: bool = False,
                 kernel_size: int = 5,
                 conv_cfg: OptConfigType = None,
                 norm_cfg: ConfigType = dict(
                     type='BN', momentum=0.03, eps=0.001),
                 act_cfg: ConfigType = dict(type='SiLU'),
                 init_cfg: OptMultiConfig = None) -> None:
        super().__init__()
        hidden_channels = int(out_channels * expansion)
        first_conv_cls = (DepthwiseSeparableConvModule
                          if use_depthwise else ConvModule)
        self.conv1 = first_conv_cls(
            in_channels,
            hidden_channels,
            3,
            stride=1,
            padding=1,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        # The second conv is always depthwise-separable with a large kernel.
        self.conv2 = DepthwiseSeparableConvModule(
            hidden_channels,
            out_channels,
            kernel_size,
            stride=1,
            padding=kernel_size // 2,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        # Residual add is only valid when shapes match element-wise.
        self.add_identity = add_identity and in_channels == out_channels

    def forward(self, x: Tensor) -> Tensor:
        """Forward function."""
        out = self.conv2(self.conv1(x))
        return out + x if self.add_identity else out
class ChannelAttention(nn.Module):
    """Channel attention Module.

    Squeeze-style channel attention: global average pool -> 1x1 conv ->
    hard sigmoid, producing per-channel weights that rescale the input.
    """

    def __init__(self, channels: int,
                 init_cfg: Optional[Union[Dict, List[Dict]]] = None) -> None:
        super().__init__()
        self.global_avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Conv2d(channels, channels, 1, 1, 0, bias=True)
        self.act = nn.Hardsigmoid(inplace=True)

    def forward(self, x: Tensor) -> Tensor:
        """Forward function for ChannelAttention."""
        # Pool in fp32: autocast is disabled to keep the global average
        # accumulation in full precision.
        with torch.amp.autocast(enabled=False, device_type=x.device.type):
            attn = self.global_avgpool(x)
        attn = self.fc(attn)
        attn = self.act(attn)
        return x * attn
class CSPLayer(nn.Module):
    """Cross Stage Partial Layer.

    The input is split through two 1x1 convs: the main branch runs a stack
    of bottleneck blocks, the short branch is passed through unchanged; the
    two are concatenated (optionally channel-attended) and fused by a final
    1x1 conv.

    Args:
        in_channels (int): The input channels of the CSP layer.
        out_channels (int): The output channels of the CSP layer.
        expand_ratio (float): Ratio to adjust the number of channels of the
            hidden layer. Defaults to 0.5.
        num_blocks (int): Number of blocks. Defaults to 1.
        add_identity (bool): Whether to add identity in blocks.
            Defaults to True.
        use_depthwise (bool): Whether to use depthwise separable convolution
            in blocks. Defaults to False.
        use_cspnext_block (bool): Whether to use CSPNeXt block.
            Defaults to False.
        channel_attention (bool): Whether to add channel attention in each
            stage. Defaults to False.
        conv_cfg (dict, optional): Config dict for convolution layer.
            Defaults to None, which means using conv2d.
        norm_cfg (dict): Config dict for normalization layer.
            Defaults to dict(type='BN')
        act_cfg (dict): Config dict for activation layer.
            Defaults to dict(type='Swish')
    """

    def __init__(self,
                 in_channels: int,
                 out_channels: int,
                 expand_ratio: float = 0.5,
                 num_blocks: int = 1,
                 add_identity: bool = True,
                 use_depthwise: bool = False,
                 use_cspnext_block: bool = False,
                 channel_attention: bool = False,
                 conv_cfg: OptConfigType = None,
                 norm_cfg: ConfigType = dict(
                     type='BN', momentum=0.03, eps=0.001),
                 act_cfg: ConfigType = dict(type='Swish'),
                 init_cfg: OptMultiConfig = None) -> None:
        super().__init__()
        block_cls = CSPNeXtBlock if use_cspnext_block else DarknetBottleneck
        mid_channels = int(out_channels * expand_ratio)
        self.channel_attention = channel_attention
        # 1x1 projections for the two CSP branches.
        self.main_conv = ConvModule(
            in_channels,
            mid_channels,
            1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        self.short_conv = ConvModule(
            in_channels,
            mid_channels,
            1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        self.final_conv = ConvModule(
            2 * mid_channels,
            out_channels,
            1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        self.blocks = nn.Sequential(*(
            block_cls(
                mid_channels,
                mid_channels,
                1.0,
                add_identity,
                use_depthwise,
                conv_cfg=conv_cfg,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg) for _ in range(num_blocks)))
        if channel_attention:
            self.attention = ChannelAttention(2 * mid_channels)

    def forward(self, x: Tensor) -> Tensor:
        """Forward function."""
        short_feat = self.short_conv(x)
        main_feat = self.blocks(self.main_conv(x))
        merged = torch.cat((main_feat, short_feat), dim=1)
        if self.channel_attention:
            merged = self.attention(merged)
        return self.final_conv(merged)
class CSPNeXt(nn.Module):
    """CSPNeXt backbone used in RTMDet.
    This is a standalone implementation without requiring the mmdet registry.
    Args:
        arch (str): Architecture of CSPNeXt, from {P5, P6}.
            Defaults to P5.
        expand_ratio (float): Ratio to adjust the number of channels of the
            hidden layer. Defaults to 0.5.
        deepen_factor (float): Depth multiplier, multiply number of
            blocks in CSP layer by this amount. Defaults to 1.0.
        widen_factor (float): Width multiplier, multiply number of
            channels in each layer by this amount. Defaults to 1.0.
        out_indices (Sequence[int]): Output from which stages.
            Defaults to (2, 3, 4).
        frozen_stages (int): Stages to be frozen (stop grad and set eval
            mode). -1 means not freezing any parameters. Defaults to -1.
        use_depthwise (bool): Whether to use depthwise separable convolution.
            Defaults to False.
        arch_ovewrite (list): Overwrite default arch settings.
            Defaults to None.
        spp_kernel_sizes: (tuple[int]): Sequential of kernel sizes of SPP
            layers. Defaults to (5, 9, 13).
        channel_attention (bool): Whether to add channel attention in each
            stage. Defaults to True.
        conv_cfg (:obj:`ConfigDict` or dict, optional): Config dict for
            convolution layer. Defaults to None.
        norm_cfg (:obj:`ConfigDict` or dict): Dictionary to construct and
            config norm layer. Defaults to dict(type='BN', requires_grad=True).
        act_cfg (:obj:`ConfigDict` or dict): Config dict for activation layer.
            Defaults to dict(type='SiLU').
        norm_eval (bool): Whether to set norm layers to eval mode, namely,
            freeze running stats (mean and var). Note: Effect on Batch Norm
            and its variants only.
    """
    # From left to right:
    # in_channels, out_channels, num_blocks, add_identity, use_spp
    arch_settings = {
        'P5': [[64, 128, 3, True, False], [128, 256, 6, True, False],
               [256, 512, 6, True, False], [512, 1024, 3, False, True]],
        'P6': [[64, 128, 3, True, False], [128, 256, 6, True, False],
               [256, 512, 6, True, False], [512, 768, 3, True, False],
               [768, 1024, 3, False, True]]
    }

    def __init__(
        self,
        arch: str = 'P5',
        deepen_factor: float = 1.0,
        widen_factor: float = 1.0,
        out_indices: Sequence[int] = (2, 3, 4),
        frozen_stages: int = -1,
        use_depthwise: bool = False,
        expand_ratio: float = 0.5,
        arch_ovewrite: dict = None,
        spp_kernel_sizes: Sequence[int] = (5, 9, 13),
        channel_attention: bool = True,
        conv_cfg: OptConfigType = None,
        norm_cfg: ConfigType = dict(type='BN', momentum=0.03, eps=0.001),
        act_cfg: ConfigType = dict(type='SiLU'),
        norm_eval: bool = False,
        # NOTE(review): init_cfg is accepted for API parity but is not
        # applied anywhere in this implementation.
        init_cfg: OptMultiConfig = dict(
            type='Kaiming',
            layer='Conv2d',
            a=math.sqrt(5),
            distribution='uniform',
            mode='fan_in',
            nonlinearity='leaky_relu')
    ) -> None:
        super().__init__()
        arch_setting = self.arch_settings[arch]
        if arch_ovewrite:
            arch_setting = arch_ovewrite
        assert set(out_indices).issubset(
            i for i in range(len(arch_setting) + 1))
        if frozen_stages not in range(-1, len(arch_setting) + 1):
            raise ValueError('frozen_stages must be in range(-1, '
                             'len(arch_setting) + 1). But received '
                             f'{frozen_stages}')
        self.out_indices = out_indices
        self.frozen_stages = frozen_stages
        self.use_depthwise = use_depthwise
        self.norm_eval = norm_eval
        conv = DepthwiseSeparableConvModule if use_depthwise else ConvModule
        # Stem: three 3x3 convs, only the first downsamples (stride 2).
        self.stem = nn.Sequential(
            ConvModule(
                3,
                int(arch_setting[0][0] * widen_factor // 2),
                3,
                padding=1,
                stride=2,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg),
            ConvModule(
                int(arch_setting[0][0] * widen_factor // 2),
                int(arch_setting[0][0] * widen_factor // 2),
                3,
                padding=1,
                stride=1,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg),
            ConvModule(
                int(arch_setting[0][0] * widen_factor // 2),
                int(arch_setting[0][0] * widen_factor),
                3,
                padding=1,
                stride=1,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg))
        self.layers = ['stem']
        # One stage per arch row: stride-2 conv [+ SPP] + CSP layer.
        for i, (in_channels, out_channels, num_blocks, add_identity,
                use_spp) in enumerate(arch_setting):
            in_channels = int(in_channels * widen_factor)
            out_channels = int(out_channels * widen_factor)
            num_blocks = max(round(num_blocks * deepen_factor), 1)
            stage = []
            conv_layer = conv(
                in_channels,
                out_channels,
                3,
                stride=2,
                padding=1,
                conv_cfg=conv_cfg,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg)
            stage.append(conv_layer)
            if use_spp:
                spp = SPPBottleneck(
                    out_channels,
                    out_channels,
                    kernel_sizes=spp_kernel_sizes,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg)
                stage.append(spp)
            csp_layer = CSPLayer(
                out_channels,
                out_channels,
                num_blocks=num_blocks,
                add_identity=add_identity,
                use_depthwise=use_depthwise,
                use_cspnext_block=True,
                expand_ratio=expand_ratio,
                channel_attention=channel_attention,
                conv_cfg=conv_cfg,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg)
            stage.append(csp_layer)
            self.add_module(f'stage{i + 1}', nn.Sequential(*stage))
            self.layers.append(f'stage{i + 1}')

    def freeze_stages(self) -> None:
        """Freeze stages parameters."""
        # Freezes stem plus the first `frozen_stages` stages: eval mode and
        # no gradients.
        if self.frozen_stages >= 0:
            for i in range(self.frozen_stages + 1):
                m = getattr(self, self.layers[i])
                m.eval()
                for param in m.parameters():
                    param.requires_grad = False

    def train(self, mode=True) -> None:
        """Convert the model into training mode while keeping normalization layer
        frozen."""
        super().train(mode)
        self.freeze_stages()
        if mode and self.norm_eval:
            # Keep BN running stats frozen even while training.
            for m in self.modules():
                if isinstance(m, _BatchNorm):
                    m.eval()

    def forward(self, x: Tensor) -> Tuple[Tensor, ...]:
        """Run all stages, collecting outputs at ``self.out_indices``."""
        outs = []
        for i, layer_name in enumerate(self.layers):
            layer = getattr(self, layer_name)
            x = layer(x)
            if i in self.out_indices:
                outs.append(x)
        return tuple(outs)
class CSPNeXtPAFPN(nn.Module):
    """Path Aggregation Network with CSPNeXt blocks.
    This is a standalone implementation that works with the CSPNeXt backbone.
    Args:
        in_channels (Sequence[int]): Number of input channels per scale.
        out_channels (int): Number of output channels (used at each scale)
        out_indices (Sequence[int]): Output from which stages.
        num_csp_blocks (int): Number of bottlenecks in CSPLayer.
            Defaults to 3.
        use_depthwise (bool): Whether to use depthwise separable convolution in
            blocks. Defaults to False.
        expand_ratio (float): Ratio to adjust the number of channels of the
            hidden layer. Default: 0.5
        upsample_cfg (dict): Config dict for interpolate layer.
            Default: `dict(scale_factor=2, mode='nearest')`
        conv_cfg (dict, optional): Config dict for convolution layer.
            Default: None, which means using conv2d.
        norm_cfg (dict): Config dict for normalization layer.
            Default: dict(type='BN')
        act_cfg (dict): Config dict for activation layer.
            Default: dict(type='Swish')
    """

    def __init__(
        self,
        in_channels: Sequence[int],
        out_channels: int,
        out_indices=(0, 1, 2),
        num_csp_blocks: int = 3,
        use_depthwise: bool = False,
        expand_ratio: float = 0.5,
        upsample_cfg: ConfigType = dict(scale_factor=2, mode='nearest'),
        conv_cfg: OptConfigType = None,
        norm_cfg: ConfigType = dict(type='BN', momentum=0.03, eps=0.001),
        act_cfg: ConfigType = dict(type='Swish'),
        # NOTE(review): init_cfg is accepted for API parity but is not
        # applied anywhere in this implementation.
        init_cfg: OptMultiConfig = dict(
            type='Kaiming',
            layer='Conv2d',
            a=math.sqrt(5),
            distribution='uniform',
            mode='fan_in',
            nonlinearity='leaky_relu')
    ) -> None:
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.out_indices = out_indices
        conv = DepthwiseSeparableConvModule if use_depthwise else ConvModule
        # build top-down blocks
        self.upsample = nn.Upsample(**upsample_cfg)
        self.reduce_layers = nn.ModuleList()
        self.top_down_blocks = nn.ModuleList()
        # Iterate high-to-low resolution; each step reduces the higher-level
        # channels to match the lower level, then fuses via a CSP layer.
        for idx in range(len(in_channels) - 1, 0, -1):
            self.reduce_layers.append(
                ConvModule(
                    in_channels[idx],
                    in_channels[idx - 1],
                    1,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg))
            self.top_down_blocks.append(
                CSPLayer(
                    in_channels[idx - 1] * 2,
                    in_channels[idx - 1],
                    num_blocks=num_csp_blocks,
                    add_identity=False,
                    use_depthwise=use_depthwise,
                    use_cspnext_block=True,
                    expand_ratio=expand_ratio,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg))
        # build bottom-up blocks
        self.downsamples = nn.ModuleList()
        self.bottom_up_blocks = nn.ModuleList()
        for idx in range(len(in_channels) - 1):
            self.downsamples.append(
                conv(
                    in_channels[idx],
                    in_channels[idx],
                    3,
                    stride=2,
                    padding=1,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg))
            self.bottom_up_blocks.append(
                CSPLayer(
                    in_channels[idx] * 2,
                    in_channels[idx + 1],
                    num_blocks=num_csp_blocks,
                    add_identity=False,
                    use_depthwise=use_depthwise,
                    use_cspnext_block=True,
                    expand_ratio=expand_ratio,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg))
        # Optional per-scale 3x3 convs projecting every level to
        # `out_channels` (skipped when out_channels is None).
        if self.out_channels is not None:
            self.out_convs = nn.ModuleList()
            for i in range(len(in_channels)):
                self.out_convs.append(
                    conv(
                        in_channels[i],
                        out_channels,
                        3,
                        padding=1,
                        conv_cfg=conv_cfg,
                        norm_cfg=norm_cfg,
                        act_cfg=act_cfg))

    def forward(self, inputs: Tuple[Tensor, ...]) -> Tuple[Tensor, ...]:
        """
        Args:
            inputs (tuple[Tensor]): input features.
        Returns:
            tuple[Tensor]: YOLOXPAFPN features.
        """
        assert len(inputs) == len(self.in_channels)
        # top-down path
        inner_outs = [inputs[-1]]
        for idx in range(len(self.in_channels) - 1, 0, -1):
            feat_high = inner_outs[0]
            feat_low = inputs[idx - 1]
            feat_high = self.reduce_layers[len(self.in_channels) - 1 - idx](
                feat_high)
            # Store the reduced feature so the bottom-up path reuses it.
            inner_outs[0] = feat_high
            upsample_feat = self.upsample(feat_high)
            inner_out = self.top_down_blocks[len(self.in_channels) - 1 - idx](
                torch.cat([upsample_feat, feat_low], 1))
            inner_outs.insert(0, inner_out)
        # bottom-up path
        outs = [inner_outs[0]]
        for idx in range(len(self.in_channels) - 1):
            feat_low = outs[-1]
            feat_high = inner_outs[idx + 1]
            downsample_feat = self.downsamples[idx](feat_low)
            out = self.bottom_up_blocks[idx](
                torch.cat([downsample_feat, feat_high], 1))
            outs.append(out)
        if self.out_channels is not None:
            # out convs
            for idx in range(len(outs)):
                outs[idx] = self.out_convs[idx](outs[idx])
        return tuple([outs[i] for i in self.out_indices])
class MlvlPointGenerator:
    """Standard points generator for multi-level feature maps.

    Generates the per-location prior points (anchor centers) used by
    anchor-free heads such as RTMDet.

    Args:
        strides (int | list | tuple): Stride of each feature-map level; a
            scalar is promoted to a single-level list.
        offset (float): Offset, in stride units, added to each grid index;
            0.5 centers each point in its cell. Defaults to 0.5.
    """

    def __init__(
        self,
        strides,
        offset: float = 0.5
    ) -> None:
        if not isinstance(strides, (list, tuple)):
            strides = [strides]
        self.strides = strides
        self.offset = offset

    def grid_priors(
        self,
        featmap_sizes,
        dtype=torch.float32,
        device='cuda',
        with_stride=False
    ):
        """Generate grid points of multiple feature levels.

        Args:
            featmap_sizes (list[tuple[int, int]]): ``(h, w)`` per level;
                must match ``len(self.strides)`` entries at most.
            dtype: Output dtype. Defaults to ``torch.float32``.
            device: Output device. Defaults to ``'cuda'``.
            with_stride (bool): If True, each prior is ``(x, y, stride,
                stride)`` instead of ``(x, y)``.

        Returns:
            list[Tensor]: One ``(h*w, 2 or 4)`` tensor per level.
        """
        return [
            self.single_level_grid_priors(
                featmap_sizes[i],
                level_idx=i,
                dtype=dtype,
                device=device,
                with_stride=with_stride) for i in range(len(featmap_sizes))
        ]

    def single_level_grid_priors(
        self,
        featmap_size,
        level_idx,
        dtype=torch.float32,
        device='cuda',
        with_stride=False
    ):
        """Generate grid points for a single feature level.

        Points are laid out row-major (all x for y=0, then y=1, ...), in
        input-image coordinates: ``(index + offset) * stride``.
        """
        feat_h, feat_w = featmap_size
        stride = self.strides[level_idx]
        # Shift indices by `offset` cells, then scale to image coordinates.
        shift_x = ((torch.arange(0, feat_w, device=device) + self.offset) *
                   stride).to(dtype)
        shift_y = ((torch.arange(0, feat_h, device=device) + self.offset) *
                   stride).to(dtype)
        shift_yy, shift_xx = torch.meshgrid(shift_y, shift_x, indexing="ij")
        shift_xx = shift_xx.reshape(-1)
        shift_yy = shift_yy.reshape(-1)
        if not with_stride:
            return torch.stack([shift_xx, shift_yy], dim=-1)
        # full_like expects a plain Number fill value; the previous code
        # wrapped `stride` in a 0-d tensor, which relies on implicit
        # conversion and fails on older torch versions.
        stride_xx = torch.full_like(shift_xx, stride)
        stride_yy = torch.full_like(shift_yy, stride)
        return torch.stack([shift_xx, shift_yy, stride_xx, stride_yy], dim=-1)
# Helper functions needed for geometric mean sigmoid
def sigmoid_geometric_mean(x, y):
    """Return the element-wise geometric mean of ``sigmoid(x)`` and ``sigmoid(y)``."""
    return (x.sigmoid() * y.sigmoid()).sqrt()
def inverse_sigmoid(x, eps=1e-5):
    """Return the logit of ``x``, numerically stabilized.

    ``x`` is first clamped to [0, 1]; numerator and denominator are then
    floored at ``eps`` so the log never sees zero.
    """
    x = x.clamp(0, 1)
    num = x.clamp(min=eps)
    den = (1 - x).clamp(min=eps)
    return (num / den).log()
class RTMDetSepBNHead(nn.Module):
    """RTMDet head with per-level BN layers and optionally shared convs.

    For each feature level the head predicts a per-class score map and a
    4-channel distance map (left, top, right, bottom) that is decoded
    against :class:`MlvlPointGenerator` priors at inference time.
    """
    def __init__(
        self,
        num_classes: int,
        in_channels: int,
        share_conv: bool = True,
        use_depthwise: bool = False,
        pred_kernel_size: int = 1,
        stacked_convs: int = 2,
        feat_channels: int = 256,
        strides: Sequence[int] = (8, 16, 32),
        with_objectness: bool = False,
        exp_on_reg: bool = False,
    ) -> None:
        """
        Args:
            num_classes (int): Number of object categories.
            in_channels (int): Channels of each input feature map.
            share_conv (bool): Share tower modules across levels.
            use_depthwise (bool): Use depthwise-separable tower convs.
            pred_kernel_size (int): Kernel size of the prediction convs.
            stacked_convs (int): Convs per tower.
            feat_channels (int): Tower channel width.
            strides (Sequence[int]): Stride of each feature level. The
                default is a tuple (was a mutable list default).
            with_objectness (bool): Add an objectness branch.
            exp_on_reg (bool): Apply ``exp`` to regression outputs.
        """
        super().__init__()
        self.num_classes = num_classes
        # Sigmoid classification: one output channel per class (no bg class).
        self.cls_out_channels = num_classes
        self.in_channels = in_channels
        self.feat_channels = feat_channels
        self.stacked_convs = stacked_convs
        self.share_conv = share_conv
        self.use_depthwise = use_depthwise
        self.pred_kernel_size = pred_kernel_size
        self.with_objectness = with_objectness
        self.exp_on_reg = exp_on_reg
        self.strides = strides
        # Anchor-free: a single prior per grid point.
        self.num_base_priors = 1
        self._init_layers()
    def _init_layers(self) -> None:
        """Build per-level conv towers and prediction layers."""
        self.cls_convs = nn.ModuleList()
        self.reg_convs = nn.ModuleList()
        self.rtm_cls = nn.ModuleList()
        self.rtm_reg = nn.ModuleList()
        if self.with_objectness:
            self.rtm_obj = nn.ModuleList()
        # Both branches used identical kwargs, so pick the conv class once.
        conv_cls = (DepthwiseSeparableConvModule
                    if self.use_depthwise else ConvModule)
        for n in range(len(self.strides)):
            cls_convs = nn.ModuleList()
            reg_convs = nn.ModuleList()
            for i in range(self.stacked_convs):
                chn = self.in_channels if i == 0 else self.feat_channels
                cls_convs.append(
                    conv_cls(
                        chn,
                        self.feat_channels,
                        3,
                        stride=1,
                        padding=1,
                        bias=False,
                        act_cfg=dict(type='SiLU'),
                        norm_cfg=dict(type='BN', momentum=0.03, eps=0.001)))
                reg_convs.append(
                    conv_cls(
                        chn,
                        self.feat_channels,
                        3,
                        stride=1,
                        padding=1,
                        bias=False,
                        act_cfg=dict(type='SiLU'),
                        norm_cfg=dict(type='BN', momentum=0.03, eps=0.001)))
            self.cls_convs.append(cls_convs)
            self.reg_convs.append(reg_convs)
            self.rtm_cls.append(
                nn.Conv2d(
                    self.feat_channels,
                    self.num_base_priors * self.cls_out_channels,
                    self.pred_kernel_size,
                    padding=self.pred_kernel_size // 2))
            self.rtm_reg.append(
                nn.Conv2d(
                    self.feat_channels,
                    self.num_base_priors * 4,
                    self.pred_kernel_size,
                    padding=self.pred_kernel_size // 2))
            if self.with_objectness:
                self.rtm_obj.append(
                    nn.Conv2d(
                        self.feat_channels,
                        1,
                        self.pred_kernel_size,
                        padding=self.pred_kernel_size // 2))
        if self.share_conv:
            # Alias the whole tower module (conv + BN) across levels.
            # NOTE(review): upstream RTMDet shares only the conv weights
            # and keeps per-level BN — confirm against released checkpoints.
            for n in range(1, len(self.strides)):
                for i in range(self.stacked_convs):
                    self.cls_convs[n][i] = self.cls_convs[0][i]
                    self.reg_convs[n][i] = self.reg_convs[0][i]
        # Point priors for anchor-free decoding (offset 0 => cell corner).
        self.prior_generator = MlvlPointGenerator(self.strides, offset=0.0)
    def init_weights(self):
        """Initialize head weights.

        Convs get a normal(0, 0.01) init; classification biases get the
        focal-style prior init so initial scores sit near 0.01.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.normal_(m.weight, mean=0, std=0.01)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            if isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
        # sigmoid(bias_init) == 0.01; plain math avoids a throwaway tensor.
        bias_init = float(-math.log((1 - 0.01) / 0.01))
        for rtm_cls in self.rtm_cls:
            nn.init.normal_(rtm_cls.weight, mean=0, std=0.01)
            nn.init.constant_(rtm_cls.bias, bias_init)
        for rtm_reg in self.rtm_reg:
            nn.init.normal_(rtm_reg.weight, mean=0, std=0.01)
            nn.init.constant_(rtm_reg.bias, 0)
        if self.with_objectness:
            for rtm_obj in self.rtm_obj:
                nn.init.normal_(rtm_obj.weight, mean=0, std=0.01)
                nn.init.constant_(rtm_obj.bias, bias_init)
    def forward(self, feats):
        """Forward features from the upstream network.

        Args:
            feats (tuple[Tensor]): Features from the upstream network, each
                a 4D tensor ``(B, C, H, W)``.

        Returns:
            tuple: ``(cls_scores, bbox_preds)`` where
                - cls_scores (tuple[Tensor]): Class logits per level.
                - bbox_preds (tuple[Tensor]): Distance predictions per
                  level, already scaled by the level stride.
        """
        cls_scores = []
        bbox_preds = []
        for idx, (x, stride) in enumerate(
                zip(feats, self.strides)):
            cls_feat = x
            reg_feat = x
            for cls_layer in self.cls_convs[idx]:
                cls_feat = cls_layer(cls_feat)
            cls_score = self.rtm_cls[idx](cls_feat)
            for reg_layer in self.reg_convs[idx]:
                reg_feat = reg_layer(reg_feat)
            if self.with_objectness:
                objectness = self.rtm_obj[idx](reg_feat)
                # Fold objectness into the class logits so downstream code
                # only ever sees a single logit map.
                cls_score = inverse_sigmoid(
                    sigmoid_geometric_mean(cls_score, objectness))
            if self.exp_on_reg:
                # exp keeps the predicted distances strictly positive.
                reg_dist = self.rtm_reg[idx](reg_feat).exp() * stride
            else:
                reg_dist = self.rtm_reg[idx](reg_feat) * stride
            cls_scores.append(cls_score)
            bbox_preds.append(reg_dist)
        return tuple(cls_scores), tuple(bbox_preds)
    def predict(self, cls_scores, bbox_preds, batch_img_metas=None, cfg=None,
                rescale=False, with_nms=True, score_thr=0.05,
                nms_iou_threshold=0.6, max_per_img=100):
        """Transform network outputs into batched bbox predictions.

        Simplified inference-only post-processing.

        Args:
            cls_scores (list[Tensor]): Per-level class logits
                ``(B, num_classes, H, W)``.
            bbox_preds (list[Tensor]): Per-level distances ``(B, 4, H, W)``.
            batch_img_metas (list[dict], optional): Per-image metadata;
                synthesized from the feature-map sizes when omitted.
            cfg: Unused; kept for signature compatibility.
            rescale (bool): Divide boxes by ``scale_factor`` if True.
            with_nms (bool): Apply class-wise NMS if True.
            score_thr (float): Minimum kept class score.
            nms_iou_threshold (float): IoU threshold for NMS.
            max_per_img (int): Detection cap per image.

        Returns:
            DetectionOutput: Batched (zero-padded where ragged)
            boxes/scores/labels.
        """
        assert len(cls_scores) == len(bbox_preds)
        num_levels = len(cls_scores)
        device = cls_scores[0].device
        batch_size = cls_scores[0].shape[0]
        # If no image metadata is provided, approximate the input size from
        # the largest (feature size × stride) across levels.
        if batch_img_metas is None:
            featmap_sizes = [cls_scores[i].shape[-2:] for i in range(num_levels)]
            strides = self.strides
            upscaled_sizes = []
            for i, featmap_size in enumerate(featmap_sizes):
                h, w = featmap_size
                upscaled_sizes.append((h * strides[i], w * strides[i]))
            img_h = max(s[0] for s in upscaled_sizes)
            img_w = max(s[1] for s in upscaled_sizes)
            batch_img_metas = [{
                'img_shape': (img_h, img_w, 3),
                'scale_factor': [1.0, 1.0, 1.0, 1.0]
            } for _ in range(batch_size)]
        featmap_sizes = [cls_scores[i].shape[-2:] for i in range(num_levels)]
        # Grid priors (x, y, stride, stride) for every level.
        mlvl_priors = self.prior_generator.grid_priors(
            featmap_sizes,
            dtype=cls_scores[0].dtype,
            device=device,
            with_stride=True)
        result_list = []
        for img_id in range(batch_size):
            img_meta = batch_img_metas[img_id]
            cls_score_list = [
                cls_scores[i][img_id].detach() for i in range(num_levels)
            ]
            bbox_pred_list = [
                bbox_preds[i][img_id].detach() for i in range(num_levels)
            ]
            results = self._predict_by_feat_single(
                cls_score_list,
                bbox_pred_list,
                mlvl_priors,
                img_meta,
                score_thr=score_thr,
                nms_iou_threshold=nms_iou_threshold,
                max_per_img=max_per_img,
                rescale=rescale,
                with_nms=with_nms
            )
            result_list.append(results)
        # Normalize per-image results into aligned tensors.
        boxes_batch = []
        scores_batch = []
        labels_batch = []
        for result in result_list:
            boxes = result['bboxes']
            scores = result.get('scores', boxes[:, -1])
            labels = result['labels']
            # Strip the trailing score column some paths append to boxes.
            if boxes.shape[1] > 4:
                boxes = boxes[:, :4]
            boxes_batch.append(boxes)
            scores_batch.append(scores)
            labels_batch.append(labels)
        # Fast path: torch.stack needs the SAME detection count per image,
        # not merely a nonzero one (the previous `all(len > 0)` check
        # crashed on ragged batches).
        num_dets = [len(b) for b in boxes_batch]
        if len(set(num_dets)) == 1 and num_dets[0] > 0:
            return DetectionOutput(
                boxes=torch.stack(boxes_batch),
                scores=torch.stack(scores_batch),
                labels=torch.stack(labels_batch)
            )
        max_num = max(num_dets)
        if max_num == 0:
            # No detections anywhere in the batch.
            dummy = torch.zeros((batch_size, 0, 4), device=device)
            return DetectionOutput(
                boxes=dummy,
                scores=torch.zeros((batch_size, 0), device=device),
                labels=torch.zeros((batch_size, 0), dtype=torch.long, device=device)
            )
        # Zero-pad ragged results to a common per-image length.
        padded_boxes = []
        padded_scores = []
        padded_labels = []
        for boxes, scores, labels in zip(boxes_batch, scores_batch, labels_batch):
            n = len(boxes)
            if n == 0:
                padded_boxes.append(torch.zeros((max_num, 4), device=device))
                padded_scores.append(torch.zeros(max_num, device=device))
                padded_labels.append(torch.zeros(max_num, dtype=torch.long, device=device))
            else:
                padded_boxes.append(torch.cat(
                    [boxes, torch.zeros((max_num - n, 4), device=device)], dim=0))
                padded_scores.append(torch.cat(
                    [scores, torch.zeros(max_num - n, device=device)], dim=0))
                padded_labels.append(torch.cat(
                    [labels, torch.zeros(max_num - n, dtype=torch.long, device=device)],
                    dim=0))
        return DetectionOutput(
            boxes=torch.stack(padded_boxes),
            scores=torch.stack(padded_scores),
            labels=torch.stack(padded_labels)
        )
    def _predict_by_feat_single(self, cls_score_list, bbox_pred_list, mlvl_priors,
                                img_meta, score_thr=0.05, nms_iou_threshold=0.6,
                                max_per_img=100, rescale=False, with_nms=True):
        """Post-process one image's multi-level predictions.

        Returns a dict with ``bboxes`` ``(M, 5)`` (score appended),
        ``scores`` ``(M,)`` and ``labels`` ``(M,)``.
        """
        mlvl_bboxes = []
        mlvl_scores = []
        for level_idx, (cls_score, bbox_pred, priors) in enumerate(
                zip(cls_score_list, bbox_pred_list, mlvl_priors)):
            assert cls_score.size()[-2:] == bbox_pred.size()[-2:]
            # (C, H, W) -> (H*W, C) so each row is one grid point.
            cls_score = cls_score.permute(1, 2, 0).reshape(-1, self.cls_out_channels)
            bbox_pred = bbox_pred.permute(1, 2, 0).reshape(-1, 4)
            scores = torch.sigmoid(cls_score)
            # Keep only points whose best class clears the threshold.
            max_scores, _ = scores.max(dim=1)
            keep_mask = max_scores > score_thr
            scores = scores[keep_mask]
            bbox_pred = bbox_pred[keep_mask]
            priors = priors[keep_mask]
            if scores.numel() == 0:
                continue
            bboxes = self._decode_bboxes(priors, bbox_pred, img_meta.get('img_shape'))
            mlvl_bboxes.append(bboxes)
            mlvl_scores.append(scores)
        if len(mlvl_bboxes) == 0:
            # No valid predictions at any level.
            return {
                'bboxes': torch.zeros((0, 4), device=cls_score_list[0].device),
                'scores': torch.zeros((0,), device=cls_score_list[0].device),
                'labels': torch.zeros((0,), device=cls_score_list[0].device, dtype=torch.long)
            }
        bboxes = torch.cat(mlvl_bboxes)
        scores = torch.cat(mlvl_scores)
        # Optional rescaling back to the original image resolution.
        if rescale and 'scale_factor' in img_meta:
            scale = bboxes.new_tensor(img_meta['scale_factor']).flatten()
            if scale.numel() == 2:
                # (w_scale, h_scale) -> (w, h, w, h) for x1, y1, x2, y2.
                scale = scale.repeat(2)
            # The previous code repeated a 4-element factor into 8 columns,
            # which could not broadcast against (N, 4) boxes.
            bboxes = bboxes / scale[:4]
        if with_nms:
            det_bboxes, det_labels = self._nms(bboxes, scores,
                                               nms_iou_threshold,
                                               max_per_img,
                                               score_thr=score_thr)
        else:
            # Top-k over the flattened (box, class) score matrix, no NMS.
            scores_flattened = scores.flatten()
            if scores_flattened.size(0) > max_per_img:
                top_scores, indices = scores_flattened.topk(max_per_img)
                # Flat index encodes (box, class): row = idx // C,
                # class = idx % C. (The old code indexed boxes with the
                # raw flat index, selecting the wrong boxes.)
                box_idx = torch.div(indices, self.num_classes,
                                    rounding_mode='floor')
                bboxes_top_k = bboxes.index_select(0, box_idx)
                labels_top_k = indices % self.num_classes
                det_bboxes = torch.cat([bboxes_top_k, top_scores.unsqueeze(-1)], dim=1)
                det_labels = labels_top_k
            else:
                # Fewer candidates than the cap: keep each box's best class.
                max_scores, labels = scores.max(dim=1)
                det_bboxes = torch.cat([bboxes, max_scores.unsqueeze(-1)], dim=1)
                det_labels = labels
        return {
            'bboxes': det_bboxes,
            'scores': det_bboxes[:, -1],
            'labels': det_labels
        }
    def _decode_bboxes(self, priors, distance, max_shape=None):
        """Decode distance predictions to bounding box coordinates.

        Args:
            priors (Tensor): ``(..., >=2)`` grid points; only x, y are read.
            distance (Tensor): ``(..., 4)`` distances (l, t, r, b).
            max_shape (tuple, optional): ``(H, W, ...)`` used to clip boxes.

        Returns:
            Tensor: ``(..., 4)`` boxes in (x1, y1, x2, y2) format.
        """
        xy = priors[..., :2]
        x1 = xy[..., 0] - distance[..., 0]
        y1 = xy[..., 1] - distance[..., 1]
        x2 = xy[..., 0] + distance[..., 2]
        y2 = xy[..., 1] + distance[..., 3]
        bboxes = torch.stack([x1, y1, x2, y2], -1)
        # In-place clamp is safe: `bboxes` was freshly created above.
        if max_shape is not None:
            bboxes[..., 0].clamp_(min=0, max=max_shape[1])
            bboxes[..., 1].clamp_(min=0, max=max_shape[0])
            bboxes[..., 2].clamp_(min=0, max=max_shape[1])
            bboxes[..., 3].clamp_(min=0, max=max_shape[0])
        return bboxes
    def _nms(self, bboxes, scores, iou_threshold, max_per_img, score_thr=0.05):
        """Class-wise NMS.

        Args:
            bboxes (Tensor): ``(N, 4)`` boxes.
            scores (Tensor): ``(N, num_classes)`` per-class scores.
            iou_threshold (float): Suppression IoU threshold.
            max_per_img (int): Cap on returned detections.
            score_thr (float): Per-class score floor (previously a
                hard-coded 0.05 regardless of the caller's threshold).

        Returns:
            tuple: ``det_bboxes`` ``(M, 5)`` with a score column and
            ``det_labels`` ``(M,)``.
        """
        num_classes = scores.shape[1]
        det_bboxes = []
        det_labels = []
        for cls_idx in range(num_classes):
            cls_scores = scores[:, cls_idx]
            keep_idx = cls_scores > score_thr
            if not keep_idx.any():
                continue
            cls_bboxes = bboxes[keep_idx]
            cls_scores = cls_scores[keep_idx]
            keep = self._batched_nms(cls_bboxes, cls_scores, iou_threshold)
            keep = keep[:max_per_img]
            det_bboxes.append(torch.cat(
                [cls_bboxes[keep], cls_scores[keep].unsqueeze(-1)], dim=1))
            det_labels.append(cls_bboxes.new_full((keep.size(0),), cls_idx,
                                                  dtype=torch.long))
        if len(det_bboxes) > 0:
            det_bboxes = torch.cat(det_bboxes, dim=0)
            det_labels = torch.cat(det_labels, dim=0)
            # Rank all classes together by confidence, then truncate.
            _, indices = det_bboxes[:, -1].sort(descending=True)
            det_bboxes = det_bboxes[indices][:max_per_img]
            det_labels = det_labels[indices][:max_per_img]
        else:
            det_bboxes = bboxes.new_zeros((0, 5))
            det_labels = bboxes.new_zeros((0,), dtype=torch.long)
        return det_bboxes, det_labels
    def _batched_nms(self, boxes, scores, iou_threshold):
        """Greedy single-class NMS; returns kept indices ordered by score."""
        if boxes.shape[0] == 0:
            return boxes.new_zeros(0, dtype=torch.long)
        try:
            # Prefer the torchvision C++/CUDA implementation when usable.
            return torchvision.ops.nms(boxes, scores, iou_threshold)
        except Exception:
            # Fall back to a pure-torch implementation (bare `except:` was
            # too broad — it also swallowed KeyboardInterrupt/SystemExit).
            x1 = boxes[:, 0]
            y1 = boxes[:, 1]
            x2 = boxes[:, 2]
            y2 = boxes[:, 3]
            areas = (x2 - x1) * (y2 - y1)
            _, order = scores.sort(descending=True)
            keep = []
            while order.size(0) > 0:
                i = order[0].item()
                keep.append(i)
                if order.size(0) == 1:
                    break
                # IoU of the current best box against the remainder.
                xx1 = torch.max(x1[order[1:]], x1[i])
                yy1 = torch.max(y1[order[1:]], y1[i])
                xx2 = torch.min(x2[order[1:]], x2[i])
                yy2 = torch.min(y2[order[1:]], y2[i])
                w = torch.clamp(xx2 - xx1, min=0)
                h = torch.clamp(yy2 - yy1, min=0)
                inter = w * h
                iou = inter / (areas[i] + areas[order[1:]] - inter)
                inds = torch.where(iou <= iou_threshold)[0]
                order = order[inds + 1]
            return torch.tensor(keep, dtype=torch.long, device=boxes.device)
class RTMDetModel(PreTrainedModel):
    """
    RTMDet object detection model compatible with Hugging Face transformers.

    Pure-PyTorch implementation (no NumPy or OpenCV dependencies) composed
    of a CSPNeXt backbone, a CSPNeXt-PAFPN neck, and a separated-BN RTMDet
    head. Inference post-processing (thresholding + NMS) happens inside
    ``forward`` via the head's ``predict`` method.
    """
    config_class = RTMDetConfig
    base_model_prefix = "rtmdet"
    main_input_name = "pixel_values"
    # share_conv aliases BN modules across strides; override tied-weights
    # method so transformers never calls get_parameter() on BN buffers.
    _tied_weights_keys = None
    def mark_tied_weights_as_initialized(self):
        pass # share_conv makes BN buffers look tied — skip to avoid AttributeError
    def __init__(self, config):
        """Build backbone/neck/head from `config` and run HF post-init.

        Args:
            config (RTMDetConfig): Architecture and post-processing settings.
        """
        super().__init__(config)
        # Build backbone (depthwise convs disabled for all three stages).
        self.backbone = CSPNeXt(
            arch=config.backbone_arch,
            deepen_factor=config.backbone_deepen_factor,
            widen_factor=config.backbone_widen_factor,
            expand_ratio=config.backbone_expand_ratio,
            channel_attention=config.backbone_channel_attention,
            use_depthwise=False,
        )
        # Build neck
        self.neck = CSPNeXtPAFPN(
            in_channels=config.neck_in_channels,
            out_channels=config.neck_out_channels,
            num_csp_blocks=config.neck_num_csp_blocks,
            expand_ratio=config.neck_expand_ratio,
            use_depthwise=False,
        )
        # Build head
        self.bbox_head = RTMDetSepBNHead(
            num_classes=config.num_classes,
            in_channels=config.head_in_channels,
            stacked_convs=config.head_stacked_convs,
            feat_channels=config.head_feat_channels,
            with_objectness=config.head_with_objectness,
            exp_on_reg=config.head_exp_on_reg,
            share_conv=config.head_share_conv,
            pred_kernel_size=config.head_pred_kernel_size,
            strides=config.strides,
            use_depthwise=False
        )
        # Initialize weights BEFORE post_init so pretrained-checkpoint
        # loading (driven by post_init) can overwrite them.
        self.init_weights()
        # Required: triggers post_init() which sets all_tied_weights_keys etc.
        self.post_init()
    def init_weights(self):
        """Initialize the weights of the model.

        Only the head gets an explicit init here; the backbone/neck are
        expected to be overwritten by pretrained weights.
        """
        # Backbone is usually initialized from pre-trained weights
        # so we don't need special initialization
        # Initialize head
        self.bbox_head.init_weights()
    def forward(
        self,
        pixel_values=None,
        original_size=None,
        labels=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        """
        Forward pass of the model.
        Args:
            pixel_values (`torch.FloatTensor` of shape `(batch_size, channels, height, width)`):
                Pixel values resized to 640×640 by the image processor.
            original_size (`Tuple[int, int]`, *optional*):
                ``(height, width)`` of the **original** image before preprocessing.
                When supplied, the returned boxes are automatically scaled from
                640×640 model-input space to original image pixel coordinates so
                the caller never needs to compute ``sx = orig_w / 640`` manually.
                All images in the batch are assumed to share the same original size.
            labels (`List[Dict]`, *optional*):
                Labels for computing the detection loss. NOTE: loss computation
                is not implemented — passing labels returns a zero loss.
            output_hidden_states (`bool`, *optional*):
                Whether or not to return the hidden states of all layers.
                (Currently unused by this implementation.)
            return_dict (`bool`, *optional*):
                Whether or not to return a ModelOutput instead of a plain tuple.
        Returns:
            `DetectionOutput` or `tuple`:
                Boxes are in 640×640 space by default, or in original image space
                when ``original_size`` is provided.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        # Get inputs
        if pixel_values is None:
            raise ValueError("You have to specify pixel_values")
        batch_size, channels, height, width = pixel_values.shape
        # Extract features from backbone
        backbone_features = self.backbone(pixel_values)
        # Process features through neck
        neck_features = self.neck(backbone_features)
        # Get cls_scores and bbox_preds from head
        cls_scores, bbox_preds = self.bbox_head(neck_features)
        if labels is not None:
            # Training mode: calculate loss (not implemented in this simplified version)
            loss = torch.tensor(0.0, device=pixel_values.device)
            if return_dict:
                return DetectionOutput(loss=loss)
            else:
                return (loss,)
        # Inference mode: Get detection results
        # Metadata assumes unit scale — rescaling is done below instead.
        batch_img_metas = [{
            'img_shape': (height, width, 3),
            'scale_factor': [1.0, 1.0, 1.0, 1.0]
        } for _ in range(batch_size)]
        # Call predict method with parameters from config
        results = self.bbox_head.predict(
            cls_scores=cls_scores,
            bbox_preds=bbox_preds,
            batch_img_metas=batch_img_metas,
            rescale=False,
            with_nms=True,
            score_thr=self.config.score_threshold,
            nms_iou_threshold=self.config.nms_threshold,
            max_per_img=self.config.max_detections
        )
        # Scale boxes from 640×640 model space → original image space if requested
        if original_size is not None:
            orig_h, orig_w = original_size
            sx = orig_w / width # width == 640
            sy = orig_h / height # height == 640
            # Clone so the head's output tensor is left untouched.
            scaled_boxes = results.boxes.clone()
            scaled_boxes[..., 0] *= sx # x1
            scaled_boxes[..., 2] *= sx # x2
            scaled_boxes[..., 1] *= sy # y1
            scaled_boxes[..., 3] *= sy # y2
            results = DetectionOutput(
                boxes=scaled_boxes,
                scores=results.scores,
                labels=results.labels,
            )
        if return_dict:
            return results
        else:
            # Return as tuple (boxes, scores, labels)
            return (results.boxes, results.scores, results.labels)