pytorch DistributedDataParallel 多卡訓練結果變差的解決方案

DDP 數據shuffle 的設置

使用DDP要給dataloader傳入sampler參數(torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False)) 。 默認shuffle=True,但按照pytorch DistributedSampler的實現:

    def __iter__(self) -> Iterator[T_co]:
        if self.shuffle:
            # deterministically shuffle based on epoch and seed
            g = torch.Generator()
            g.manual_seed(self.seed + self.epoch)
            indices = torch.randperm(len(self.dataset), generator=g).tolist()  # type: ignore
        else:
            indices = list(range(len(self.dataset)))  # type: ignore

產生隨機indix的種子是和當前的epoch有關,所以需要在訓練的時候手動set epoch的值來實現真正的shuffle:

for epoch in range(start_epoch, n_epochs):
    if is_distributed:
        sampler.set_epoch(epoch)
    train(loader)

DDP 增大batchsize 效果變差的問題

large batchsize:

理論上的優點:

數據中的噪聲影響可能會變小,可能容易接近最優點;

缺點和問題:

降低瞭梯度的variance;(理論上,對於凸優化問題,低的梯度variance可以得到更好的優化效果; 但是實際上Keskar et al驗證瞭增大batchsize會導致差的泛化能力);

對於非凸優化問題,損失函數包含多個局部最優點,小的batchsize有噪聲的幹擾可能容易跳出局部最優點,而大的batchsize有可能停在局部最優點跳不出來。

解決方法:

增大learning_rate,但是可能出現問題,在訓練開始就用很大的learning_rate 可能導致模型不收斂 (https://arxiv.org/abs/1609.04836)

使用warming up (https://arxiv.org/abs/1706.02677)

warmup

在訓練初期就用很大的learning_rate可能會導致訓練不收斂的問題,warmup的思想是在訓練初期用小的學習率,隨著訓練慢慢變大學習率,直到base learning_rate,再使用其他decay(CosineAnnealingLR)的方式訓練.

# copy from https://github.com/ildoonet/pytorch-gradual-warmup-lr/blob/master/warmup_scheduler/scheduler.py
from torch.optim.lr_scheduler import _LRScheduler
from torch.optim.lr_scheduler import ReduceLROnPlateau
class GradualWarmupScheduler(_LRScheduler):
    """ Gradually warm-up(increasing) learning rate in optimizer.
    Proposed in 'Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour'.
    Args:
        optimizer (Optimizer): Wrapped optimizer.
        multiplier: target learning rate = base lr * multiplier if multiplier > 1.0. if multiplier = 1.0, lr starts from 0 and ends up with the base_lr.
        total_epoch: target learning rate is reached at total_epoch, gradually
        after_scheduler: after target_epoch, use this scheduler(eg. ReduceLROnPlateau)
    """
    def __init__(self, optimizer, multiplier, total_epoch, after_scheduler=None):
        self.multiplier = multiplier
        if self.multiplier < 1.:
            raise ValueError('multiplier should be greater thant or equal to 1.')
        self.total_epoch = total_epoch
        self.after_scheduler = after_scheduler
        self.finished = False
        super(GradualWarmupScheduler, self).__init__(optimizer)
    def get_lr(self):
        if self.last_epoch > self.total_epoch:
            if self.after_scheduler:
                if not self.finished:
                    self.after_scheduler.base_lrs = [base_lr * self.multiplier for base_lr in self.base_lrs]
                    self.finished = True
                return self.after_scheduler.get_last_lr()
            return [base_lr * self.multiplier for base_lr in self.base_lrs]
        if self.multiplier == 1.0:
            return [base_lr * (float(self.last_epoch) / self.total_epoch) for base_lr in self.base_lrs]
        else:
            return [base_lr * ((self.multiplier - 1.) * self.last_epoch / self.total_epoch + 1.) for base_lr in self.base_lrs]
    def step_ReduceLROnPlateau(self, metrics, epoch=None):
        if epoch is None:
            epoch = self.last_epoch + 1
        self.last_epoch = epoch if epoch != 0 else 1  # ReduceLROnPlateau is called at the end of epoch, whereas others are called at beginning
        if self.last_epoch <= self.total_epoch:
            warmup_lr = [base_lr * ((self.multiplier - 1.) * self.last_epoch / self.total_epoch + 1.) for base_lr in self.base_lrs]
            for param_group, lr in zip(self.optimizer.param_groups, warmup_lr):
                param_group['lr'] = lr
        else:
            if epoch is None:
                self.after_scheduler.step(metrics, None)
            else:
                self.after_scheduler.step(metrics, epoch - self.total_epoch)
    def step(self, epoch=None, metrics=None):
        if type(self.after_scheduler) != ReduceLROnPlateau:
            if self.finished and self.after_scheduler:
                if epoch is None:
                    self.after_scheduler.step(None)
                else:
                    self.after_scheduler.step(epoch - self.total_epoch)
                self._last_lr = self.after_scheduler.get_last_lr()
            else:
                return super(GradualWarmupScheduler, self).step(epoch)
        else:
            self.step_ReduceLROnPlateau(metrics, epoch)

分佈式多卡訓練DistributedDataParallel踩坑

近幾天想研究瞭多卡訓練,就花瞭點時間,本以為會很輕松,可是好多坑,一步一步踏過來,一般分佈式訓練分為單機多卡與多機多卡兩種類型;

主要有兩種方式實現:

1、DataParallel: Parameter Server模式,一張卡位reducer,實現也超級簡單,一行代碼

DataParallel是基於Parameter server的算法,負載不均衡的問題比較嚴重,有時在模型較大的時候(比如bert-large),reducer的那張卡會多出3-4g的顯存占用

2、DistributedDataParallel:官方建議用新的DDP,采用all-reduce算法,本來設計主要是為瞭多機多卡使用,但是單機上也能用

為什麼要分佈式訓練?

可以用多張卡,總體跑得更快

可以得到更大的 BatchSize

有些分佈式會取得更好的效果

主要分為以下幾個部分:

單機多卡,DataParallel(最常用,最簡單)

單機多卡,DistributedDataParallel(較高級)、多機多卡,DistributedDataParallel(最高級)

如何啟動訓練

模型保存與讀取

註意事項

一、單機多卡(DATAPARALLEL)

from torch.nn import DataParallel
 
device = torch.device("cuda")
#或者device = torch.device("cuda:0" if True else "cpu")
 
model = MyModel()
model = model.to(device)
model = DataParallel(model)
#或者model = nn.DataParallel(model,device_ids=[0,1,2,3])

比較簡單,隻需要加一行代碼就行, model = DataParallel(model)

二、多機多卡、單機多卡(DISTRIBUTEDDATAPARALLEL)

建議先把註意事項看完在修改代碼,防止出現莫名的bug,修改訓練代碼如下:

其中opt.local_rank要在代碼前面解析這個參數,可以去後面看我寫的註意事項;

    from torch.utils.data.distributed import DistributedSampler
    import torch.distributed as dist
    import torch
 
    # Initialize Process Group
    dist_backend = 'nccl'
    print('args.local_rank: ', opt.local_rank)
    torch.cuda.set_device(opt.local_rank)
    dist.init_process_group(backend=dist_backend)
 
    model = yourModel()#自己的模型
    if torch.cuda.device_count() > 1:
        print("Let's use", torch.cuda.device_count(), "GPUs!")
        # 5) 封裝
        # model = torch.nn.parallel.DistributedDataParallel(model,
        #                                                   device_ids=[opt.local_rank],
        #                                                   output_device=opt.local_rank)
        model = torch.nn.parallel.DistributedDataParallel(model.cuda(), device_ids=[opt.local_rank])
    device = torch.device(opt.local_rank)
    model.to(device)
    dataset = ListDataset(train_path, augment=True, multiscale=opt.multiscale_training, img_size=opt.img_size, normalized_labels=True)#自己的讀取數據的代碼
    world_size = torch.cuda.device_count()
    datasampler = DistributedSampler(dataset, num_replicas=dist.get_world_size(), rank=opt.local_rank)
 
    dataloader = torch.utils.data.DataLoader(
        dataset,
        batch_size=opt.batch_size,
        shuffle=False,
        num_workers=opt.n_cpu,
        pin_memory=True,
        collate_fn=dataset.collate_fn,
        sampler=datasampler
    )#在原始讀取數據中加sampler參數就行
 
 
.....
 
訓練過程中,數據轉cuda
      imgs = imgs.to(device)
      targets = targets.to(device)

三、如何啟動訓練

1、DataParallel方式

正常訓練即可,即

python3 train.py

2、DistributedDataParallel方式

需要通過torch.distributed.launch來啟動,一般是單節點,

CUDA_VISIBLE_DEVICES=0,1 python3 -m torch.distributed.launch --nproc_per_node=2 train.py

其中CUDA_VISIBLE_DEVICES 設置用的顯卡編號,–nproc_pre_node 每個節點的顯卡數量,一般有幾個顯卡就用幾個顯卡

多節點

python3 -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE --nnodes=2 --node_rank=0
#兩個節點,在0號節點

要是訓練成功,就會打印出幾個信息,有幾個卡就打印幾個信息,如下圖所示:

四、模型保存與讀取

以下a、b是對應的,用a保存,就用a方法加載

1、保存

a、隻保存參數

torch.save(model.module.state_dict(), path)

b、保存參數與網絡

torch.save(model.module,path)

2、加載

a、多卡加載模型預訓練;

model = Yourmodel()
if opt.pretrained_weights:
        if opt.pretrained_weights.endswith(".pth"):
            model.load_state_dict(torch.load(opt.pretrained_weights))
        else:
            model.load_darknet_weights(opt.pretrained_weights)

單卡加載模型,需要加載模型時指定主卡讀模型,而且這個’cuda:0′,是看你訓練的模型是0還是1(否則就會出錯RuntimeError: Attempting to deserialize object on CUDA device 1 but torch.cuda.device_count() is 1. Please use torch.load with map_location to map your storages to an existing device),可以根據自己的更改:

model = Yourmodel()
if opt.pretrained_weights:
        if opt.pretrained_weights.endswith(".pth"):
            model.load_state_dict(torch.load(opt.pretrained_weights,map_location="cuda:0"))
        else:
            model.load_darknet_weights(opt.pretrained_weights)

b、單卡加載模型;

同樣也要指定讀取模型的卡。  

model = torch.load(opt.weights_path, map_location="cuda:0")

多卡加載預訓練模型,以b這種方式還沒跑通。

五、註意事項

1、model後面添加module

獲取到網絡模型後,使用並行方法,並將網絡模型和參數移到GPU上。註意,若需要修改網絡模塊或者獲得模型的某個參數,一定要在model後面加上.module,否則會報錯,比如:

model.img_size  要改成  model.module.img_size

2、.cuda或者.to(device)等問題

device是自己設置,如果.cuda出錯,就要化成相應的device

model(如:model.to(device))

input(通常需要使用Variable包裝,如:input = Variable(input).to(device))

target(通常需要使用Variable包裝

nn.CrossEntropyLoss()(如:criterion = nn.CrossEntropyLoss().to(device))

3、args.local_rank的參數

通過torch.distributed.launch來啟動訓練,torch.distributed.launch 會給模型分配一個args.local_rank的參數,所以在訓練代碼中要解析這個參數,也可以通過torch.distributed.get_rank()獲取進程id。

parser.add_argument("--local_rank", type=int, default=-1, help="number of cpu threads to use during batch generation")
 

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: