[TOC]
Pytorch Lightning的两大API之一,是torch.nn.Module的高级封装。
同torch.nn.Module中的__init__,用于构建模型。
同torch.nn.Module中的forward,通过__init__中的各个模块实现前向传播。
训练一批数据并反向传播。参数如下:
- batch (Tensor | (Tensor, …) | [Tensor, …]) – 数据输入,一般为x, y = batch。
- batch_idx (int) – 批次索引。
- optimizer_idx (int) – 当使用多个优化器时,会使用本参数。
- hiddens (Tensor) – 当truncated_bptt_steps > 0时使用。
举个例子:
def training_step(self, batch, batch_idx): # 数据类型自动转换,模型自动调用.train()
x, y = batch
_y = self(x)
loss = criterion(_y, y) # 计算loss
return loss # 返回loss,更新网络
def training_step(self, batch, batch_idx, hiddens):
# hiddens是上一次截断反向传播的隐藏状态
out, hiddens = self.lstm(data, hiddens)
return {"loss": loss, "hiddens": hiddens}
一批数据训练结束时的操作。一般用不着,分布式训练的时候会用上。参数如下:
- batch_parts_outputs – 当前批次的training_step()的返回值
举个例子:
def training_step(self, batch, batch_idx):
x, y = batch
_y = self(x)
return {"output": _y, "target": y}
def training_step_end(self, training_step_outputs): # 多GPU分布式训练,计算loss
gpu_0_output = training_step_outputs[0]["output"]
gpu_1_output = training_step_outputs[1]["output"]
gpu_0_target = training_step_outputs[0]["target"]
gpu_1_target = training_step_outputs[1]["target"]
# 对所有GPU的数据进行处理
loss = criterion([gpu_0_output, gpu_1_output], [gpu_0_target, gpu_1_target])
return loss
一轮数据训练结束时的操作。主要针对于本轮所有training_step的输出。参数如下:
- outputs (List[Any]) – training_step()的输出。
举个例子:
def training_epoch_end(self, outs): # 计算本轮的loss和acc
loss = 0.
for out in outs: # outs按照训练顺序排序
loss += out["loss"].cpu().detach().item()
loss /= len(outs)
acc = self.train_metric.compute()
self.history["loss"].append(loss)
self.history["acc"].append(acc)
见training_step。
见training_step_end。
见training_epoch_end。
见training_step。
见training_step_end。
见training_epoch_end。
在优化过程中选择优化器和学习率调度器,通常只需要一个,但对于GAN之类的可能需要多个。
举一堆例子:
-
单个优化器
def configure_optimizers(self): return Adam(self.parameters(), lr=1e-3)
-
多个优化器(比如GAN)
def configure_optimizers(self): generator_opt = Adam(self.model_gen.parameters(), lr=0.01) disriminator_opt = Adam(self.model_disc.parameters(), lr=0.02) return generator_opt, disriminator_opt
可以修改frequency键来控制优化频率:
def configure_optimizers(self): gen_opt = Adam(self.model_gen.parameters(), lr=0.01) dis_opt = Adam(self.model_disc.parameters(), lr=0.02) n_critic = 5 return ( {"optimizer": dis_opt, "frequency": n_critic}, {"optimizer": gen_opt, "frequency": 1} )
-
多个优化器和多个调度器或学习率字典(比如GAN)
def configure_optimizers(self): generator_opt = Adam(self.model_gen.parameters(), lr=0.01) disriminator_opt = Adam(self.model_disc.parameters(), lr=0.02) discriminator_sched = CosineAnnealing(discriminator_opt, T_max=10) return [generator_opt, disriminator_opt], [discriminator_sched]
def configure_optimizers(self): generator_opt = Adam(self.model_gen.parameters(), lr=0.01) disriminator_opt = Adam(self.model_disc.parameters(), lr=0.02) discriminator_sched = CosineAnnealing(discriminator_opt, T_max=10) return {"optimizer": [generator_opt, disriminator_opt], "lr_scheduler": [discriminator_sched]}
对于调度器,可以修改其属性:
{ "scheduler": lr_scheduler, # 调度器 "interval": "epoch", # 调度的单位,epoch或step "frequency": 1, # 调度的频率,多少轮一次 "reduce_on_plateau": False, # ReduceLROnPlateau "monitor": "val_loss", # ReduceLROnPlateau的监控指标 "strict": True # 如果没有monitor,是否中断训练 }
def configure_optimizers(self): gen_opt = Adam(self.model_gen.parameters(), lr=0.01) dis_opt = Adam(self.model_disc.parameters(), lr=0.02) gen_sched = {"scheduler": ExponentialLR(gen_opt, 0.99), "interval": "step"} dis_sched = CosineAnnealing(discriminator_opt, T_max=10) return [gen_opt, dis_opt], [gen_sched, dis_sched]
一些注意事项:
-
Lightning在需要的时候会调用backward和step。
-
如果使用半精度(precision=16),Lightning会自动处理。
-
如果使用多个优化器,training_step会附加一个参数optimizer_idx。
-
如果使用LBFGS,Lightning将自动为您处理关闭功能。
-
如果使用多个优化器,则在每个训练步骤中仅针对当前优化器的参数计算梯度。
-
如果您需要控制这些优化程序执行或改写默认step的频率,请改写optimizer_step。
-
如果在每n步都调用调度器,或者只想监视自定义指标,则可以在lr_dict中指定它们。
{ "scheduler": lr_scheduler, "interval": "step", # or "epoch" "monitor": "val_f1", "frequency": n, }
冻结所有参数/解冻所有参数。
保存__init__中传入的超参数。
举个例子:
def __init__(self, arg1, arg2, arg3): # 1, "abc", 3.14
super().__init__()
# self.save_hyperparameters() # 保存所有超参数
# self.save_hyperparameters("arg1", "arg2", "arg3") # 同上,保存所有超参数
self.save_hyperparameters("arg1", "arg3") # 保存部分超参数
def __init__(self, params): # params=Namespace(p1=1, p2="abc", p3=3.14)
super().__init__()
self.save_hyperparameters(params) # 保存所有超参数
保存模型为ONNX格式。参数如下:
- file_path (str) – 保存路径。
- input_sample (Optional[Tensor]) – 用于跟踪的输入张量的样本。
- **kwargs – 将传递给torch.onnx.export函数。
举个例子:
with tempfile.NamedTemporaryFile(suffix=".onnx", delete=False) as tmpfile:
model = SimpleModel()
input_sample = torch.randn((1, 64))
model.to_onnx(tmpfile.name, input_sample, export_params=True)
os.path.isfile(tmpfile.name)
log(name, value, prog_bar=False, logger=True, on_step=None, on_epoch=None, reduce_fx=torch.mean, tbptt_reduce_fx=torch.mean, tbptt_pad_token=0, enable_graph=False, sync_dist=False, sync_dist_op="mean", sync_dist_group=None)
log_dict(dictionary, prog_bar=False, logger=True, on_step=None, on_epoch=None, reduce_fx=torch.mean, tbptt_reduce_fx=torch.mean, tbptt_pad_token=0, enable_graph=False, sync_dist=False, sync_dist_op="mean", sync_dist_group=None)
有人问,log不重要吗?当然重要,但不是用Pytorch Lightning的log,因为实在是太反人类了,各种bug,后续我们会用LightningModule和Callback实现我们自己的logger。
当前轮数。
当前设备。
全局排名是指该GPU在所有GPU中的索引。如果使用10台计算机,每台计算机具有4个GPU,则第10台计算机上的第4个GPU的global_rank = 39。
Lightning仅从开始global_rank = 0保存日志,权重等。通常不需要使用此属性。
当前步数,每轮不重置。
save_hyperparameters所保存的超参数。
当前日志。
本地排名是指该计算机上的索引。如果使用10台计算机,则每台计算机上索引为0的GPU的local_rank = 0。
Lightning仅从开始global_rank = 0保存日志,权重等。通常不需要使用此属性。
所使用的进度类型。
指向trainer。
是否使用了自动混合精度/ddp/ddp2/dp/tpu。
下面的伪代码描述了训练过程:
def fit(...):
on_fit_start()
if global_rank == 0:
# prepare data is called on GLOBAL_ZERO only
prepare_data()
for gpu/tpu in gpu/tpus:
train_on_device(model.copy())
on_fit_end()
def train_on_device(model):
# setup is called PER DEVICE
setup()
configure_optimizers()
on_pretrain_routine_start()
for epoch in epochs:
train_loop()
teardown()
def train_loop():
on_train_epoch_start()
train_outs = []
for train_batch in train_dataloader():
on_train_batch_start()
# ----- train_step methods -------
out = training_step(batch)
train_outs.append(out)
loss = out.loss
backward()
on_after_backward()
optimizer_step()
on_before_zero_grad()
optimizer_zero_grad()
on_train_batch_end(out)
if should_check_val:
val_loop()
# end training epoch
logs = training_epoch_end(outs)
def val_loop():
model.eval()
torch.set_grad_enabled(False)
on_validation_epoch_start()
val_outs = []
for val_batch in val_dataloader():
on_validation_batch_start()
# -------- val step methods -------
out = validation_step(val_batch)
val_outs.append(out)
on_validation_batch_end(out)
validation_epoch_end(val_outs)
on_validation_epoch_end()
# set up for train
model.train()
torch.set_grad_enabled(True)
反向传播。参数如下:
- loss (Tensor) – 已经被积累的梯度所放缩的损失。
- optimizer (Optimizer) – 当前被使用的优化器。
- optimizer_idx (int) – 当前被使用的优化器的索引。
举个例子:
def backward(self, loss, optimizer, optimizer_idx):
loss.backward()
修改进度条的内容。
举个例子:
# Epoch 1: 4%|▎ | 40/1095 [00:03<01:37, 10.84it/s, loss=4.501, v_num=10]
def get_progress_bar_dict(self):
# 不显示v_num
items = super().get_progress_bar_dict()
items.pop("v_num", None)
return items
手动反向传播。无法覆盖trainer的设置。
举个例子:
def training_step(...):
(opt_a, opt_b) = self.optimizers()
loss = ...
self.manual_backward(loss, opt_a)
self.manual_optimizer_step(opt_a)
手动优化。无法覆盖trainer的设置。参数如下:
- optimizer (Optimizer) – 用于step()的优化器。
- force_optimizer_step (bool) – 是否强制执行优化程序步骤。当有2个优化器且其中一个应使用累积的渐变而不是另一个使用渐变时,这可能会很有用。可以采用自己的逻辑来强制执行优化程序步骤。
举个例子:
def training_step(...):
(opt_a, opt_b) = self.optimizers()
loss = ...
self.manual_backward(loss, opt_a)
self.manual_optimizer_step(opt_a, force_optimizer_step=True)
在loss.backward()之后且优化程序执行任何操作之前在训练循环中调用。这是检查或记录梯度信息的理想位置。
举个例子:
def on_after_backward(self):
if self.trainer.global_step % 25 == 0:
params = self.state_dict()
for k, v in params.items():
grads = v
name = k
self.logger.experiment.add_histogram(tag=name, values=grads, global_step=self.trainer.global_step)
在optimizer.step()之后和optimizer.zero_grad()之前调用。检查权重信息并更新权重的理想位置。
举个例子:
for optimizer in optimizers:
optimizer.step()
model.on_before_zero_grad(optimizer) # < ---- 调用
optimizer.zero_grad()
在训练开始/结束时调用。如果在DDP上,则在每个进程上调用。
使模型有机会在state_dict存在之前/后加载某些内容。
- fit
- pretrain_routine start
- pretrain_routine end
- training_start
on_train_batch_start(batch, batch_idx, dataloader_idx)/on_train_batch_end(outputs, batch, batch_idx, dataloader_idx)
对于每个训练批次,在开始/结束时调用。
对于每轮训练,在开始/结束时调用。
on_validation_batch_start(batch, batch_idx, dataloader_idx)/on_validation_batch_end(outputs, batch, batch_idx, dataloader_idx)
同上。
同上。
on_test_batch_start(batch, batch_idx, dataloader_idx)/on_test_batch_end(outputs, batch, batch_idx, dataloader_idx)
同上。
同上。
optimizer_step(epoch, batch_idx, optimizer, optimizer_idx, optimizer_closure, on_tpu, using_native_amp, using_lbfgs)
重写此方法可以调整Trainer调用每个优化器的默认方式 。默认情况下,每个优化程序都会一次调用Lightning step(),zero_grad()。参数如下:
- epoch (int) – 当前轮数。
- batch_idx (int) – 当前批次的索引。
- optimizer (Optimizer) –优化器。
- optimizer_idx (int) – 如果有多个优化器,则使用。
- optimizer_closure (Optional[Callable]) – 所有优化器的关闭。
- on_tpu (bool) – 是否为TPU。
- using_native_amp (bool) – 是否为自动混合精度。
- using_lbfgs (bool) – 匹配的优化器是否为lbfgs。
举个例子:
# 学习率预热
def optimizer_step(self, epoch, batch_idx, optimizer, optimizer_idx, optimizer_closure, on_tpu, using_native_amp, using_lbfgs):
# 预热
if self.trainer.global_step < 500:
lr_scale = min(1., float(self.trainer.global_step + 1) / 500.)
for pg in optimizer.param_groups:
pg['lr'] = lr_scale * self.learning_rate
# 更新参数
optimizer.step(closure=optimizer_closure)
optimizer.zero_grad()
用于下载和准备数据。
在训练和测试开始时调用。当您需要动态构建模型或对模型进行调整时,这是一个很好的选择。使用DDP时,每个进程都会调用此钩子。参数如下:
- stage (str) – ‘fit’或‘test’。
举个例子:
class LitModel(...):
def __init__(self):
self.l1 = None
def prepare_data(self):
download_data()
tokenize()
self.something = else
def setup(stage):
data = Load_data(...)
self.l1 = nn.Linear(28, data.num_classes)
使用经过时间的截断的反向传播时,必须沿时间维度拆分每个批次。默认情况下,Lightning会处理此问题,但对于自定义行为,请覆盖此功能。参数如下:
- batch (Tensor) – 当前批次。
- split_size (int) – 分割的大小。
举个例子:
def tbptt_split_batch(self, batch, split_size):
splits = []
for t in range(0, time_dims[0], split_size):
batch_split = []
for i, x in enumerate(batch):
if isinstance(x, torch.Tensor):
split_x = x[:, t:t + split_size]
elif isinstance(x, collections.Sequence):
split_x = [None] * len(x)
for batch_idx in range(len(x)):
split_x[batch_idx] = x[batch_idx][t:t + split_size]
batch_split.append(split_x)
splits.append(batch_split)
return splits
在训练和测试结束时调用。参数如下:
- stage (str) – ‘fit’或‘test’。
- fit()
- …
- prepare_data()
- setup()
- train_dataloader()
- val_dataloader()
- test_dataloader()
- 需要掌握的方法:
- __init__/forword
- training_step/training_step_end/training_epoch_end
- validation_step/validation_step_end/validation_epoch_end
- configure_optimizer
- freeze/unfreeze
- save_hyperparameters
- 需要掌握的属性:
- current_epoch
- device
- hparams
- precision
- trainer
其他的需要时再看即可。