【强化学习PPO算法】

article/2025/9/21 9:19:14

强化学习PPO算法

  • 一、PPO算法
  • 二、伪代码
  • 三、相关的简单理论
    • 1.ratio
    • 2.裁断
    • 3.Advantage的计算
    • 4.loss的计算
  • 四、算法实现
  • 五、效果
  • 六、感悟

  最近再改一个代码,需要改成PPO方式的,由于之前没有接触过此类算法,因此进行了简单学习,论文没有看的很详细,重点看了实现部分,这里只做简单记录。
  这里附上论文链接,需要的可以详细看一下。
   Proximal Policy Optimization Algorithms.

一、PPO算法

  PPO算法本质上是一个On-Policy的算法,它可以对采样到的样本进行多次利用,在一定程度上解决样本利用率低的问题,收到较好的效果。论文里有两种实现方式,一种是结合KL的penalty的,另一种是clip裁断的方法。大部分都是采用的后者,本文记录的也主要是后者的实现。

二、伪代码

  在网上找了一下伪代码,大概两类,前者是Open AI的,比较精炼,后者是Deepmind的,写的比较详细,在这里同时附上.

在这里插入图片描述

在这里插入图片描述

三、相关的简单理论

1.ratio

在这里插入图片描述
  这里的比例ratio,是两种策略下动作的概率比,而在程序实现中,用的是对动作分布取对数,而后使用e指数相减的方法,具体实现如下所示:

action_logprobs = dist.log_prob(action)
ratios = torch.exp(logprobs - old_logprobs.detach())

2.裁断

在这里插入图片描述
  其中,裁断对应的部分如下图所示:
在这里插入图片描述
  上述公式代表的含义如下:
  clip公式含义.
在这里插入图片描述
  这里我是这样理解的:
  (1)如果A>0,说明现阶段的(st,at)相对较好,那么我们希望该二元组出现的概率越高越好,即ratio中的分子越大越好,但是分母分子不能差太多,因此需要加一个上限;
  (2)如果A<0,说明现阶段的(st,at)相对较差,那么我们希望该二元组出现的概率越低越好,即ratio中的分子越小越好,但是分母分子不能差太多,因此需要加一个下限.

3.Advantage的计算

  论文里计算At的方式如下,在一些情况下可以令lamda为1;还有一种更常用的计算方式是VAE,这里不进行描述.。
在这里插入图片描述
  对应的代码块如下:

 def update(self, memory):# Monte Carlo estimate of rewards:rewards = []discounted_reward = 0for reward, is_terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)):if is_terminal:discounted_reward = 0discounted_reward = reward + (self.gamma * discounted_reward)rewards.insert(0, discounted_reward)

4.loss的计算

在这里插入图片描述
  这里的第一项,对应裁断项,需要计算ratio和Advantage,之后进行裁断;
  这里的第二项,对应的为对应的值的均方误差;
  这里的第三项,为交叉熵
  程序的实现如下所示:

surr1 = ratios * advantages
surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
loss = -torch.min(surr1, surr2) + 0.5 * self.MseLoss(state_values, rewards) - 0.01 * dist_entropy

四、算法实现

  这里算法的实现参考了一位博主
  PPO代码.

#!/usr/bin/python3
# -*-coding:utf-8 -*-# @Time    : 2022/6/18 15:53
# @Author  : Wang xiangyu
# @File    : PPO.py
import torch
import torch.nn as nn
from torch.distributions import MultivariateNormal
import gym
import numpy as npdevice = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")class Memory:def __init__(self):self.actions = []self.states = []self.logprobs = []self.rewards = []self.is_terminals = []def clear_memory(self):# del语句作用在变量上,而不是数据对象上。删除的是变量,而不是数据。del self.actions[:]del self.states[:]del self.logprobs[:]del self.rewards[:]del self.is_terminals[:]class ActorCritic(nn.Module):def __init__(self, state_dim, action_dim, action_std):super(ActorCritic, self).__init__()# action mean range -1 to 1self.actor = nn.Sequential(nn.Linear(state_dim, 64),nn.Tanh(),nn.Linear(64, 32),nn.Tanh(),nn.Linear(32, action_dim),nn.Tanh())# criticself.critic = nn.Sequential(nn.Linear(state_dim, 64),nn.Tanh(),nn.Linear(64, 32),nn.Tanh(),nn.Linear(32, 1))# 方差self.action_var = torch.full((action_dim,), action_std * action_std).to(device)def forward(self):# 手动设置异常raise NotImplementedErrordef act(self, state, memory):action_mean = self.actor(state)cov_mat = torch.diag(self.action_var).to(device)dist = MultivariateNormal(action_mean, cov_mat)action = dist.sample()action_logprob = dist.log_prob(action)memory.states.append(state)memory.actions.append(action)memory.logprobs.append(action_logprob)return action.detach()def evaluate(self, state, action):action_mean = self.actor(state)action_var = self.action_var.expand_as(action_mean)# torch.diag_embed(input, offset=0, dim1=-2, dim2=-1) → Tensor# Creates a tensor whose diagonals of certain 2D planes (specified by dim1 and dim2) are filled by inputcov_mat = torch.diag_embed(action_var).to(device)# 生成一个多元高斯分布矩阵dist = MultivariateNormal(action_mean, cov_mat)# 我们的目的是要用这个随机的去逼近真正的选择动作action的高斯分布action_logprobs = dist.log_prob(action)# log_prob 是action在前面那个正太分布的概率的log ,我们相信action是对的 ,# 那么我们要求的正态分布曲线中点应该在action这里,所以最大化正太分布的概率的log, 改变mu,sigma得出一条中心点更加在a的正太分布。dist_entropy = dist.entropy()state_value = self.critic(state)return action_logprobs, torch.squeeze(state_value), dist_entropyclass PPO:def __init__(self, state_dim, action_dim, action_std, lr, betas, gamma, K_epochs, eps_clip):self.lr = lrself.betas = betasself.gamma = gammaself.eps_clip = eps_clipself.K_epochs = K_epochsself.policy = ActorCritic(state_dim, action_dim, action_std).to(device)self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr, betas=betas)self.policy_old = ActorCritic(state_dim, action_dim, action_std).to(device)self.policy_old.load_state_dict(self.policy.state_dict())self.MseLoss = nn.MSELoss()def select_action(self, state, memory):state = torch.FloatTensor(state.reshape(1, -1)).to(device)return self.policy_old.act(state, memory).cpu().data.numpy().flatten()def update(self, memory):# Monte Carlo estimate of rewards:rewards = []discounted_reward = 0for reward, is_terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)):if is_terminal:discounted_reward = 0discounted_reward = reward + (self.gamma * discounted_reward)rewards.insert(0, discounted_reward)# Normalizing the rewards:rewards = torch.tensor(rewards, dtype=torch.float32).to(device)rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-5)# convert list to tensor# 使用stack可以保留两个信息:[1. 序列] 和 [2. 张量矩阵] 信息,属于【扩张再拼接】的函数;old_states = torch.squeeze(torch.stack(memory.states).to(device), 1).detach()old_actions = torch.squeeze(torch.stack(memory.actions).to(device), 1).detach()old_logprobs = torch.squeeze(torch.stack(memory.logprobs), 1).to(device).detach()#这里即可以对样本进行多次利用,提高利用率# Optimize policy for K epochs:for _ in range(self.K_epochs):# Evaluating old actions and values :logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions)# Finding the ratio (pi_theta / pi_theta__old):ratios = torch.exp(logprobs - old_logprobs.detach())# Finding Surrogate Loss:advantages = rewards - state_values.detach()surr1 = ratios * advantagessurr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantagesloss = -torch.min(surr1, surr2) + 0.5 * self.MseLoss(state_values, rewards) - 0.01 * dist_entropy# take gradient stepself.optimizer.zero_grad()loss.mean().backward()self.optimizer.step()# Copy new weights into old policy:self.policy_old.load_state_dict(self.policy.state_dict())def main():############## Hyperparameters ##############env_name = "Pendulum-v1"render = Falsesolved_reward = 300  # stop training if avg_reward > solved_rewardlog_interval = 20  # print avg reward in the intervalmax_episodes = 10000  # max training episodesmax_timesteps = 1500  # max timesteps in one episodeupdate_timestep = 4000  # update policy every n timestepsaction_std = 0.5  # constant std for action distribution (Multivariate Normal)K_epochs = 80  # update policy for K epochseps_clip = 0.2  # clip parameter for PPOgamma = 0.99  # discount factorlr = 0.0003  # parameters for Adam optimizerbetas = (0.9, 0.999)############################################## creating environmentenv = gym.make(env_name)state_dim = env.observation_space.shape[0]action_dim = env.action_space.shape[0]memory = Memory()ppo = PPO(state_dim, action_dim, action_std, lr, betas, gamma, K_epochs, eps_clip)print(lr, betas)# logging variablesrunning_reward = 0avg_length = 0time_step = 0# training loopfor i_episode in range(1, max_episodes + 1):state = env.reset()for t in range(max_timesteps):time_step += 1# Running policy_old:action = ppo.select_action(state, memory)state, reward, done, _ = env.step(action)# Saving reward and is_terminals:memory.rewards.append(reward)memory.is_terminals.append(done)# update if its timeif time_step % update_timestep == 0:ppo.update(memory)memory.clear_memory()time_step = 0running_reward += rewardif render:env.render()if done:breakavg_length += t+1# stop training if avg_reward > solved_rewardif running_reward > (log_interval * solved_reward):print("########## Solved! ##########")torch.save(ppo.policy.state_dict(), './PPO_continuous_solved_{}.pth'.format(env_name))break# save every 500 episodesif i_episode % 500 == 0:torch.save(ppo.policy.state_dict(), './PPO_continuous_{}.pth'.format(env_name))# loggingif i_episode % log_interval == 0:avg_length = int(avg_length / log_interval)running_reward = int((running_reward / log_interval))print('Episode {} \t Avg length: {} \t Avg reward: {}'.format(i_episode, avg_length, running_reward))running_reward = 0avg_length = 0if __name__ == '__main__':main()

五、效果

  可以看到经过一段时间的训练,奖励有了一定升高.

在这里插入图片描述
在这里插入图片描述

六、感悟

  感悟是对改的项目的总结,和本文没有什么关系。
  这次改的项目参考了PPO的代码,架子基本也是搭好的,所以改起来也没有想象的那么困难。但应该是我第一次改代码,之前只是看代码,从来没有尝试改过那么多,可以感觉到看代码和改代码这两个能力间差的真的很多,写代码就更困难了emm,可以说经过这一次,可以更好的看到和别人的差距,不过对自己也有很大提高。在以后的学习中,还是需要多看多写,逐步提高。


http://chatgpt.dhexx.cn/article/WMS8Avb7.shtml

相关文章

【深度强化学习】(6) PPO 模型解析,附Pytorch完整代码

大家好&#xff0c;今天和各位分享一下深度强化学习中的近端策略优化算法&#xff08;proximal policy optimization&#xff0c;PPO&#xff09;&#xff0c;并借助 OpenAI 的 gym 环境完成一个小案例&#xff0c;完整代码可以从我的 GitHub 中获得&#xff1a; https://gith…

autoit连接mysql数据库

原链接点我 一,准备工作 1, 下载mysql.au3(这个点击就下载了) 把mysql.au3放入到autoit的include目录下 2, 下载mysql驱动(根据自己系统选,下载完之后,双击运行会自动安装,一路next就行) 二,使用 #include "mysql.au3" #include <Array.au3> ;弹窗 Func aler…

AutoIt-v3的安装,和robotframework-autoitlibrary的导入

AutoIt 最新是v3版本&#xff0c;这是一个使用类似BASIC脚本语言的免费软件,它设计用于Windows GUI&#xff08;图形用户界面)中进行自动化操作。它利用模拟键盘按键&#xff0c;鼠标移动和窗口/控件的组合来实现自动化任务。而这是其它语言不可能做到或无可靠方法实现的。 Au…

selenium 上传下载调用windows窗口--AutoIT

AutoIT解决自动化上传下载文件调用Windows窗口 AutoIT下载安装使用AotuIt 操作windows上传窗口1. 打开AutoIt定位窗口组件2. 定位上传窗口属性 &#xff08;鼠标选中Finder Tool 拖拽至属性窗口&#xff09;3. 打开autoIt编辑器&#xff0c;编写代码4. 将脚本文件转成exe文件5.…

软件质量保证与测试 实验十一:AutoIt的使用

目录 实验概述实验内容1. 下载安装AutoIT。2. 测试win系统自带计算器程序&#xff0c; 246&#xff0c;是否正确&#xff1f; 写出Script。&#xff08;小提示&#xff1a;使用WinGetText获得输出&#xff09;3.测试win系统自带计算器程序&#xff0c; 写出3个以上的测试用例的…

selenium 用autoIT上传下载文件

一、下载安装AutoIT 下载并安装AutoIT&#xff0c;下载链接&#xff1a;https://www.autoitscript.com/site/autoit/AutoIT安装成功后&#xff0c;可以在开始菜单下看到AutoIT的所有工具&#xff0c;如下图所示&#xff1a; 其中分为几类&#xff0c;AutoIT Window Info用来识…

selenium autoit java_selenium+java利用AutoIT实现文件上传

转载自&#xff1a;https://www.cnblogs.com/yunman/p/7112882.html?utm_sourceitdadao&utm_mediumreferral 1、AutoIT介绍 AutoIT是一个类似脚本语言的软件&#xff0c;利用此软件我们可以方便的实现模拟键盘、鼠标、窗口等操作&#xff0c;实现自动化。 2、实现原理 利用…

autoIT 自动化上传/下载文件图文详解【python selenium】

情景&#xff1a; 在用selenium进行web页面自动化时&#xff0c;时不时会遇到上传附件的情况&#xff0c;常见的情况就是一个上传按钮&#xff0c;点击后弹出windows窗口&#xff0c;选择文件后上传&#xff0c;如下图1所示 图1 这种情况超出了selenium的能力范围&#xff0c;需…

AutoIt介绍

AutoIt的下载网址&#xff1a; https://www.autoitscript.com/site/autoit/downloads/ AutoIt在线文档&#xff1a;http://www.autoit3.cn/Doc/ AutoIt的优势&#xff1a; 简单易懂的类BASIC 表达式模拟键盘,鼠标动作事件操作窗口与进程直接与窗口的”标准控件”交互(设置/获…

AutoIt的应用

少数情况下需要操作系统级的弹窗&#xff0c;可以使用AutoIt。 AutoIt现在最新版是V3版本&#xff0c;这是一个类似BASIC脚本语言的免费软件&#xff0c;用于Windows GUI中进行自动化操作。利用模拟键盘按键&#xff0c;鼠标移动&#xff0c;窗口和控件的组合来实现自动化任务…

java 调用autoit_java和autoit连接

autoit可以实现本机文件的上传&#xff0c;修改&#xff0c;新建&#xff0c;也可以实现网页上文件下载到本地 连接步骤&#xff1a; (1)下载autoitx4java 包&#xff0c;地址在code.google.com/p/autoitx4java。解压后直接将jar包添加到工程里面。然后需要使用jacob包&#xf…

AutoIt在线使用手册地址

AutoIt 在线文档https://autoitx.com/Doc/

AutoIt3.0

autoIt主要用于窗口自动化&#xff0c;结合python&#xff0c;可解决web自动化&#xff0c;页面调出窗口的问题 autoIt脚本代码例子&#xff1a; 1.打开Windows 任务管理器 2.依次点击【应用程序、进程、服务、性能、联网、用户】按钮 3.再次点击应用程序按钮 4.点选第二个…

Python + Selenium + AutoIt 模拟键盘实现另存为、上传、下载操作详解

前言 在web页面中&#xff0c;可以使用selenium的定位方式来识别元素&#xff0c;从而来实现页面中的自动化&#xff0c;但对于页面中弹出的文件选择框&#xff0c;selenium就实现不了了&#xff0c;所以就需引用AutoIt工具来实现。 AutoIt介绍 AutoIt简单介绍下&#xff0c…

autoit 下载图片验证码

autoit 下载图片验证码 自动化测试中&#xff0c;我做了验证码识别的功能&#xff0c;那么接下来就是怎么获取验证码图片了&#xff0c;还好autoit 里面提供了一些方法。下面就介绍一下怎样利用autoit 下载验证码图片&#xff1a; 先说思路&#xff1a; 右键点击验证码 使用…

获取硬盘序列号的真正方法!!

最近要获取磁盘的序列号&#xff0c;在网上找了很久发现大部分都是通过diskpart来查询 这种查询方法只是查询的磁盘的id 真正查询磁盘序列号应该使用下面方法&#xff1a;wmic diskdrive get serialnumber 打开cmd后输入 serialNumber下面的就是硬盘序列号

如何查询硬盘序列号?百度基本都是错的,其实一条命令搞定!

百度上答案不知是不懂还是怎么着&#xff0c;都是通过diskpart -->detail disk查询&#xff0c;查出来的是磁盘ID&#xff0c;不是序列号&#xff01; 其实查询磁盘序列号方式很简单&#xff1a; 1.Win R打开 “运行” &#xff0c;在 运行 中输入“cmd”&#xff0c;然后…

手动查询硬盘序列号

win7旗舰版1. 开始-运行-输入&#xff1a;wbemtest 回车 2. 单击"连接", 输入&#xff1a;root\cimv2 回车; 或者ROOT\SecurityCenter 3. 单击"查询", 输入&#xff1a;select * from Win32_PhysicalMedia 应用&#xff0c;出来三个&#xff0c;我这第一…

怎么查询电脑的磁盘序列号和各种硬件信息(Windows系统)

之前实习时遇到过这种情况&#xff0c;听说国家保密局要统计高校领导办公电脑的磁盘信息&#xff0c;防止信息泄露&#xff0c;就让我和同事就统计磁盘的序列号&#xff0c;很简单&#xff0c;就是电脑太多&#xff0c;统计速度太慢啦&#xff0c;都是Windows系统&#xff0c;没…

查看硬盘序列号的方法 和查看设备序列号的方法

查看硬盘序列号 wmic diskdrive get serialnumber 显示的就是序列号了 查看设备序列号 wmic bios get serialnumber