OnPolicyRunner 是 RSL-RL 库中最核心的训练编排器,负责将环境交互、数据存储、策略更新与日志记录串联为一个完整的 on-policy 训练闭环。本文深入解析其内部执行流程,帮助读者理解从环境采集到策略梯度更新的每一个关键步骤,为调参、断点续训及自定义扩展奠定工程基础。
架构总览与职责边界
OnPolicyRunner 并不直接实现 PPO 数学逻辑,而是通过职责分离的设计模式,将算法细节委托给 PPO 类,将数据存储委托给 RolloutStorage,将环境交互委托给 VecEnv 包装器,并将指标记录委托给 Logger。这种分层架构使得 Runner 专注于编排时序与生命周期管理,而各个子模块可独立演进。
classDiagram
direction TB
class OnPolicyRunner {
+VecEnv env
+PPO alg
+Logger logger
+int current_learning_iteration
+learn(num_iterations)
+save(path)
+load(path)
+train_mode()
+eval_mode()
}
class PPO {
+ActorCritic policy
+RolloutStorage storage
+Optimizer optimizer
+act(obs)
+process_env_step(obs, rewards, dones, extras)
+compute_returns(last_obs)
+update()
}
class RolloutStorage {
+TensorDict observations
+Tensor rewards/actions/dones
+Tensor returns/advantages
+add_transition(transition)
+mini_batch_generator(num_mini_batches, num_epochs)
+recurrent_mini_batch_generator(...)
}
class VecEnv {
+step(actions)
+get_observations()
+int num_envs
}
class Logger {
+process_env_step(rewards, dones, extras)
+log(it, loss_dict, ...)
}
OnPolicyRunner --> PPO : 调用算法接口
OnPolicyRunner --> VecEnv : 执行并行环境步
OnPolicyRunner --> Logger : 输出训练指标
PPO --> RolloutStorage : 填充转移数据 / 生成训练批次
Sources: on_policy_runner.py, ppo.py, rollout_storage.py
训练循环执行流程
OnPolicyRunner.learn() 方法是整个训练流程的入口。它以学习迭代(learning iteration)为外层循环,每个迭代内部严格遵循「采集 → 计算回报 → 策略更新 → 日志记录」四阶段。下方流程图展示了单次迭代内的精确时序:
flowchart TD
Start([runner.learn 开始]) --> Init[随机化初始 episode 长度<br/>init_at_random_ep_len]
Init --> GetObs[获取初始观测<br/>obs = env.get_observations]
GetObs --> TrainMode[切换训练模式<br/>train_mode]
TrainMode --> DistCheck{分布式训练?}
DistCheck -->|是| SyncParam[广播参数<br/>broadcast_parameters]
DistCheck -->|否| OuterLoop[外层循环<br/>for it in iterations]
SyncParam --> OuterLoop
OuterLoop --> InferMode[进入 Rollout 阶段<br/>torch.inference_mode]
InferMode --> Act[alg.act 采样动作]
Act --> Step[env.step 执行并行环境步]
Step --> Proc[alg.process_env_step<br/>存储转移 & 处理超时]
Proc --> LogStep[logger.process_env_step<br/>累计 episode 奖励]
LogStep --> CheckRollout{已完成<br/>num_steps_per_env?}
CheckRollout -->|否| Act
CheckRollout -->|是| ComputeRet[alg.compute_returns<br/>GAE 计算优势与回报]
ComputeRet --> Update[alg.update<br/>PPO 多 epoch 小批量更新]
Update --> LogIter[logger.log<br/>输出指标到控制台与 TensorBoard]
LogIter --> SaveCheck{it % save_interval == 0?}
SaveCheck -->|是| SaveModel[save 保存 checkpoint]
SaveCheck -->|否| NextIter{还有剩余迭代?}
SaveModel --> NextIter
NextIter -->|是| OuterLoop
NextIter -->|否| FinalSave[保存最终模型]
FinalSave --> End([结束])
Sources: on_policy_runner.py
阶段一:环境交互与数据采集(Rollout)
在每个学习迭代开始时,Runner 会在 torch.inference_mode() 上下文中执行 num_steps_per_env 步环境交互。这里的 num_steps_per_env 指每个并行环境采集的步数,因此单次 Rollout 的总样本量为 num_envs × num_steps_per_env。具体每一步的执行链路如下:
首先,PPO 的 act 方法接收当前观测 obs,由策略网络输出动作分布的均值 action_mean 与标准差 action_sigma,并从中采样得到动作。同时,critic 网络会评估当前状态价值 values,并将动作对数概率 actions_log_prob 一并记录到 Transition 临时对象中。随后,Runner 调用 env.step(actions) 推进所有并行环境,获得新的观测、外在奖励 rewards、终止标志 dones 以及额外信息 extras。
在 process_env_step 阶段,算法会完成四项关键工作:其一,更新策略网络的观测归一化统计量(如果启用);其二,若配置了 RND 好奇心探索,则计算内在奖励并叠加到外在奖励上;其三,处理 time_outs 超时引导——当环境因达到最大步长而终止时,按照 gamma * V(s_{t+1}) 对奖励进行 bootstrap,避免价值估计偏差;其四,将完整的转移数据写入 RolloutStorage。同时,logger.process_env_step 会累加各环境的 episode 奖励,并在 dones > 0 时将完成的 episode 统计推入滑动窗口。
Sources: ppo.py, on_policy_runner.py
阶段二:回报计算与优势估计(GAE)
Rollout 结束后,compute_returns 方法利用最后一步的观测 obs 计算 bootstrap 价值 last_values,随后从后向前遍历整个轨迹缓冲区,采用 Generalized Advantage Estimation (GAE) 计算优势函数与折扣回报。核心递推公式为:
- TD 残差:
delta_t = r_t + gamma * V(s_{t+1}) * (1 - done_t) - V(s_t) - 优势估计:
A_t = delta_t + gamma * lambda * (1 - done_t) * A_{t+1} - 回报估计:
R_t = A_t + V(s_t)
若配置中未启用 normalize_advantage_per_mini_batch,则优势会在全量数据层面进行标准化,即减去均值并除以标准差(加 1e-8 防止除零)。这种全局归一化策略计算开销较小,而逐 mini-batch 归一化则能在非平稳分布下提供更稳定的训练信号,代价是略微增加计算量。
Sources: ppo.py
阶段三:策略更新(Update)
update 方法是 PPO 算法的核心。它首先根据策略类型选择批次生成器:对于前馈网络使用 mini_batch_generator,对于循环网络(RNN/LSTM/GRU)则使用 recurrent_mini_batch_generator。生成器会将 num_envs × num_steps_per_env 条转移记录展平后,按 num_mini_batches 划分,并在 num_learning_epochs 个 epoch 内随机打乱索引、反复采样。
对每个小批量,算法重新计算当前策略下的动作对数概率 actions_log_prob、状态价值 value 与策略熵 entropy,并基于旧策略的参数计算 surrogate loss:
- 重要性采样比:
ratio = exp(log_prob_new - log_prob_old) - 裁剪目标:
surrogate = -advantage * ratio - 裁剪后目标:
surrogate_clipped = -advantage * clamp(ratio, 1-ε, 1+ε) - 最终策略损失:
max(surrogate, surrogate_clipped).mean()
价值函数损失支持两种模式:当 use_clipped_value_loss 为真时,对价值预测的变化幅度进行裁剪,取裁剪前后平方误差的较大者,以抑制 critic 的剧烈更新;否则使用简单的均方误差。总损失由策略损失、价值损失(乘以 value_loss_coef)与熵奖励(乘以 entropy_coef)加权构成。
Sources: ppo.py
阶段四:扩展损失与梯度应用
PPO 的基础损失之外,update 方法还支持三项扩展机制。对称性增强(Symmetry)可在数据层面进行镜像增强,或在损失层面增加 mirror loss,利用机器人左右对称先验约束策略输出。辅助损失(Auxiliary Loss)则通过 policy.get_aux_loss() 为网络引入额外监督信号。若启用 RND,rnd_loss 会单独通过 rnd_optimizer 进行反向传播,优化预测网络对固定目标网络的拟合误差。
梯度计算完成后,PPO 参数与可选的 RND 参数会分别经过 clip_grad_norm_ 裁剪,随后执行优化器步进。在分布式多卡场景下,所有 GPU 的梯度会通过 all_reduce 求平均,确保各进程模型参数保持一致。此外,若启用 adaptive 学习率调度,算法会在每轮 mini-batch 后估算新旧策略间的 KL 散度,并根据其与 desired_kl 的偏离程度动态调整学习率:当 KL 过大时降低学习率,过小时则提升。
Sources: ppo.py
存储与批次生成机制
RolloutStorage 是连接环境交互与策略更新的数据枢纽。它以 TensorDict 形式存储观测,并为不同训练类型(RL / distillation)分配差异化字段。对于 RL 训练,缓冲区预分配了 observations、rewards、actions、dones、values、actions_log_prob、mu、sigma、returns、advantages 等张量,形状统一为 [num_transitions_per_env, num_envs, ...]。
mini_batch_generator 的工作方式是将时间与环境维度展平为 [batch_size, ...],随后通过 torch.randperm 生成全局随机索引,按 num_mini_batches 切分。这种全量打乱策略保证了每个 mini-batch 都包含来自不同环境与时间步的样本,有效降低序列相关性。对于循环网络,recurrent_mini_batch_generator 则会调用 split_and_pad_trajectories,依据 dones 将轨迹切分并填充,确保隐藏状态只在轨迹边界处重置。
Sources: rollout_storage.py
日志记录与模型持久化
Logger 在每次迭代末被调用,负责将训练指标写入 TensorBoard、W&B 或 Neptune,并在控制台输出格式化报表。记录内容涵盖:各项损失值、学习率、策略噪声标准差、FPS(每秒采样帧数)、采集与学习时间、平均 episode 奖励与长度,以及环境侧透传的 episode 自定义指标。若启用 RND,还会单独展示外在奖励、内在奖励与 RND 权重。
OnPolicyRunner.save() 将 model_state_dict、optimizer_state_dict、当前迭代次数及可选的 infos 字典序列化到磁盘;若存在 RND 模块,也会一并保存其状态与优化器。load() 则支持从 checkpoint 恢复训练,能够智能判断模型结构是否兼容,并选择性加载优化器状态与历史迭代计数,实现无缝断点续训。
Sources: logger.py, on_policy_runner.py
关键配置参数一览
下方表格汇总了 OnPolicyRunner 及其底层 PPO 算法在配置文件中常见的超参数及其工程含义。
| 配置项 | 类型 / 典型值 | 作用说明 |
|---|---|---|
num_steps_per_env |
int / 24 | 每个环境单次迭代采集的步数,直接决定 Rollout 缓冲区大小 |
max_iterations |
int / 1500 | 总策略更新次数 |
save_interval |
int / 50 | 模型保存间隔(迭代数) |
learning_rate |
float / 1e-3 | 初始学习率,adaptive 模式下会被动态调整 |
num_learning_epochs |
int / 5 | 每次迭代中,对同一批数据重复更新的 epoch 数 |
num_mini_batches |
int / 4 | 将 Rollout 数据划分为多少个小批量;batch_size = num_envs × num_steps / num_mini_batches |
clip_param |
float / 0.2 | PPO 裁剪阈值 ε |
gamma |
float / 0.99 | 折扣因子 |
lam |
float / 0.95 | GAE 参数 λ |
value_loss_coef |
float / 1.0 | 价值损失权重 |
entropy_coef |
float / 0.01 | 策略熵奖励权重,鼓励探索 |
max_grad_norm |
float / 1.0 | 梯度裁剪的最大 L2 范数 |
use_clipped_value_loss |
bool / true | 是否对 value loss 使用裁剪 |
schedule |
str / adaptive | 学习率调度策略:adaptive 基于 KL 散度;fixed 保持恒定 |
desired_kl |
float / 0.01 | 自适应学习率的目标 KL 散度 |
normalize_advantage_per_mini_batch |
bool / false | 是否在 mini-batch 级别归一化优势,而非全局归一化 |
Sources: example_config.yaml
多卡分布式支持
OnPolicyRunner 内建了基于 torch.distributed 的多 GPU 训练能力。在 _configure_multi_gpu 中,Runner 通过环境变量 WORLD_SIZE、RANK、LOCAL_RANK 自动检测分布式环境,初始化 NCCL 进程组,并验证当前进程绑定的 GPU 设备是否正确。训练开始前,broadcast_parameters 确保所有进程从 rank 0 同步模型参数;在 update 的反向传播后,reduce_parameters 通过 all_reduce 对所有 GPU 的梯度求平均,保证参数一致性。日志记录仅在 rank == 0 的主进程上执行,避免重复写入与控制台混乱。
Sources: on_policy_runner.py, ppo.py
与周边模块的协作关系
OnPolicyRunner 并非孤立存在,其训练管线深度依赖库内其他模块。策略网络的拓扑结构由 Actor-Critic 网络架构详解 提供;若启用感知增强,则会调用 注意力编码器感知机制 中的 ActorCriticAttnEnc。训练脚本 train.py 负责通过 Hydra 解析配置、实例化 Isaac Lab 环境并包装为 RslRlVecEnvWrapper,最终创建 Runner 并触发 learn()。若需从已有策略继续训练,可借助 resume 配置与 load() 方法实现断点续训;若需将策略导出至 MuJoCo 或真机,请参阅 MuJoCo Sim2Sim 部署与真机迁移。
Sources: train.py
阅读进阶建议
理解 OnPolicyRunner 的执行机制后,建议按以下路径深入:
- 若需调整网络结构或添加自定义观测编码,阅读 Actor-Critic 网络架构详解。
- 若需引入好奇心探索或对称性约束,阅读 RND 好奇心探索与对称性增强。
- 若需在多卡服务器上扩展训练规模,阅读 分布式多卡训练配置。
- 若需将训练好的策略迁移到 MuJoCo 或实体机器人,阅读 MuJoCo Sim2Sim 部署与真机迁移。