from typing import List, Optional, Tuple, Union, Sequence, Dict from dataclasses import dataclass import inspect from functools import partial import warnings import math import torch import torchvision import torch.nn as nn from torch import Tensor import torch.nn.functional as F from torch.nn.modules.batchnorm import _BatchNorm, SyncBatchNorm from transformers.modeling_outputs import ModelOutput from transformers.modeling_utils import PreTrainedModel from transformers.utils import logging from .configuration_rtmdet import RTMDetConfig logger = logging.get_logger(__name__) @dataclass class DetectionOutput(ModelOutput): """ Output type for object detection models. Args: boxes (`torch.FloatTensor` of shape `(batch_size, num_boxes, 4)`): Detection boxes in format [x1, y1, x2, y2]. Coordinates are in model-input space (640×640) by default, or in original image pixel space when ``original_size`` was passed to ``forward()``. scores (`torch.FloatTensor` of shape `(batch_size, num_boxes)`): Detection confidence scores. labels (`torch.LongTensor` of shape `(batch_size, num_boxes)`): Detection class indices. loss (`torch.FloatTensor`, *optional*): Loss value if training. """ boxes: torch.FloatTensor = None scores: torch.FloatTensor = None labels: torch.LongTensor = None loss: Optional[torch.FloatTensor] = None # Replace MODELS registry with direct class mappings ACTIVATION_LAYERS = { 'ReLU': nn.ReLU, 'LeakyReLU': nn.LeakyReLU, 'PReLU': nn.PReLU, 'SiLU': nn.SiLU, 'Sigmoid': nn.Sigmoid, 'Tanh': nn.Tanh, 'GELU': nn.GELU, 'Swish': nn.SiLU, # Swish is equivalent to SiLU 'Hardsigmoid': nn.Hardsigmoid, 'HSigmoid': nn.Hardsigmoid } # Simple Config Type replacement ConfigType = Dict OptConfigType = Optional[Dict] OptMultiConfig = Optional[Union[Dict, List[Dict]]] def build_activation_layer(cfg: Dict) -> nn.Module: """Build activation layer. Args: cfg (dict): The activation layer config, which should contain: - type (str): Layer type. - layer args: Args needed to instantiate an activation layer. Returns: nn.Module: Created activation layer. 
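
    Example (illustrative, using the mappings defined above):
        >>> act = build_activation_layer(dict(type='SiLU', inplace=True))
        >>> isinstance(act, nn.SiLU)
        True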
""" if not isinstance(cfg, dict): raise TypeError('cfg must be a dict') if 'type' not in cfg: raise KeyError('the cfg dict must contain the key "type"') cfg_ = cfg.copy() layer_type = cfg_.pop('type') if layer_type not in ACTIVATION_LAYERS: raise KeyError(f'Unrecognized activation type {layer_type}') activation = ACTIVATION_LAYERS[layer_type] return activation(**cfg_) def kaiming_init(module, a=0, mode='fan_out', nonlinearity='relu', bias=0, distribution='normal'): assert distribution in ['uniform', 'normal'] if hasattr(module, 'weight') and module.weight is not None: if distribution == 'uniform': nn.init.kaiming_uniform_( module.weight, a=a, mode=mode, nonlinearity=nonlinearity) else: nn.init.kaiming_normal_( module.weight, a=a, mode=mode, nonlinearity=nonlinearity) if hasattr(module, 'bias') and module.bias is not None: nn.init.constant_(module.bias, bias) def constant_init(module, val, bias=0): if hasattr(module, 'weight') and module.weight is not None: nn.init.constant_(module.weight, val) if hasattr(module, 'bias') and module.bias is not None: nn.init.constant_(module.bias, bias) class _InstanceNorm(nn.modules.instancenorm._InstanceNorm): """Instance Normalization Base Class.""" pass # Custom implementation of methods with asterisks that couldn't be included in the original code # These methods need to be renamed without asterisks in actual implementation def infer_abbr(class_type): """Infer abbreviation from the class name.""" if not inspect.isclass(class_type): raise TypeError( f'class_type must be a type, but got {type(class_type)}') if hasattr(class_type, '_abbr_'): return class_type._abbr_ if issubclass(class_type, _InstanceNorm): # IN is a subclass of BN return 'in' elif issubclass(class_type, _BatchNorm): return 'bn' elif issubclass(class_type, nn.GroupNorm): return 'gn' elif issubclass(class_type, nn.LayerNorm): return 'ln' else: class_name = class_type.__name__.lower() if 'batch' in class_name: return 'bn' elif 'group' in class_name: return 'gn' elif 'layer' in class_name: return 'ln' elif 'instance' in class_name: return 'in' else: return 'norm_layer' # Create mapping from strings to layer classes NORM_LAYERS = { 'BN': nn.BatchNorm2d, 'BN1d': nn.BatchNorm1d, 'BN2d': nn.BatchNorm2d, 'BN3d': nn.BatchNorm3d, 'SyncBN': SyncBatchNorm, 'GN': nn.GroupNorm, 'LN': nn.LayerNorm, 'IN': nn.InstanceNorm2d, 'IN1d': nn.InstanceNorm1d, 'IN2d': nn.InstanceNorm2d, 'IN3d': nn.InstanceNorm3d } CONV_LAYERS = { 'Conv1d': nn.Conv1d, 'Conv2d': nn.Conv2d, 'Conv3d': nn.Conv3d, 'Conv': nn.Conv2d } PADDING_LAYERS = { 'zero': nn.ZeroPad2d, 'reflect': nn.ReflectionPad2d, 'replicate': nn.ReplicationPad2d } def build_norm_layer(cfg: Dict, num_features: int, postfix: Union[int, str] = '') -> Tuple[str, nn.Module]: """Build normalization layer.""" if not isinstance(cfg, dict): raise TypeError('cfg must be a dict') if 'type' not in cfg: raise KeyError('the cfg dict must contain the key "type"') cfg_ = cfg.copy() layer_type = cfg_.pop('type') if layer_type not in NORM_LAYERS: raise KeyError(f'Unrecognized norm type {layer_type}') norm_layer = NORM_LAYERS[layer_type] abbr = infer_abbr(norm_layer) assert isinstance(postfix, (int, str)) name = abbr + str(postfix) requires_grad = cfg_.pop('requires_grad', True) cfg_.setdefault('eps', 1e-5) if norm_layer is not nn.GroupNorm: layer = norm_layer(num_features, **cfg_) if layer_type == 'SyncBN' and hasattr(layer, '_specify_ddp_gpu_num'): layer._specify_ddp_gpu_num(1) else: assert 'num_groups' in cfg_ layer = norm_layer(num_channels=num_features, **cfg_) for param in 
def build_conv_layer(cfg: Optional[Dict], *args, **kwargs) -> nn.Module:
    """Build convolution layer."""
    if cfg is None:
        cfg_ = dict(type='Conv2d')
    else:
        if not isinstance(cfg, dict):
            raise TypeError('cfg must be a dict')
        if 'type' not in cfg:
            raise KeyError('the cfg dict must contain the key "type"')
        cfg_ = cfg.copy()

    layer_type = cfg_.pop('type')
    if layer_type not in CONV_LAYERS:
        raise KeyError(f'Unrecognized conv type {layer_type}')
    conv_layer = CONV_LAYERS[layer_type]
    layer = conv_layer(*args, **kwargs, **cfg_)
    return layer


def build_padding_layer(cfg: Dict, *args, **kwargs) -> nn.Module:
    """Build padding layer."""
    if not isinstance(cfg, dict):
        raise TypeError('cfg must be a dict')
    if 'type' not in cfg:
        raise KeyError('the cfg dict must contain the key "type"')

    cfg_ = cfg.copy()
    padding_type = cfg_.pop('type')
    if padding_type not in PADDING_LAYERS:
        raise KeyError(f'Unrecognized padding type {padding_type}')
    padding_layer = PADDING_LAYERS[padding_type]
    layer = padding_layer(*args, **kwargs, **cfg_)
    return layer


def efficient_conv_bn_eval_forward(bn: _BatchNorm,
                                   conv: nn.modules.conv._ConvNd,
                                   x: torch.Tensor):
    """Fused conv+BN forward for eval mode.

    Implementation based on https://arxiv.org/abs/2305.11624
    "Tune-Mode ConvBN Blocks For Efficient Transfer Learning".

    It leverages the associative law between convolution and the affine
    transform, i.e. normalize(weight conv feature) =
    (normalize weight) conv feature. It works for the eval mode of ConvBN
    blocks during validation, can be used for training as well, and reduces
    memory and computation cost.

    Args:
        bn (_BatchNorm): a BatchNorm module.
        conv (nn.modules.conv._ConvNd): a conv module.
        x (torch.Tensor): Input feature map.
    """
    # These lines deal with various corner cases, e.g. BN without an affine
    # transform and conv without a bias term.
    weight_on_the_fly = conv.weight
    if conv.bias is not None:
        bias_on_the_fly = conv.bias
    else:
        bias_on_the_fly = torch.zeros_like(bn.running_var)

    if bn.weight is not None:
        bn_weight = bn.weight
    else:
        bn_weight = torch.ones_like(bn.running_var)

    if bn.bias is not None:
        bn_bias = bn.bias
    else:
        bn_bias = torch.zeros_like(bn.running_var)

    # shape of [C_out, 1, 1, 1] in Conv2d
    weight_coeff = torch.rsqrt(bn.running_var + bn.eps).reshape(
        [-1] + [1] * (len(conv.weight.shape) - 1))
    # shape of [C_out, 1, 1, 1] in Conv2d
    coeff_on_the_fly = bn_weight.view_as(weight_coeff) * weight_coeff

    # shape of [C_out, C_in, k, k] in Conv2d
    weight_on_the_fly = weight_on_the_fly * coeff_on_the_fly
    # shape of [C_out] in Conv2d
    bias_on_the_fly = bn_bias + coeff_on_the_fly.flatten() * \
        (bias_on_the_fly - bn.running_mean)

    return conv._conv_forward(x, weight_on_the_fly, bias_on_the_fly)


class ConvModule(nn.Module):
    """A conv block that bundles conv/norm/activation layers."""

    _abbr_ = 'conv_block'

    def __init__(self,
                 in_channels: int,
                 out_channels: int,
                 kernel_size: Union[int, Tuple[int, int]],
                 stride: Union[int, Tuple[int, int]] = 1,
                 padding: Union[int, Tuple[int, int]] = 0,
                 dilation: Union[int, Tuple[int, int]] = 1,
                 groups: int = 1,
                 bias: Union[bool, str] = 'auto',
                 conv_cfg: Optional[Dict] = None,
                 norm_cfg: Optional[Dict] = None,
                 act_cfg: Optional[Dict] = dict(type='ReLU'),
                 inplace: bool = True,
                 with_spectral_norm: bool = False,
                 padding_mode: str = 'zeros',
                 order: tuple = ('conv', 'norm', 'act'),
                 efficient_conv_bn_eval: bool = False):
        super().__init__()
        assert conv_cfg is None or isinstance(conv_cfg, dict)
        assert norm_cfg is None or isinstance(norm_cfg, dict)
        assert act_cfg is None or isinstance(act_cfg, dict)
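        # Note: `bias='auto'` (resolved below) drops the conv bias whenever a
        # norm layer follows, since the norm's affine shift makes it redundant.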
official_padding_mode = ['zeros', 'circular'] self.conv_cfg = conv_cfg self.norm_cfg = norm_cfg self.act_cfg = act_cfg self.inplace = inplace self.with_spectral_norm = with_spectral_norm self.with_explicit_padding = padding_mode not in official_padding_mode self.order = order assert isinstance(self.order, tuple) and len(self.order) == 3 assert set(order) == {'conv', 'norm', 'act'} self.with_norm = norm_cfg is not None self.with_activation = act_cfg is not None # if the conv layer is before a norm layer, bias is unnecessary. if bias == 'auto': bias = not self.with_norm self.with_bias = bias if self.with_explicit_padding: pad_cfg = dict(type=padding_mode) self.padding_layer = build_padding_layer(pad_cfg, padding) # reset padding to 0 for conv module conv_padding = 0 if self.with_explicit_padding else padding # build convolution layer self.conv = build_conv_layer( conv_cfg, in_channels, out_channels, kernel_size, stride=stride, padding=conv_padding, dilation=dilation, groups=groups, bias=bias) # export the attributes of self.conv to a higher level for convenience self.in_channels = self.conv.in_channels self.out_channels = self.conv.out_channels self.kernel_size = self.conv.kernel_size self.stride = self.conv.stride self.padding = padding self.dilation = self.conv.dilation self.transposed = self.conv.transposed self.output_padding = self.conv.output_padding self.groups = self.conv.groups if self.with_spectral_norm: self.conv = nn.utils.spectral_norm(self.conv) # build normalization layers if self.with_norm: # norm layer is after conv layer if order.index('norm') > order.index('conv'): norm_channels = out_channels else: norm_channels = in_channels self.norm_name, norm = build_norm_layer( norm_cfg, norm_channels) # type: ignore self.add_module(self.norm_name, norm) if self.with_bias: if isinstance(norm, (_BatchNorm, _InstanceNorm)): warnings.warn( 'Unnecessary conv bias before batch/instance norm') else: self.norm_name = None # type: ignore self.turn_on_efficient_conv_bn_eval(efficient_conv_bn_eval) # build activation layer if self.with_activation: act_cfg_ = act_cfg.copy() # type: ignore # nn.Tanh has no 'inplace' argument if act_cfg_['type'] not in [ 'Tanh', 'PReLU', 'Sigmoid', 'HSigmoid', 'Swish', 'GELU' ]: act_cfg_.setdefault('inplace', inplace) self.activate = build_activation_layer(act_cfg_) # Use msra init by default self.init_weights() @property def norm(self): if self.norm_name: return getattr(self, self.norm_name) else: return None def init_weights(self): if not hasattr(self.conv, 'init_weights'): if self.with_activation and self.act_cfg['type'] == 'LeakyReLU': nonlinearity = 'leaky_relu' a = self.act_cfg.get('negative_slope', 0.01) else: nonlinearity = 'relu' a = 0 kaiming_init(self.conv, a=a, nonlinearity=nonlinearity) if self.with_norm: constant_init(self.norm, 1, bias=0) def forward(self, x: torch.Tensor, activate: bool = True, norm: bool = True) -> torch.Tensor: layer_index = 0 while layer_index < len(self.order): layer = self.order[layer_index] if layer == 'conv': if self.with_explicit_padding: x = self.padding_layer(x) # if the next operation is norm and we have a norm layer in # eval mode and we have enabled `efficient_conv_bn_eval` for # the conv operator, then activate the optimized forward and # skip the next norm operator since it has been fused if layer_index + 1 < len(self.order) and \ self.order[layer_index + 1] == 'norm' and norm and \ self.with_norm and not self.norm.training and \ self.efficient_conv_bn_eval_forward is not None: self.conv.forward = partial( 
self.efficient_conv_bn_eval_forward, self.norm, self.conv) layer_index += 1 x = self.conv(x) del self.conv.forward else: x = self.conv(x) elif layer == 'norm' and norm and self.with_norm: x = self.norm(x) elif layer == 'act' and activate and self.with_activation: x = self.activate(x) layer_index += 1 return x def turn_on_efficient_conv_bn_eval(self, efficient_conv_bn_eval=True): # efficient_conv_bn_eval works for conv + bn # with `track_running_stats` option if efficient_conv_bn_eval and self.norm \ and isinstance(self.norm, _BatchNorm) \ and self.norm.track_running_stats: self.efficient_conv_bn_eval_forward = efficient_conv_bn_eval_forward # noqa: E501 else: self.efficient_conv_bn_eval_forward = None # type: ignore @staticmethod def create_from_conv_bn(conv: torch.nn.modules.conv._ConvNd, bn: torch.nn.modules.batchnorm._BatchNorm, efficient_conv_bn_eval=True) -> 'ConvModule': """Create a ConvModule from a conv and a bn module.""" self = ConvModule.__new__(ConvModule) super(ConvModule, self).__init__() self.conv_cfg = None self.norm_cfg = None self.act_cfg = None self.inplace = False self.with_spectral_norm = False self.with_explicit_padding = False self.order = ('conv', 'norm', 'act') self.with_norm = True self.with_activation = False self.with_bias = conv.bias is not None # build convolution layer self.conv = conv # export the attributes of self.conv to a higher level for convenience self.in_channels = self.conv.in_channels self.out_channels = self.conv.out_channels self.kernel_size = self.conv.kernel_size self.stride = self.conv.stride self.padding = self.conv.padding self.dilation = self.conv.dilation self.transposed = self.conv.transposed self.output_padding = self.conv.output_padding self.groups = self.conv.groups # build normalization layers self.norm_name, norm = 'bn', bn self.add_module(self.norm_name, norm) self.turn_on_efficient_conv_bn_eval(efficient_conv_bn_eval) return self class DepthwiseSeparableConvModule(nn.Module): """Depthwise separable convolution module.""" def __init__(self, in_channels: int, out_channels: int, kernel_size: Union[int, Tuple[int, int]], stride: Union[int, Tuple[int, int]] = 1, padding: Union[int, Tuple[int, int]] = 0, dilation: Union[int, Tuple[int, int]] = 1, norm_cfg: Optional[Dict] = None, act_cfg: Dict = dict(type='ReLU'), dw_norm_cfg: Union[Dict, str] = 'default', dw_act_cfg: Union[Dict, str] = 'default', pw_norm_cfg: Union[Dict, str] = 'default', pw_act_cfg: Union[Dict, str] = 'default', **kwargs): super().__init__() assert 'groups' not in kwargs, 'groups should not be specified' # if norm/activation config of depthwise/pointwise ConvModule is not # specified, use default config. 
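        # ('default' is the sentinel instead of None because None is itself
        # meaningful here: e.g. dw_act_cfg=None builds a depthwise conv with
        # no activation while the pointwise branch still inherits act_cfg.)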
dw_norm_cfg = dw_norm_cfg if dw_norm_cfg != 'default' else norm_cfg # type: ignore # noqa E501 dw_act_cfg = dw_act_cfg if dw_act_cfg != 'default' else act_cfg pw_norm_cfg = pw_norm_cfg if pw_norm_cfg != 'default' else norm_cfg # type: ignore # noqa E501 pw_act_cfg = pw_act_cfg if pw_act_cfg != 'default' else act_cfg # depthwise convolution self.depthwise_conv = ConvModule( in_channels, in_channels, kernel_size, stride=stride, padding=padding, dilation=dilation, groups=in_channels, norm_cfg=dw_norm_cfg, # type: ignore act_cfg=dw_act_cfg, # type: ignore **kwargs) self.pointwise_conv = ConvModule( in_channels, out_channels, 1, norm_cfg=pw_norm_cfg, # type: ignore act_cfg=pw_act_cfg, # type: ignore **kwargs) def forward(self, x: torch.Tensor) -> torch.Tensor: x = self.depthwise_conv(x) x = self.pointwise_conv(x) return x class SPPBottleneck(nn.Module): """Spatial pyramid pooling layer used in YOLOv3-SPP.""" def __init__(self, in_channels, out_channels, kernel_sizes=(5, 9, 13), conv_cfg=None, norm_cfg=dict(type='BN', momentum=0.03, eps=0.001), act_cfg=dict(type='Swish'), init_cfg=None): super().__init__() mid_channels = in_channels // 2 self.conv1 = ConvModule( in_channels, mid_channels, 1, stride=1, conv_cfg=conv_cfg, norm_cfg=norm_cfg, act_cfg=act_cfg) self.poolings = nn.ModuleList([ nn.MaxPool2d(kernel_size=ks, stride=1, padding=ks // 2) for ks in kernel_sizes ]) conv2_channels = mid_channels * (len(kernel_sizes) + 1) self.conv2 = ConvModule( conv2_channels, out_channels, 1, conv_cfg=conv_cfg, norm_cfg=norm_cfg, act_cfg=act_cfg) def forward(self, x): x = self.conv1(x) with torch.amp.autocast(enabled=False, device_type=x.device.type): x = torch.cat( [x] + [pooling(x) for pooling in self.poolings], dim=1) x = self.conv2(x) return x class DarknetBottleneck(nn.Module): """The basic bottleneck block used in Darknet.""" def __init__(self, in_channels: int, out_channels: int, expansion: float = 0.5, add_identity: bool = True, use_depthwise: bool = False, conv_cfg: OptConfigType = None, norm_cfg: ConfigType = dict( type='BN', momentum=0.03, eps=0.001), act_cfg: ConfigType = dict(type='Swish'), init_cfg: OptMultiConfig = None) -> None: super().__init__() hidden_channels = int(out_channels * expansion) conv = DepthwiseSeparableConvModule if use_depthwise else ConvModule self.conv1 = ConvModule( in_channels, hidden_channels, 1, conv_cfg=conv_cfg, norm_cfg=norm_cfg, act_cfg=act_cfg) self.conv2 = conv( hidden_channels, out_channels, 3, stride=1, padding=1, conv_cfg=conv_cfg, norm_cfg=norm_cfg, act_cfg=act_cfg) self.add_identity = \ add_identity and in_channels == out_channels def forward(self, x: Tensor) -> Tensor: """Forward function.""" identity = x out = self.conv1(x) out = self.conv2(out) if self.add_identity: return out + identity else: return out class CSPNeXtBlock(nn.Module): """The basic bottleneck block used in CSPNeXt.""" def __init__(self, in_channels: int, out_channels: int, expansion: float = 0.5, add_identity: bool = True, use_depthwise: bool = False, kernel_size: int = 5, conv_cfg: OptConfigType = None, norm_cfg: ConfigType = dict( type='BN', momentum=0.03, eps=0.001), act_cfg: ConfigType = dict(type='SiLU'), init_cfg: OptMultiConfig = None) -> None: super().__init__() hidden_channels = int(out_channels * expansion) conv = DepthwiseSeparableConvModule if use_depthwise else ConvModule self.conv1 = conv( in_channels, hidden_channels, 3, stride=1, padding=1, norm_cfg=norm_cfg, act_cfg=act_cfg) self.conv2 = DepthwiseSeparableConvModule( hidden_channels, out_channels, kernel_size, stride=1, 
            padding=kernel_size // 2,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        self.add_identity = \
            add_identity and in_channels == out_channels

    def forward(self, x: Tensor) -> Tensor:
        """Forward function."""
        identity = x
        out = self.conv1(x)
        out = self.conv2(out)

        if self.add_identity:
            return out + identity
        else:
            return out


class ChannelAttention(nn.Module):
    """Channel attention module."""

    def __init__(self, channels: int,
                 init_cfg: OptMultiConfig = None) -> None:
        super().__init__()
        self.global_avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Conv2d(channels, channels, 1, 1, 0, bias=True)
        self.act = nn.Hardsigmoid(inplace=True)

    def forward(self, x: Tensor) -> Tensor:
        """Forward function for ChannelAttention."""
        with torch.amp.autocast(enabled=False, device_type=x.device.type):
            out = self.global_avgpool(x)
        out = self.fc(out)
        out = self.act(out)
        return x * out


class CSPLayer(nn.Module):
    """Cross Stage Partial Layer.

    Args:
        in_channels (int): The input channels of the CSP layer.
        out_channels (int): The output channels of the CSP layer.
        expand_ratio (float): Ratio to adjust the number of channels of the
            hidden layer. Defaults to 0.5.
        num_blocks (int): Number of blocks. Defaults to 1.
        add_identity (bool): Whether to add identity in blocks.
            Defaults to True.
        use_cspnext_block (bool): Whether to use CSPNeXt block.
            Defaults to False.
        use_depthwise (bool): Whether to use depthwise separable convolution
            in blocks. Defaults to False.
        channel_attention (bool): Whether to add channel attention in each
            stage. Defaults to False.
        conv_cfg (dict, optional): Config dict for convolution layer.
            Defaults to None, which means using conv2d.
        norm_cfg (dict): Config dict for normalization layer.
            Defaults to dict(type='BN')
        act_cfg (dict): Config dict for activation layer.
            Defaults to dict(type='Swish')
    """

    def __init__(self,
                 in_channels: int,
                 out_channels: int,
                 expand_ratio: float = 0.5,
                 num_blocks: int = 1,
                 add_identity: bool = True,
                 use_depthwise: bool = False,
                 use_cspnext_block: bool = False,
                 channel_attention: bool = False,
                 conv_cfg: OptConfigType = None,
                 norm_cfg: ConfigType = dict(
                     type='BN', momentum=0.03, eps=0.001),
                 act_cfg: ConfigType = dict(type='Swish'),
                 init_cfg: OptMultiConfig = None) -> None:
        super().__init__()
        block = CSPNeXtBlock if use_cspnext_block else DarknetBottleneck
        mid_channels = int(out_channels * expand_ratio)
        self.channel_attention = channel_attention
        self.main_conv = ConvModule(
            in_channels,
            mid_channels,
            1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        self.short_conv = ConvModule(
            in_channels,
            mid_channels,
            1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        self.final_conv = ConvModule(
            2 * mid_channels,
            out_channels,
            1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)

        self.blocks = nn.Sequential(*[
            block(
                mid_channels,
                mid_channels,
                1.0,
                add_identity,
                use_depthwise,
                conv_cfg=conv_cfg,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg) for _ in range(num_blocks)
        ])
        if channel_attention:
            self.attention = ChannelAttention(2 * mid_channels)

    def forward(self, x: Tensor) -> Tensor:
        """Forward function."""
        x_short = self.short_conv(x)

        x_main = self.main_conv(x)
        x_main = self.blocks(x_main)

        x_final = torch.cat((x_main, x_short), dim=1)

        if self.channel_attention:
            x_final = self.attention(x_final)
        return self.final_conv(x_final)


class CSPNeXt(nn.Module):
    """CSPNeXt backbone used in RTMDet.

    This is a standalone implementation without requiring the mmdet
    registry.

    Args:
        arch (str): Architecture of CSPNeXt, from {P5, P6}.
            Defaults to P5.
        expand_ratio (float): Ratio to adjust the number of channels of the
            hidden layer. Defaults to 0.5.
        deepen_factor (float): Depth multiplier, multiply number of blocks
            in CSP layer by this amount. Defaults to 1.0.
        widen_factor (float): Width multiplier, multiply number of channels
            in each layer by this amount. Defaults to 1.0.
        out_indices (Sequence[int]): Output from which stages.
            Defaults to (2, 3, 4).
        frozen_stages (int): Stages to be frozen (stop grad and set eval
            mode). -1 means not freezing any parameters. Defaults to -1.
        use_depthwise (bool): Whether to use depthwise separable convolution.
            Defaults to False.
        arch_ovewrite (list): Overwrite default arch settings.
            Defaults to None.
        spp_kernel_sizes (tuple[int]): Sequence of kernel sizes of SPP
            layers. Defaults to (5, 9, 13).
        channel_attention (bool): Whether to add channel attention in each
            stage. Defaults to True.
        conv_cfg (:obj:`ConfigDict` or dict, optional): Config dict for
            convolution layer. Defaults to None.
        norm_cfg (:obj:`ConfigDict` or dict): Dictionary to construct and
            config norm layer. Defaults to
            dict(type='BN', momentum=0.03, eps=0.001).
        act_cfg (:obj:`ConfigDict` or dict): Config dict for activation
            layer. Defaults to dict(type='SiLU').
        norm_eval (bool): Whether to set norm layers to eval mode, namely,
            freeze running stats (mean and var). Note: Effect on Batch Norm
            and its variants only. Defaults to False.
    """

    # From left to right:
    # in_channels, out_channels, num_blocks, add_identity, use_spp
    arch_settings = {
        'P5': [[64, 128, 3, True, False], [128, 256, 6, True, False],
               [256, 512, 6, True, False], [512, 1024, 3, False, True]],
        'P6': [[64, 128, 3, True, False], [128, 256, 6, True, False],
               [256, 512, 6, True, False], [512, 768, 3, True, False],
               [768, 1024, 3, False, True]]
    }

    def __init__(
        self,
        arch: str = 'P5',
        deepen_factor: float = 1.0,
        widen_factor: float = 1.0,
        out_indices: Sequence[int] = (2, 3, 4),
        frozen_stages: int = -1,
        use_depthwise: bool = False,
        expand_ratio: float = 0.5,
        arch_ovewrite: dict = None,
        spp_kernel_sizes: Sequence[int] = (5, 9, 13),
        channel_attention: bool = True,
        conv_cfg: OptConfigType = None,
        norm_cfg: ConfigType = dict(type='BN', momentum=0.03, eps=0.001),
        act_cfg: ConfigType = dict(type='SiLU'),
        norm_eval: bool = False,
        init_cfg: OptMultiConfig = dict(
            type='Kaiming',
            layer='Conv2d',
            a=math.sqrt(5),
            distribution='uniform',
            mode='fan_in',
            nonlinearity='leaky_relu')
    ) -> None:
        super().__init__()
        arch_setting = self.arch_settings[arch]
        if arch_ovewrite:
            arch_setting = arch_ovewrite
        assert set(out_indices).issubset(
            i for i in range(len(arch_setting) + 1))
        if frozen_stages not in range(-1, len(arch_setting) + 1):
            raise ValueError('frozen_stages must be in range(-1, '
                             'len(arch_setting) + 1). But received '
                             f'{frozen_stages}')
        self.out_indices = out_indices
        self.frozen_stages = frozen_stages
        self.use_depthwise = use_depthwise
        self.norm_eval = norm_eval
        conv = DepthwiseSeparableConvModule if use_depthwise else ConvModule

        self.stem = nn.Sequential(
            ConvModule(
                3,
                int(arch_setting[0][0] * widen_factor // 2),
                3,
                padding=1,
                stride=2,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg),
            ConvModule(
                int(arch_setting[0][0] * widen_factor // 2),
                int(arch_setting[0][0] * widen_factor // 2),
                3,
                padding=1,
                stride=1,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg),
            ConvModule(
                int(arch_setting[0][0] * widen_factor // 2),
                int(arch_setting[0][0] * widen_factor),
                3,
                padding=1,
                stride=1,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg))
        self.layers = ['stem']

        for i, (in_channels, out_channels, num_blocks, add_identity,
                use_spp) in enumerate(arch_setting):
            in_channels = int(in_channels * widen_factor)
            out_channels = int(out_channels * widen_factor)
            num_blocks = max(round(num_blocks * deepen_factor), 1)
            stage = []
            conv_layer = conv(
                in_channels,
                out_channels,
                3,
                stride=2,
                padding=1,
                conv_cfg=conv_cfg,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg)
            stage.append(conv_layer)
            if use_spp:
                spp = SPPBottleneck(
                    out_channels,
                    out_channels,
                    kernel_sizes=spp_kernel_sizes,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg)
                stage.append(spp)
            csp_layer = CSPLayer(
                out_channels,
                out_channels,
                num_blocks=num_blocks,
                add_identity=add_identity,
                use_depthwise=use_depthwise,
                use_cspnext_block=True,
                expand_ratio=expand_ratio,
                channel_attention=channel_attention,
                conv_cfg=conv_cfg,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg)
            stage.append(csp_layer)
            self.add_module(f'stage{i + 1}', nn.Sequential(*stage))
            self.layers.append(f'stage{i + 1}')

    def freeze_stages(self) -> None:
        """Freeze stages parameters."""
        if self.frozen_stages >= 0:
            for i in range(self.frozen_stages + 1):
                m = getattr(self, self.layers[i])
                m.eval()
                for param in m.parameters():
                    param.requires_grad = False

    def train(self, mode=True) -> None:
        """Convert the model into training mode while keeping the
        normalization layers frozen."""
        super().train(mode)
        self.freeze_stages()
        if mode and self.norm_eval:
            for m in self.modules():
                if isinstance(m, _BatchNorm):
                    m.eval()

    def forward(self, x: Tensor) -> Tuple[Tensor, ...]:
        outs = []
        for i, layer_name in enumerate(self.layers):
            layer = getattr(self, layer_name)
            x = layer(x)
            if i in self.out_indices:
                outs.append(x)
        return tuple(outs)


class CSPNeXtPAFPN(nn.Module):
    """Path Aggregation Network with CSPNeXt blocks.

    This is a standalone implementation that works with the CSPNeXt
    backbone.

    Args:
        in_channels (Sequence[int]): Number of input channels per scale.
        out_channels (int): Number of output channels (used at each scale).
        out_indices (Sequence[int]): Output from which stages.
        num_csp_blocks (int): Number of bottlenecks in CSPLayer.
            Defaults to 3.
        use_depthwise (bool): Whether to use depthwise separable convolution
            in blocks. Defaults to False.
        expand_ratio (float): Ratio to adjust the number of channels of the
            hidden layer. Default: 0.5.
        upsample_cfg (dict): Config dict for interpolate layer.
            Default: `dict(scale_factor=2, mode='nearest')`
        conv_cfg (dict, optional): Config dict for convolution layer.
            Default: None, which means using conv2d.
        norm_cfg (dict): Config dict for normalization layer.
            Default: dict(type='BN')
        act_cfg (dict): Config dict for activation layer.
            Default: dict(type='Swish')
    """

    def __init__(
        self,
        in_channels: Sequence[int],
        out_channels: int,
        out_indices=(0, 1, 2),
        num_csp_blocks: int = 3,
        use_depthwise: bool = False,
        expand_ratio: float = 0.5,
        upsample_cfg: ConfigType = dict(scale_factor=2, mode='nearest'),
        conv_cfg: OptConfigType = None,
        norm_cfg: ConfigType = dict(type='BN', momentum=0.03, eps=0.001),
        act_cfg: ConfigType = dict(type='Swish'),
        init_cfg: OptMultiConfig = dict(
            type='Kaiming',
            layer='Conv2d',
            a=math.sqrt(5),
            distribution='uniform',
            mode='fan_in',
            nonlinearity='leaky_relu')
    ) -> None:
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.out_indices = out_indices

        conv = DepthwiseSeparableConvModule if use_depthwise else ConvModule

        # build top-down blocks
        self.upsample = nn.Upsample(**upsample_cfg)
        self.reduce_layers = nn.ModuleList()
        self.top_down_blocks = nn.ModuleList()
        for idx in range(len(in_channels) - 1, 0, -1):
            self.reduce_layers.append(
                ConvModule(
                    in_channels[idx],
                    in_channels[idx - 1],
                    1,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg))
            self.top_down_blocks.append(
                CSPLayer(
                    in_channels[idx - 1] * 2,
                    in_channels[idx - 1],
                    num_blocks=num_csp_blocks,
                    add_identity=False,
                    use_depthwise=use_depthwise,
                    use_cspnext_block=True,
                    expand_ratio=expand_ratio,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg))

        # build bottom-up blocks
        self.downsamples = nn.ModuleList()
        self.bottom_up_blocks = nn.ModuleList()
        for idx in range(len(in_channels) - 1):
            self.downsamples.append(
                conv(
                    in_channels[idx],
                    in_channels[idx],
                    3,
                    stride=2,
                    padding=1,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg))
            self.bottom_up_blocks.append(
                CSPLayer(
                    in_channels[idx] * 2,
                    in_channels[idx + 1],
                    num_blocks=num_csp_blocks,
                    add_identity=False,
                    use_depthwise=use_depthwise,
                    use_cspnext_block=True,
                    expand_ratio=expand_ratio,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg))

        if self.out_channels is not None:
            self.out_convs = nn.ModuleList()
            for i in range(len(in_channels)):
                self.out_convs.append(
                    conv(
                        in_channels[i],
                        out_channels,
                        3,
                        padding=1,
                        conv_cfg=conv_cfg,
                        norm_cfg=norm_cfg,
                        act_cfg=act_cfg))

    def forward(self, inputs: Tuple[Tensor, ...]) -> Tuple[Tensor, ...]:
        """
        Args:
            inputs (tuple[Tensor]): input features.

        Returns:
            tuple[Tensor]: CSPNeXtPAFPN features.
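
        Example (shape sketch; assumes in_channels=(256, 512, 1024),
        out_channels=256 and strides 8/16/32 on a 640x640 input):
            inputs : (B, 256, 80, 80), (B, 512, 40, 40), (B, 1024, 20, 20)
            outputs: (B, 256, 80, 80), (B, 256, 40, 40), (B, 256, 20, 20)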
""" assert len(inputs) == len(self.in_channels) # top-down path inner_outs = [inputs[-1]] for idx in range(len(self.in_channels) - 1, 0, -1): feat_high = inner_outs[0] feat_low = inputs[idx - 1] feat_high = self.reduce_layers[len(self.in_channels) - 1 - idx]( feat_high) inner_outs[0] = feat_high upsample_feat = self.upsample(feat_high) inner_out = self.top_down_blocks[len(self.in_channels) - 1 - idx]( torch.cat([upsample_feat, feat_low], 1)) inner_outs.insert(0, inner_out) # bottom-up path outs = [inner_outs[0]] for idx in range(len(self.in_channels) - 1): feat_low = outs[-1] feat_high = inner_outs[idx + 1] downsample_feat = self.downsamples[idx](feat_low) out = self.bottom_up_blocks[idx]( torch.cat([downsample_feat, feat_high], 1)) outs.append(out) if self.out_channels is not None: # out convs for idx in range(len(outs)): outs[idx] = self.out_convs[idx](outs[idx]) return tuple([outs[i] for i in self.out_indices]) class MlvlPointGenerator: """Standard points generator for multi-level feature maps.""" def __init__( self, strides, offset: float = 0.5 ) -> None: if not isinstance(strides, (list, tuple)): strides = [strides] self.strides = strides self.offset = offset def grid_priors( self, featmap_sizes, dtype=torch.float32, device='cuda', with_stride=False ): """Generate grid points of multiple feature levels.""" num_levels = len(featmap_sizes) multi_level_priors = [] for i in range(num_levels): priors = self.single_level_grid_priors( featmap_sizes[i], level_idx=i, dtype=dtype, device=device, with_stride=with_stride) multi_level_priors.append(priors) return multi_level_priors def single_level_grid_priors( self, featmap_size, level_idx, dtype=torch.float32, device='cuda', with_stride=False ): """Generate grid points for a single feature level.""" feat_h, feat_w = featmap_size stride = self.strides[level_idx] # Create grid coordinates shift_x = (torch.arange(0, feat_w, device=device) + self.offset) * stride shift_y = (torch.arange(0, feat_h, device=device) + self.offset) * stride shift_x = shift_x.to(dtype) shift_y = shift_y.to(dtype) # Create grid shift_yy, shift_xx = torch.meshgrid(shift_y, shift_x, indexing="ij") shift_xx = shift_xx.reshape(-1) shift_yy = shift_yy.reshape(-1) if not with_stride: shifts = torch.stack([shift_xx, shift_yy], dim=-1) else: # Include stride information stride_tensor = torch.tensor(stride, dtype=dtype, device=device) stride_xx = torch.full_like(shift_xx, stride_tensor) stride_yy = torch.full_like(shift_yy, stride_tensor) shifts = torch.stack([shift_xx, shift_yy, stride_xx, stride_yy], dim=-1) return shifts # Helper functions needed for geometric mean sigmoid def sigmoid_geometric_mean(x, y): """Compute geometric mean of two sigmoid functions.""" x_sigmoid = torch.sigmoid(x) y_sigmoid = torch.sigmoid(y) return torch.sqrt(x_sigmoid * y_sigmoid) def inverse_sigmoid(x, eps=1e-5): """Inverse function of sigmoid.""" x = x.clamp(min=0, max=1) x1 = x.clamp(min=eps) x2 = (1 - x).clamp(min=eps) return torch.log(x1 / x2) class RTMDetSepBNHead(nn.Module): """RTMDetHead with separated BN layers and shared conv layers.""" def __init__( self, num_classes: int, in_channels: int, share_conv: bool = True, use_depthwise: bool = False, pred_kernel_size: int = 1, stacked_convs: int = 2, feat_channels: int = 256, strides: List[int] = [8, 16, 32], with_objectness: bool = False, exp_on_reg: bool = False, ) -> None: super().__init__() self.num_classes = num_classes self.cls_out_channels = num_classes # For sigmoid self.in_channels = in_channels self.feat_channels = feat_channels 
self.stacked_convs = stacked_convs self.share_conv = share_conv self.use_depthwise = use_depthwise self.pred_kernel_size = pred_kernel_size self.with_objectness = with_objectness self.exp_on_reg = exp_on_reg self.strides = strides # Number of anchors per grid point self.num_base_priors = 1 self._init_layers() def _init_layers(self) -> None: """Initialize layers of the head.""" self.cls_convs = nn.ModuleList() self.reg_convs = nn.ModuleList() self.rtm_cls = nn.ModuleList() self.rtm_reg = nn.ModuleList() if self.with_objectness: self.rtm_obj = nn.ModuleList() for n in range(len(self.strides)): cls_convs = nn.ModuleList() reg_convs = nn.ModuleList() for i in range(self.stacked_convs): chn = self.in_channels if i == 0 else self.feat_channels if self.use_depthwise: cls_conv = DepthwiseSeparableConvModule( chn, self.feat_channels, 3, stride=1, padding=1, bias=False, act_cfg=dict(type='SiLU'), norm_cfg=dict(type='BN', momentum=0.03, eps=0.001) ) reg_conv = DepthwiseSeparableConvModule( chn, self.feat_channels, 3, stride=1, padding=1, bias=False, act_cfg=dict(type='SiLU'), norm_cfg=dict(type='BN', momentum=0.03, eps=0.001) ) else: cls_conv = ConvModule( chn, self.feat_channels, 3, stride=1, padding=1, bias=False, act_cfg=dict(type='SiLU'), norm_cfg=dict(type='BN', momentum=0.03, eps=0.001)) reg_conv = ConvModule( chn, self.feat_channels, 3, stride=1, padding=1, bias=False, act_cfg=dict(type='SiLU'), norm_cfg=dict(type='BN', momentum=0.03, eps=0.001)) # Append conv layers to the list cls_convs.append(cls_conv) reg_convs.append(reg_conv) self.cls_convs.append(cls_convs) self.reg_convs.append(reg_convs) self.rtm_cls.append( nn.Conv2d( self.feat_channels, self.num_base_priors * self.cls_out_channels, self.pred_kernel_size, padding=self.pred_kernel_size // 2)) self.rtm_reg.append( nn.Conv2d( self.feat_channels, self.num_base_priors * 4, self.pred_kernel_size, padding=self.pred_kernel_size // 2)) if self.with_objectness: self.rtm_obj.append( nn.Conv2d( self.feat_channels, 1, self.pred_kernel_size, padding=self.pred_kernel_size // 2)) if self.share_conv: for n in range(1, len(self.strides)): for i in range(self.stacked_convs): self.cls_convs[n][i] = self.cls_convs[0][i] self.reg_convs[n][i] = self.reg_convs[0][i] # Initialize MlvlPointGenerator for anchor-free detection self.prior_generator = MlvlPointGenerator(self.strides, offset=0.0) def init_weights(self): """Initialize weights of the head.""" # Initialize conv layers with normal distribution for m in self.modules(): if isinstance(m, nn.Conv2d): nn.init.normal_(m.weight, mean=0, std=0.01) if m.bias is not None: nn.init.constant_(m.bias, 0) if isinstance(m, nn.BatchNorm2d): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) # Initialize classification layers with a prior probability bias_init = -torch.log(torch.tensor((1 - 0.01) / 0.01)) for rtm_cls in self.rtm_cls: nn.init.normal_(rtm_cls.weight, mean=0, std=0.01) nn.init.constant_(rtm_cls.bias, bias_init) for rtm_reg in self.rtm_reg: nn.init.normal_(rtm_reg.weight, mean=0, std=0.01) nn.init.constant_(rtm_reg.bias, 0) if self.with_objectness: for rtm_obj in self.rtm_obj: nn.init.normal_(rtm_obj.weight, mean=0, std=0.01) nn.init.constant_(rtm_obj.bias, bias_init) def forward(self, feats): """Forward features from the upstream network. Args: feats (tuple[Tensor]): Features from the upstream network, each is a 4D-tensor. Returns: tuple: Usually a tuple of classification scores and bbox prediction - cls_scores (list[Tensor]): Classification scores for all scale levels, each is a 4D-tensor. 
            - bbox_preds (list[Tensor]): Box energies / deltas for all
              scale levels, each is a 4D-tensor.
        """
        cls_scores = []
        bbox_preds = []

        for idx, (x, stride) in enumerate(zip(feats, self.strides)):
            cls_feat = x
            reg_feat = x

            for cls_layer in self.cls_convs[idx]:
                cls_feat = cls_layer(cls_feat)
            cls_score = self.rtm_cls[idx](cls_feat)

            for reg_layer in self.reg_convs[idx]:
                reg_feat = reg_layer(reg_feat)

            if self.with_objectness:
                objectness = self.rtm_obj[idx](reg_feat)
                cls_score = inverse_sigmoid(
                    sigmoid_geometric_mean(cls_score, objectness))

            if self.exp_on_reg:
                # Convert to distance predictions, scaled by the stride
                reg_dist = self.rtm_reg[idx](reg_feat).exp() * stride
            else:
                reg_dist = self.rtm_reg[idx](reg_feat) * stride

            cls_scores.append(cls_score)
            bbox_preds.append(reg_dist)
        return tuple(cls_scores), tuple(bbox_preds)

    def predict(self,
                cls_scores,
                bbox_preds,
                batch_img_metas=None,
                cfg=None,
                rescale=False,
                with_nms=True,
                score_thr=0.05,
                nms_iou_threshold=0.6,
                max_per_img=100):
        """Transform network outputs into bbox predictions.

        This is a simplified version for inference only.
        """
        assert len(cls_scores) == len(bbox_preds)
        num_levels = len(cls_scores)
        device = cls_scores[0].device
        batch_size = cls_scores[0].shape[0]

        # If no image metadata is provided, create default ones
        if batch_img_metas is None:
            # Estimate the input image size from the feature map sizes and
            # strides. This is approximate but works for most cases.
            featmap_sizes = [cls_scores[i].shape[-2:]
                             for i in range(num_levels)]
            strides = self.strides

            upscaled_sizes = []
            for i, featmap_size in enumerate(featmap_sizes):
                h, w = featmap_size
                upscaled_sizes.append((h * strides[i], w * strides[i]))

            # Use the maximum size across levels
            img_h = max(s[0] for s in upscaled_sizes)
            img_w = max(s[1] for s in upscaled_sizes)

            batch_img_metas = [{
                'img_shape': (img_h, img_w, 3),
                'scale_factor': [1.0, 1.0, 1.0, 1.0]
            } for _ in range(batch_size)]

        # Get feature map sizes
        featmap_sizes = [cls_scores[i].shape[-2:] for i in range(num_levels)]

        # Generate grid points for each level
        mlvl_priors = self.prior_generator.grid_priors(
            featmap_sizes,
            dtype=cls_scores[0].dtype,
            device=device,
            with_stride=True)

        result_list = []
        for img_id in range(batch_size):
            img_meta = batch_img_metas[img_id]
            cls_score_list = [
                cls_scores[i][img_id].detach() for i in range(num_levels)
            ]
            bbox_pred_list = [
                bbox_preds[i][img_id].detach() for i in range(num_levels)
            ]

            results = self._predict_by_feat_single(
                cls_score_list,
                bbox_pred_list,
                mlvl_priors,
                img_meta,
                score_thr=score_thr,
                nms_iou_threshold=nms_iou_threshold,
                max_per_img=max_per_img,
                rescale=rescale,
                with_nms=with_nms)
            result_list.append(results)

        # Convert the results to a more standardized format
        boxes_batch = []
        scores_batch = []
        labels_batch = []

        for result in result_list:
            boxes = result['bboxes']
            scores = result['scores']
            labels = result['labels']

            # Keep only the coordinates (the NMS path appends the score as a
            # fifth column)
            if boxes.shape[1] > 4:
                boxes = boxes[:, :4]

            boxes_batch.append(boxes)
            scores_batch.append(scores)
            labels_batch.append(labels)

        # Stack directly only when every image has the same nonzero number of
        # detections; torch.stack requires equal shapes.
        num_dets_per_img = [len(boxes) for boxes in boxes_batch]
        if len(set(num_dets_per_img)) == 1 and num_dets_per_img[0] > 0:
            return DetectionOutput(
                boxes=torch.stack(boxes_batch),
                scores=torch.stack(scores_batch),
                labels=torch.stack(labels_batch))

        # Handle images with no or differing detection counts
        max_num = max(num_dets_per_img)
        if max_num == 0:
            # No detections at all
            dummy = torch.zeros((batch_size, 0, 4), device=device)
            return DetectionOutput(
                boxes=dummy,
                scores=torch.zeros((batch_size, 0), device=device),
                labels=torch.zeros((batch_size, 0),
                                   dtype=torch.long,
                                   device=device))

        # Pad results to have consistent tensor shapes
        padded_boxes = []
        padded_scores = []
        padded_labels = []
        for boxes, scores, labels in zip(boxes_batch, scores_batch,
                                         labels_batch):
            num_dets = len(boxes)
            if num_dets == 0:
                padded_boxes.append(torch.zeros((max_num, 4), device=device))
                padded_scores.append(torch.zeros(max_num, device=device))
                padded_labels.append(
                    torch.zeros(max_num, dtype=torch.long, device=device))
            else:
                padding = torch.zeros((max_num - num_dets, 4), device=device)
                padded_boxes.append(torch.cat([boxes, padding], dim=0))
                padding = torch.zeros(max_num - num_dets, device=device)
                padded_scores.append(torch.cat([scores, padding], dim=0))
                padding = torch.zeros(max_num - num_dets,
                                      dtype=torch.long,
                                      device=device)
                padded_labels.append(torch.cat([labels, padding], dim=0))

        return DetectionOutput(
            boxes=torch.stack(padded_boxes),
            scores=torch.stack(padded_scores),
            labels=torch.stack(padded_labels))

    def _predict_by_feat_single(self,
                                cls_score_list,
                                bbox_pred_list,
                                mlvl_priors,
                                img_meta,
                                score_thr=0.05,
                                nms_iou_threshold=0.6,
                                max_per_img=100,
                                rescale=False,
                                with_nms=True):
        """Transform the outputs of a single image into bbox predictions.

        This is a simplified version for inference only.
        """
        # For each scale level
        mlvl_bboxes = []
        mlvl_scores = []

        for level_idx, (cls_score, bbox_pred, priors) in enumerate(
                zip(cls_score_list, bbox_pred_list, mlvl_priors)):
            assert cls_score.size()[-2:] == bbox_pred.size()[-2:]

            # Reshape
            cls_score = cls_score.permute(1, 2, 0).reshape(
                -1, self.cls_out_channels)
            bbox_pred = bbox_pred.permute(1, 2, 0).reshape(-1, 4)

            # Get scores
            scores = torch.sigmoid(cls_score)

            # Find high-scoring predictions
            max_scores, _ = scores.max(dim=1)
            keep_mask = max_scores > score_thr

            scores = scores[keep_mask]
            bbox_pred = bbox_pred[keep_mask]
            priors = priors[keep_mask]

            # If there are no valid predictions for this level, continue
            if scores.numel() == 0:
                continue

            # Decode bboxes
            bboxes = self._decode_bboxes(priors, bbox_pred,
                                         img_meta.get('img_shape'))

            mlvl_bboxes.append(bboxes)
            mlvl_scores.append(scores)

        # Combine all levels
        if len(mlvl_bboxes) == 0:
            # Return an empty result if there are no valid predictions
            return {
                'bboxes': torch.zeros((0, 4),
                                      device=cls_score_list[0].device),
                'scores': torch.zeros((0,), device=cls_score_list[0].device),
                'labels': torch.zeros((0,),
                                      device=cls_score_list[0].device,
                                      dtype=torch.long)
            }

        bboxes = torch.cat(mlvl_bboxes)
        scores = torch.cat(mlvl_scores)

        # Optional rescaling to the original image size. `scale_factor` holds
        # per-coordinate factors in (x1, y1, x2, y2) order, matching the
        # 4-element metas built in `predict`.
        if rescale and 'scale_factor' in img_meta:
            bboxes /= bboxes.new_tensor(img_meta['scale_factor'])

        # Apply NMS for each class
        if with_nms:
            det_bboxes, det_labels = self._nms(bboxes, scores,
                                               nms_iou_threshold,
                                               max_per_img)
        else:
            # Just return the top-k scores without NMS
            scores_flattened = scores.flatten()
            if scores_flattened.size(0) > max_per_img:
                top_scores, indices = scores_flattened.topk(max_per_img)
                # `indices` enumerate (anchor, class) pairs of the flattened
                # score matrix; recover the anchor row and the class column.
                anchor_idxs = indices // self.num_classes
                labels_top_k = indices % self.num_classes
                bboxes_top_k = bboxes.index_select(0, anchor_idxs)
                det_bboxes = torch.cat(
                    [bboxes_top_k, top_scores.unsqueeze(-1)], dim=1)
                det_labels = labels_top_k
            else:
                # Convert to the same format as the NMS branch
                max_scores, labels = scores.max(dim=1)
                det_bboxes = torch.cat([bboxes, max_scores.unsqueeze(-1)],
                                       dim=1)
                det_labels = labels

        return {
            'bboxes': det_bboxes,
            'scores': det_bboxes[:, -1],
            'labels': det_labels
        }
    def _decode_bboxes(self, priors, distance, max_shape=None):
        """Decode distance predictions to bounding box coordinates."""
        # Get xy coordinates of the priors (grid points)
        xy = priors[..., :2]

        # Distance predictions to the 4 boundaries: [left, top, right, bottom]
        x1 = xy[..., 0] - distance[..., 0]
        y1 = xy[..., 1] - distance[..., 1]
        x2 = xy[..., 0] + distance[..., 2]
        y2 = xy[..., 1] + distance[..., 3]

        bboxes = torch.stack([x1, y1, x2, y2], -1)

        # Clip boxes to the image boundaries if needed
        if max_shape is not None:
            bboxes[..., 0].clamp_(min=0, max=max_shape[1])
            bboxes[..., 1].clamp_(min=0, max=max_shape[0])
            bboxes[..., 2].clamp_(min=0, max=max_shape[1])
            bboxes[..., 3].clamp_(min=0, max=max_shape[0])

        return bboxes

    def _nms(self, bboxes, scores, iou_threshold, max_per_img):
        """Apply class-wise NMS to detection results."""
        num_classes = scores.shape[1]
        det_bboxes = []
        det_labels = []

        for cls_idx in range(num_classes):
            cls_scores = scores[:, cls_idx]
            keep_idx = cls_scores > 0.05  # Apply score threshold
            if not keep_idx.any():
                continue

            cls_bboxes = bboxes[keep_idx]
            cls_scores = cls_scores[keep_idx]

            # Apply NMS for this class
            keep = self._batched_nms(cls_bboxes, cls_scores, iou_threshold)
            keep = keep[:max_per_img]

            det_bboxes.append(
                torch.cat(
                    [cls_bboxes[keep], cls_scores[keep].unsqueeze(-1)],
                    dim=1))
            det_labels.append(
                cls_bboxes.new_full((keep.size(0),), cls_idx,
                                    dtype=torch.long))

        if len(det_bboxes) > 0:
            det_bboxes = torch.cat(det_bboxes, dim=0)
            det_labels = torch.cat(det_labels, dim=0)

            # Sort by score
            _, indices = det_bboxes[:, -1].sort(descending=True)
            det_bboxes = det_bboxes[indices]
            det_labels = det_labels[indices]

            # Limit to max_per_img
            det_bboxes = det_bboxes[:max_per_img]
            det_labels = det_labels[:max_per_img]
        else:
            # Return empty tensors if there are no detections
            det_bboxes = bboxes.new_zeros((0, 5))
            det_labels = bboxes.new_zeros((0,), dtype=torch.long)

        return det_bboxes, det_labels

    def _batched_nms(self, boxes, scores, iou_threshold):
        """Perform non-maximum suppression on a set of boxes."""
        if boxes.shape[0] == 0:
            return boxes.new_zeros(0, dtype=torch.long)

        try:
            # Prefer the torchvision NMS op for speed when available
            return torchvision.ops.nms(boxes, scores, iou_threshold)
        except Exception:
            # Fall back to a manual NMS implementation (e.g. if the compiled
            # torchvision NMS op is unavailable for this device)
            x1 = boxes[:, 0]
            y1 = boxes[:, 1]
            x2 = boxes[:, 2]
            y2 = boxes[:, 3]
            areas = (x2 - x1) * (y2 - y1)
            _, order = scores.sort(descending=True)

            keep = []
            while order.size(0) > 0:
                i = order[0].item()
                keep.append(i)
                if order.size(0) == 1:
                    break

                xx1 = torch.max(x1[order[1:]], x1[i])
                yy1 = torch.max(y1[order[1:]], y1[i])
                xx2 = torch.min(x2[order[1:]], x2[i])
                yy2 = torch.min(y2[order[1:]], y2[i])

                w = torch.clamp(xx2 - xx1, min=0)
                h = torch.clamp(yy2 - yy1, min=0)
                inter = w * h
                iou = inter / (areas[i] + areas[order[1:]] - inter)

                inds = torch.where(iou <= iou_threshold)[0]
                order = order[inds + 1]

            return torch.tensor(keep, dtype=torch.long, device=boxes.device)


class RTMDetModel(PreTrainedModel):
    """
    RTMDet object detection model compatible with Hugging Face transformers.

    This implementation uses PyTorch only, with no NumPy or OpenCV
    dependencies. The model consists of a backbone, neck, and detection head
    for object detection.
    """

    config_class = RTMDetConfig
    base_model_prefix = "rtmdet"
    main_input_name = "pixel_values"

    # share_conv aliases BN modules across strides; override tied-weights
    # method so transformers never calls get_parameter() on BN buffers.
_tied_weights_keys = None def mark_tied_weights_as_initialized(self): pass # share_conv makes BN buffers look tied — skip to avoid AttributeError def __init__(self, config): super().__init__(config) # Build backbone self.backbone = CSPNeXt( arch=config.backbone_arch, deepen_factor=config.backbone_deepen_factor, widen_factor=config.backbone_widen_factor, expand_ratio=config.backbone_expand_ratio, channel_attention=config.backbone_channel_attention, use_depthwise=False, ) # Build neck self.neck = CSPNeXtPAFPN( in_channels=config.neck_in_channels, out_channels=config.neck_out_channels, num_csp_blocks=config.neck_num_csp_blocks, expand_ratio=config.neck_expand_ratio, use_depthwise=False, ) # Build head self.bbox_head = RTMDetSepBNHead( num_classes=config.num_classes, in_channels=config.head_in_channels, stacked_convs=config.head_stacked_convs, feat_channels=config.head_feat_channels, with_objectness=config.head_with_objectness, exp_on_reg=config.head_exp_on_reg, share_conv=config.head_share_conv, pred_kernel_size=config.head_pred_kernel_size, strides=config.strides, use_depthwise=False ) # Initialize weights self.init_weights() # Required: triggers post_init() which sets all_tied_weights_keys etc. self.post_init() def init_weights(self): """Initialize the weights of the model.""" # Backbone is usually initialized from pre-trained weights # so we don't need special initialization # Initialize head self.bbox_head.init_weights() def forward( self, pixel_values=None, original_size=None, labels=None, output_hidden_states=None, return_dict=None, ): """ Forward pass of the model. Args: pixel_values (`torch.FloatTensor` of shape `(batch_size, channels, height, width)`): Pixel values resized to 640×640 by the image processor. original_size (`Tuple[int, int]`, *optional*): ``(height, width)`` of the **original** image before preprocessing. When supplied, the returned boxes are automatically scaled from 640×640 model-input space to original image pixel coordinates so the caller never needs to compute ``sx = orig_w / 640`` manually. All images in the batch are assumed to share the same original size. labels (`List[Dict]`, *optional*): Labels for computing the detection loss. output_hidden_states (`bool`, *optional*): Whether or not to return the hidden states of all layers. return_dict (`bool`, *optional*): Whether or not to return a ModelOutput instead of a plain tuple. Returns: `DetectionOutput` or `tuple`: Boxes are in 640×640 space by default, or in original image space when ``original_size`` is provided. 
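
        Example (a minimal sketch; assumes the default `RTMDetConfig` values
        describe a consistent backbone/neck/head combination):
            >>> model = RTMDetModel(RTMDetConfig()).eval()
            >>> pixel_values = torch.rand(1, 3, 640, 640)
            >>> with torch.no_grad():
            ...     out = model(pixel_values, original_size=(480, 800))
            >>> out.boxes.shape[-1]  # boxes are [x1, y1, x2, y2]
            4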
""" return_dict = return_dict if return_dict is not None else self.config.use_return_dict # Get inputs if pixel_values is None: raise ValueError("You have to specify pixel_values") batch_size, channels, height, width = pixel_values.shape # Extract features from backbone backbone_features = self.backbone(pixel_values) # Process features through neck neck_features = self.neck(backbone_features) # Get cls_scores and bbox_preds from head cls_scores, bbox_preds = self.bbox_head(neck_features) if labels is not None: # Training mode: calculate loss (not implemented in this simplified version) loss = torch.tensor(0.0, device=pixel_values.device) if return_dict: return DetectionOutput(loss=loss) else: return (loss,) # Inference mode: Get detection results # Create default batch_img_metas for prediction batch_img_metas = [{ 'img_shape': (height, width, 3), 'scale_factor': [1.0, 1.0, 1.0, 1.0] } for _ in range(batch_size)] # Call predict method with parameters from config results = self.bbox_head.predict( cls_scores=cls_scores, bbox_preds=bbox_preds, batch_img_metas=batch_img_metas, rescale=False, with_nms=True, score_thr=self.config.score_threshold, nms_iou_threshold=self.config.nms_threshold, max_per_img=self.config.max_detections ) # Scale boxes from 640×640 model space → original image space if requested if original_size is not None: orig_h, orig_w = original_size sx = orig_w / width # width == 640 sy = orig_h / height # height == 640 scaled_boxes = results.boxes.clone() scaled_boxes[..., 0] *= sx # x1 scaled_boxes[..., 2] *= sx # x2 scaled_boxes[..., 1] *= sy # y1 scaled_boxes[..., 3] *= sy # y2 results = DetectionOutput( boxes=scaled_boxes, scores=results.scores, labels=results.labels, ) if return_dict: return results else: # Return as tuple (boxes, scores, labels) return (results.boxes, results.scores, results.labels)