| |
| import json |
| import os.path as osp |
| import time |
| from typing import Any, Callable, Dict, List, Optional, Union |
|
|
| import torch |
|
|
| from mmengine.logging import print_log |
|
|
| try: |
| import deepspeed |
| except ImportError: |
| deepspeed = None |
|
|
| import logging |
|
|
| import torch.nn as nn |
|
|
| import mmengine |
| from mmengine.dist import init_dist, is_main_process |
| from mmengine.optim import BaseOptimWrapper, _ParamScheduler |
| from mmengine.registry import (MODEL_WRAPPERS, OPTIM_WRAPPERS, OPTIMIZERS, |
| STRATEGIES) |
| from mmengine.runner.checkpoint import save_checkpoint, weights_to_cpu |
| from mmengine.utils import apply_to, digit_version, get_git_hash |
| from .base import BaseStrategy |
|
|
|
|
| def register_deepspeed_optimizers() -> List[str]: |
| """Register optimizers in ``deepspeed`` to the ``OPTIMIZERS`` registry. |
| |
| Returns: |
| List[str]: A list of registered optimizers' name. |
| """ |
| deepspeed_optimizers = [] |
| try: |
| import deepspeed |
| except ImportError: |
| pass |
| else: |
| from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam |
| from deepspeed.ops.lamb import FusedLamb |
| from deepspeed.runtime.fp16.onebit import (OnebitAdam, OnebitLamb, |
| ZeroOneAdam) |
|
|
| OPTIMIZERS.register_module(module=DeepSpeedCPUAdam) |
| deepspeed_optimizers.append('DeepSpeedCPUAdam') |
| OPTIMIZERS.register_module(module=FusedAdam) |
| deepspeed_optimizers.append('FusedAdam') |
| OPTIMIZERS.register_module(module=FusedLamb) |
| deepspeed_optimizers.append('FusedLamb') |
| OPTIMIZERS.register_module(module=OnebitAdam) |
| deepspeed_optimizers.append('OnebitAdam') |
| OPTIMIZERS.register_module(module=OnebitLamb) |
| deepspeed_optimizers.append('OnebitLamb') |
| OPTIMIZERS.register_module(module=ZeroOneAdam) |
| deepspeed_optimizers.append('ZeroOneAdam') |
|
|
| return deepspeed_optimizers |
|
|
|
|
| @OPTIM_WRAPPERS.register_module() |
| class DeepSpeedOptimWrapper(BaseOptimWrapper): |
|
|
| def __init__(self, optimizer): |
| super().__init__(optimizer) |
| self._model = None |
|
|
| @property |
| def model(self): |
| if self._model is None: |
| raise ValueError('model attribute should be set before accessing.') |
| return self._model |
|
|
| @model.setter |
| def model(self, value): |
| self._model = value |
|
|
| def update_params(self, loss) -> None: |
| """Update parameters in :attr:`optimizer`.""" |
| self.backward(loss) |
| self.step() |
|
|
| def backward(self, loss: torch.Tensor, **kwargs) -> None: |
| """"Perform gradient back propagation.""" |
| self.model.backward(loss) |
|
|
| def zero_grad(self, **kwargs) -> None: |
| raise NotImplementedError( |
| 'DeepSpeedOptimWrapper does not support zero_grad method ' |
| 'currently.') |
|
|
| def step(self, **kwargs): |
| self.model.step() |
|
|
| def state_dict(self) -> dict: |
| state_dict = {} |
| if self.base_param_settings is not None: |
| state_dict['base_param_settings'] = self.base_param_settings |
|
|
| return state_dict |
|
|
| def load_state_dict(self, state_dict: dict) -> None: |
| base_param_settings = state_dict.pop('base_param_settings', None) |
|
|
| if base_param_settings is not None: |
| self.base_param_settings = base_param_settings |
|
|
|
|
| @MODEL_WRAPPERS.register_module() |
| class MMDeepSpeedEngineWrapper: |
|
|
| def __init__( |
| self, |
| *, |
| model: 'deepspeed.DeepSpeedEngine', |
| inputs_to_half: Optional[List[Union[int, str]]] = None, |
| ): |
| self.model = model |
| self._inputs_to_half = inputs_to_half |
|
|
| def __getattr__(self, name): |
| return getattr(self.model, name) |
|
|
| def train_step( |
| self, |
| data: Union[dict, tuple, list], |
| optim_wrapper: DeepSpeedOptimWrapper, |
| ) -> Dict[str, torch.Tensor]: |
| data = self.model.module.data_preprocessor(data, training=True) |
| data = self._cast_inputs_half(data) |
| losses = self._run_forward(data, mode='loss') |
| parsed_loss, log_vars = self.model.module.parse_losses(losses) |
| optim_wrapper.update_params(parsed_loss) |
|
|
| return log_vars |
|
|
| def val_step(self, data: Union[dict, tuple, list]) -> list: |
| """Gets the prediction of module during validation process. |
| |
| Args: |
| data (dict or tuple or list): Data sampled from dataset. |
| |
| Returns: |
| list: The predictions of given data. |
| """ |
| data = self.model.module.data_preprocessor(data, False) |
| data = self._cast_inputs_half(data) |
| return self._run_forward(data, mode='predict') |
|
|
| def test_step(self, data: Union[dict, tuple, list]) -> list: |
| """Gets the predictions of module during testing process. |
| |
| Args: |
| data (dict or tuple or list): Data sampled from dataset. |
| |
| Returns: |
| list: The predictions of given data. |
| """ |
| data = self.model.module.data_preprocessor(data, False) |
| data = self._cast_inputs_half(data) |
| return self._run_forward(data, mode='predict') |
|
|
| def _run_forward(self, data: Union[dict, tuple, list], mode: str) -> Any: |
| """Unpacks data for :meth:`forward` |
| |
| Args: |
| data (dict or tuple or list): Data sampled from dataset. |
| mode (str): Mode of forward. |
| |
| Returns: |
| dict or list: Results of training or testing mode. |
| """ |
| if isinstance(data, dict): |
| results = self.model(**data, mode=mode) |
| elif isinstance(data, (list, tuple)): |
| results = self.model(*data, mode=mode) |
| else: |
| raise TypeError('Output of `data_preprocessor` should be ' |
| f'list, tuple or dict, but got {type(data)}') |
| return results |
|
|
| def _cast_inputs_half(self, inputs: Union[list, tuple, dict, None]): |
| """Cast inputs to half precision if needed. |
| |
| Args: |
| inputs (list or tuple or dict or None): Inputs to be casted. |
| |
| Returns: |
| list or tuple or dict or None: Casted inputs. |
| """ |
| if self._inputs_to_half is None: |
| return inputs |
|
|
| dtype = next(self.model.parameters()).dtype |
| if isinstance(inputs, (list, tuple)): |
| new_inputs = [] |
| for i, v in enumerate(inputs): |
| if i in self._inputs_to_half: |
| new_inputs.append( |
| apply_to(v, lambda x: hasattr(x, 'to'), |
| lambda x: x.to(dtype))) |
| else: |
| new_inputs.append(v) |
| return inputs.__class__(new_inputs) |
| elif isinstance(inputs, dict): |
| for k, v in inputs.items(): |
| if k in self._inputs_to_half: |
| inputs[k] = apply_to(v, lambda x: hasattr(x, 'to'), |
| lambda x: x.to(dtype)) |
| return inputs |
| else: |
| raise TypeError('inputs should be list, tuple or dict, ' |
| f'but got {type(inputs)}') |
|
|
|
|
| @STRATEGIES.register_module() |
| class DeepSpeedStrategy(BaseStrategy): |
| """Support training models with DeepSpeed. |
| |
| Note: |
| The detailed usage of parameters can be found at |
| https://www.deepspeed.ai/docs/config-json/. |
| |
| Args: |
| config (str or dict, optional): If it is a string, it is a path to load |
| config for deepspeed. Defaults to None. |
| zero_optimization (dict, optional): Enabling and configuring ZeRO |
| memory optimizations. Defaults to None. |
| gradient_clipping (float, optional): Enable gradient clipping with |
| value. Defaults to None. |
| fp16 (dict, optional): Configuration for using mixed precision/FP16 |
| training that leverages NVIDIA's Apex package. Defaults to None. |
| inputs_to_half (list[int or str], optional): Which inputs are to |
| converted to half precision. Defaults to None. |
| If ``fp16`` is enabled, it also should be set. |
| bf16 (dict, optional): Configuration for using bfloat16 floating-point |
| format as an alternative to FP16. Defaults to None. |
| amp (dict, optional): Configuration for using automatic mixed |
| precision (AMP) training that leverages NVIDIA's Apex AMP package. |
| Defaults to None. |
| activation_checkpointing (dict, optional): Reduce memory usage by |
| clearing activations of certain layers and recomputing them |
| during a backward pass. |
| Defaults to None. |
| aio (dict, optional): Configuring the asynchronous I/O module for |
| offloading parameter and optimizer states to persistent (NVMe) |
| storage. This module uses Linux native asynchronous I/O (libaio). |
| Defaults to None. |
| train_micro_batch_size_per_gpu (int, optional): Batch size to be |
| processed by one GPU in one step (without gradient accumulation). |
| Defaults to None. |
| gradient_accumulation_steps (int, optional): Number of training steps |
| to accumulate gradients before averaging and applying them. |
| Defaults to None. |
| exclude_frozen_parameters (bool, optional): Exclude frozen parameters |
| from saved checkpoint. |
| """ |
|
|
| def __init__( |
| self, |
| *, |
| |
| config: Union[str, dict, None] = None, |
| zero_optimization: Optional[dict] = None, |
| gradient_clipping: Optional[float] = None, |
| fp16: Optional[dict] = None, |
| inputs_to_half: Optional[List[Union[int, str]]] = None, |
| bf16: Optional[dict] = None, |
| amp: Optional[dict] = None, |
| activation_checkpointing: Optional[dict] = None, |
| aio: Optional[dict] = None, |
| train_micro_batch_size_per_gpu: Optional[int] = None, |
| gradient_accumulation_steps: Optional[int] = None, |
| |
| steps_per_print: int = 10000000000000, |
| |
| exclude_frozen_parameters: Optional[bool] = None, |
| **kwargs, |
| ): |
| assert deepspeed is not None, \ |
| 'DeepSpeed is not installed. Please check ' \ |
| 'https://github.com/microsoft/DeepSpeed#installation.' |
|
|
| super().__init__(**kwargs) |
|
|
| self.config = self._parse_config(config) |
| if zero_optimization is not None: |
| self.config['zero_optimization'] = zero_optimization |
| if gradient_clipping is not None: |
| self.config['gradient_clipping'] = gradient_clipping |
| if fp16 is not None: |
| self.config['fp16'] = fp16 |
| if bf16 is not None: |
| self.config['bf16'] = bf16 |
| if amp is not None: |
| self.config['amp'] = amp |
| if activation_checkpointing is not None: |
| self.config['activation_checkpointing'] = activation_checkpointing |
| if aio is not None: |
| self.config['aio'] = aio |
| if train_micro_batch_size_per_gpu is not None: |
| self.config['train_micro_batch_size_per_gpu'] = \ |
| train_micro_batch_size_per_gpu |
| if gradient_accumulation_steps is not None: |
| self.config['gradient_accumulation_steps'] = \ |
| gradient_accumulation_steps |
| else: |
| self.config.setdefault('gradient_accumulation_steps', 1) |
| self.config['steps_per_print'] = steps_per_print |
| self._inputs_to_half = inputs_to_half |
| assert (exclude_frozen_parameters is None or |
| digit_version(deepspeed.__version__) >= digit_version('0.13.2') |
| ), ('DeepSpeed >= 0.13.2 is required to enable ' |
| 'exclude_frozen_parameters') |
| self.exclude_frozen_parameters = exclude_frozen_parameters |
|
|
| register_deepspeed_optimizers() |
|
|
| def _parse_config(self, config): |
| if config is None: |
| config = dict() |
| elif isinstance(config, str): |
| with open(config) as f: |
| config = json.load(f) |
| return config |
|
|
| def _setup_distributed( |
| self, |
| launcher: Optional[str] = None, |
| backend: str = 'nccl', |
| **kwargs, |
| ): |
| """Setup distributed environment. |
| |
| Args: |
| launcher (str, optional): Way to launch multi processes. |
| DeepSpeedStrategy does not support the launcher argument. |
| backend (str): Communication Backends. Supported backends are |
| 'nccl', 'gloo' and 'mpi'. Defaults to 'nccl'. |
| **kwargs: Other arguments for :func:`deepspeed.init_distributed`. |
| """ |
| init_dist(launcher, backend, init_backend='deepspeed', **kwargs) |
|
|
| def prepare( |
| self, |
| model: Union[nn.Module, dict], |
| *, |
| optim_wrapper: Union[BaseOptimWrapper, dict, None] = None, |
| param_scheduler: Union[_ParamScheduler, Dict, List, None] = None, |
| compile: Union[dict, bool] = False, |
| dispatch_kwargs: Optional[dict] = None, |
| ): |
| """Prepare model and some components. |
| |
| Args: |
| model (:obj:`torch.nn.Module` or dict): The model to be run. It |
| can be a dict used for build a model. |
| |
| Keyword Args: |
| optim_wrapper (BaseOptimWrapper or dict, optional): Computing the |
| gradient of model parameters and updating them. |
| Defaults to None. |
| See :meth:`build_optim_wrapper` for examples. |
| param_scheduler (_ParamScheduler or dict or list, optional): |
| Parameter scheduler for updating optimizer parameters. If |
| specified, :attr:`optim_wrapper` should also be specified. |
| Defaults to None. |
| See :meth:`build_param_scheduler` for examples. |
| compile (dict, optional): Config to compile model. |
| Defaults to False. Requires PyTorch>=2.0. |
| dispatch_kwargs (dict, optional): Kwargs to be passed to other |
| methods of Strategy. Defaults to None. |
| """ |
| if self._prepared: |
| return self._prepared_components() |
| assert dispatch_kwargs is not None |
| self.dispatch_kwargs.update(dispatch_kwargs) |
|
|
| model = self.build_model(model) |
| model = self._init_model_weights(model) |
|
|
| if optim_wrapper is not None: |
| self.optim_wrapper = self.build_optim_wrapper(optim_wrapper, model) |
| self.model = self._wrap_model(model) |
|
|
| self.optim_wrapper.model = self.model |
|
|
| else: |
| self.model = self._wrap_model(model) |
|
|
| if param_scheduler is not None: |
| self.param_schedulers = self.build_param_scheduler( |
| param_scheduler, self.optim_wrapper) |
| self._prepared = True |
| return self._prepared_components() |
|
|
| def _wrap_model(self, model: nn.Module) -> nn.Module: |
| if hasattr(self, 'optim_wrapper'): |
| engine, self.optim_wrapper.optimizer, *_ = deepspeed.initialize( |
| model=model, |
| optimizer=self.optim_wrapper.optimizer, |
| config=self.config) |
| else: |
| engine, *_ = deepspeed.initialize(model=model, config=self.config) |
|
|
| wrapper = MMDeepSpeedEngineWrapper( |
| model=engine, inputs_to_half=self._inputs_to_half) |
| return wrapper |
|
|
| def load_checkpoint( |
| self, |
| filename: str, |
| *, |
| map_location: Union[str, Callable] = 'cpu', |
| strict: bool = False, |
| revise_keys: list = [(r'^module.', '')], |
| callback: Optional[Callable] = None, |
| ) -> dict: |
| """Load checkpoint from given ``filename``. |
| |
| Warning: |
| `map_localtion` and `callback` parameters are not supported yet. |
| |
| Args: |
| filename (str): Accept local filepath, URL, ``torchvision://xxx``, |
| ``open-mmlab://xxx``. |
| """ |
| self.logger.info(f'Load checkpoint from {filename}') |
|
|
| dirname, basename = osp.split(filename) |
| if digit_version(deepspeed.__version__) >= digit_version('0.13.2'): |
| _, extra_ckpt = self.model.load_checkpoint( |
| dirname, |
| tag=basename, |
| load_optimizer_states=False, |
| load_module_strict=not self.exclude_frozen_parameters) |
| else: |
| _, extra_ckpt = self.model.load_checkpoint( |
| dirname, tag=basename, load_optimizer_states=False) |
|
|
| return extra_ckpt |
|
|
| def resume( |
| self, |
| filename: str, |
| *, |
| resume_optimizer: bool = True, |
| resume_param_scheduler: bool = True, |
| map_location: Union[str, Callable] = 'default', |
| callback: Optional[Callable] = None, |
| ) -> dict: |
| """Resume training from given ``filename``. |
| |
| Warning: |
| `map_location` and `callback` parameters are not supported yet. |
| |
| Args: |
| filename (str): Accept local filepath. |
| |
| Keyword Args: |
| resume_optimizer (bool): Whether to resume optimizer state. |
| Defaults to True. |
| resume_param_scheduler (bool): Whether to resume param scheduler |
| state. Defaults to True. |
| """ |
| self.logger.info(f'Resume checkpoint from {filename}') |
|
|
| dirname, basename = osp.split(filename) |
| if digit_version(deepspeed.__version__) >= digit_version('0.13.2'): |
| _, extra_ckpt = self.model.load_checkpoint( |
| dirname, |
| tag=basename, |
| load_optimizer_states=resume_optimizer, |
| load_module_strict=not self.exclude_frozen_parameters) |
| else: |
| _, extra_ckpt = self.model.load_checkpoint( |
| dirname, tag=basename, load_optimizer_states=resume_optimizer) |
|
|
| if resume_optimizer: |
| self.load_optim_state_dict(extra_ckpt.pop('optim_wrapper')) |
|
|
| if resume_param_scheduler and hasattr(self, 'param_schedulers'): |
| param_schedulers = extra_ckpt.pop('param_schedulers') |
| self.load_scheduler_state_dict(param_schedulers) |
|
|
| |
| resumed_seed = extra_ckpt['meta'].get('seed', None) |
| current_seed = self._randomness.get('seed') |
| if resumed_seed is not None and resumed_seed != current_seed: |
| if current_seed is not None: |
| self.logger.warning(f'The value of random seed in the ' |
| f'checkpoint "{resumed_seed}" is ' |
| f'different from the value in ' |
| f'`randomness` config "{current_seed}"') |
| self._randomness.update(seed=resumed_seed) |
| self._set_randomness(**self._randomness) |
|
|
| return extra_ckpt |
|
|
| def save_checkpoint( |
| self, |
| filename: str, |
| *, |
| save_optimizer: bool = True, |
| save_param_scheduler: bool = True, |
| extra_ckpt: Optional[dict] = None, |
| callback: Optional[Callable] = None, |
| ) -> None: |
| """Save checkpoint to given ``filename``. |
| |
| Warning: |
| `callback` parameter is not supported yet. |
| |
| Args: |
| filename (str): Filename to save checkpoint. |
| |
| Keyword Args: |
| save_param_scheduler (bool): Whether to save the param_scheduler |
| to the checkpoint. Defaults to True. |
| extra_ckpt (dict, optional): Extra checkpoint to save. |
| Defaults to None. |
| """ |
| if extra_ckpt is None: |
| extra_ckpt = dict() |
| if 'meta' not in extra_ckpt: |
| extra_ckpt['meta'] = dict() |
| extra_ckpt['meta'].update( |
| seed=self.seed, |
| time=time.strftime('%Y%m%d_%H%M%S', time.localtime()), |
| mmengine=mmengine.__version__ + get_git_hash(), |
| ) |
|
|
| if save_param_scheduler and hasattr(self, 'param_schedulers'): |
| extra_ckpt['param_schedulers'] = self.scheduler_state_dict() |
|
|
| if (not save_optimizer |
| and self.model.zero_optimization_partition_weights() |
| and not self.model.zero_gather_16bit_weights_on_model_save()): |
| print_log( |
| 'Configured to `save_optimizer=False`, but currently using ' |
| "DeepSpeed's ZeRO stage 3 with " |
| '`gather_16bit_weights_on_model_save=False`. In ' |
| 'this configuration, the model cannot be saved properly ' |
| 'and will be saved with the optimizer state. ' |
| 'To support `save_optimizer=False`, please set ' |
| '`gather_16bit_weights_on_model_save=True` in your ' |
| 'DeepSpeed config.', |
| logger='current', |
| level=logging.WARNING) |
| save_optimizer = True |
|
|
| state_dict_kwargs = {} |
| if digit_version(deepspeed.__version__) >= digit_version('0.13.2'): |
| state_dict_kwargs[ |
| 'exclude_frozen_parameters'] = self.exclude_frozen_parameters |
|
|
| if save_optimizer: |
| if hasattr(self, 'optim_wrapper'): |
| |
| |
| extra_ckpt['optim_wrapper'] = self.optim_state_dict() |
|
|
| dirname, basename = osp.split(filename) |
| self.model.save_checkpoint( |
| dirname, |
| tag=basename, |
| client_state=extra_ckpt, |
| save_latest=False, |
| **state_dict_kwargs) |
| else: |
| if self.model.zero_optimization_partition_weights(): |
| state_dict = self.model._zero3_consolidated_16bit_state_dict( |
| **state_dict_kwargs) |
| else: |
| state_dict = self.model.module_state_dict(**state_dict_kwargs) |
|
|
| if is_main_process(): |
| ckpt = {'state_dict': weights_to_cpu(state_dict), **extra_ckpt} |
| save_checkpoint(ckpt, filename) |
|
|