【强化学习】PPO算法求解倒立摆问题 + Pytorch代码实战

article/2025/9/21 7:49:56

文章目录

  • 一、倒立摆问题介绍
  • 二、PPO算法简介
  • 三、详细资料
  • 四、Python代码实战
    • 4.1 运行前配置
    • 4.2 主要代码
    • 4.3 运行结果展示
    • 4.4 关于可视化的设置


一、倒立摆问题介绍

Agent 必须在两个动作之间做出决定 - 向左或向右移动推车 - 以使连接到它的杆保持直立。
在这里插入图片描述

二、PPO算法简介

近端策略优化 ( proximal policy optimization, PPO):

避免在使用重要性采样时由于在 θ \theta θ 下的 p θ ( a t ∣ s t ) p_\theta\left(a_t \mid s_t\right) pθ(atst) 与在 θ ′ \theta^{\prime} θ 下的 p θ ′ ( a t ∣ s t ) p_{\theta^{\prime}}\left(a_t \mid s_t\right) pθ(atst) 相差太多, 导致重要性采样结果偏差较大而采取的算法。具体来说就是在训练的过 程中增加一个限制, 这个限制对应 θ \theta θ θ ′ \theta^{\prime} θ 输出的动作的 KL 散度, 来衡量 θ \theta θ θ ′ \theta^{\prime} θ 的相似程度。

三、详细资料

关于更加详细的PPO算法介绍,请看我之前发的博客:【EasyRL学习笔记】第五章 Proximal Policy Optimization 近端策略优化算法

在学习PPO算法前你最好能了解以下知识点:

  • 全连接神经网络
  • 神经网络求解分类问题
  • 神经网络基本工作原理
  • KL散度

四、Python代码实战

4.1 运行前配置

准备好一个RL_Utils.py文件,文件内容可以从我的一篇里博客获取:【RL工具类】强化学习常用函数工具类(Python代码)

这一步很重要,后面需要引入该RL_Utils.py文件

在这里插入图片描述

4.2 主要代码

import argparse
import datetime
import time
import torch.optim as optim
from torch.distributions.categorical import Categorical
import gym
from torch import nn# 这里需要改成自己的RL_Utils.py文件的路径
from Python.ReinforcementLearning.EasyRL.RL_Utils import *class PPOMemory:def __init__(self, batch_size):self.states = []self.probs = []self.vals = []self.actions = []self.rewards = []self.dones = []self.batch_size = batch_sizedef sample(self):batch_step = np.arange(0, len(self.states), self.batch_size)indices = np.arange(len(self.states), dtype=np.int64)np.random.shuffle(indices)batches = [indices[i:i + self.batch_size] for i in batch_step]return np.array(self.states), np.array(self.actions), np.array(self.probs), \np.array(self.vals), np.array(self.rewards), np.array(self.dones), batchesdef push(self, state, action, probs, vals, reward, done):self.states.append(state)self.actions.append(action)self.probs.append(probs)self.vals.append(vals)self.rewards.append(reward)self.dones.append(done)def clear(self):self.states = []self.probs = []self.actions = []self.rewards = []self.dones = []self.vals = []class Actor(nn.Module):def __init__(self, n_states, n_actions,hidden_dim):super(Actor, self).__init__()self.actor = nn.Sequential(nn.Linear(n_states, hidden_dim),nn.ReLU(),nn.Linear(hidden_dim, hidden_dim),nn.ReLU(),nn.Linear(hidden_dim, n_actions),nn.Softmax(dim=-1))def forward(self, state):dist = self.actor(state)dist = Categorical(dist)return distclass Critic(nn.Module):def __init__(self, n_states, hidden_dim):super(Critic, self).__init__()self.critic = nn.Sequential(nn.Linear(n_states, hidden_dim),nn.ReLU(),nn.Linear(hidden_dim, hidden_dim),nn.ReLU(),nn.Linear(hidden_dim, 1))def forward(self, state):value = self.critic(state)return valueclass PPO:def __init__(self, n_states, n_actions, cfg):self.gamma = cfg['gamma']self.continuous = cfg['continuous']self.policy_clip = cfg['policy_clip']self.n_epochs = cfg['n_epochs']self.gae_lambda = cfg['gae_lambda']self.device = cfg['device']self.actor = Actor(n_states, n_actions, cfg['hidden_dim']).to(self.device)self.critic = Critic(n_states, cfg['hidden_dim']).to(self.device)self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=cfg['actor_lr'])self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=cfg['critic_lr'])self.memory = PPOMemory(cfg['batch_size'])self.loss = 0def choose_action(self, state):state = np.array([state])  # 先转成数组再转tensor更高效state = torch.tensor(state, dtype=torch.float).to(self.device)dist = self.actor(state)value = self.critic(state)action = dist.sample()probs = torch.squeeze(dist.log_prob(action)).item()if self.continuous:action = torch.tanh(action)else:action = torch.squeeze(action).item()value = torch.squeeze(value).item()return action, probs, valuedef update(self):for _ in range(self.n_epochs):state_arr, action_arr, old_prob_arr, vals_arr, reward_arr, dones_arr, batches = self.memory.sample()values = vals_arr[:]### compute advantage ###advantage = np.zeros(len(reward_arr), dtype=np.float32)for t in range(len(reward_arr) - 1):discount = 1a_t = 0for k in range(t, len(reward_arr) - 1):a_t += discount * (reward_arr[k] + self.gamma * values[k + 1] * \(1 - int(dones_arr[k])) - values[k])discount *= self.gamma * self.gae_lambdaadvantage[t] = a_tadvantage = torch.tensor(advantage).to(self.device)### SGD ###values = torch.tensor(values).to(self.device)for batch in batches:states = torch.tensor(state_arr[batch], dtype=torch.float).to(self.device)old_probs = torch.tensor(old_prob_arr[batch]).to(self.device)actions = torch.tensor(action_arr[batch]).to(self.device)dist = self.actor(states)critic_value = self.critic(states)critic_value = torch.squeeze(critic_value)new_probs = dist.log_prob(actions)prob_ratio = new_probs.exp() / old_probs.exp()weighted_probs = advantage[batch] * prob_ratioweighted_clipped_probs = torch.clamp(prob_ratio, 1 - self.policy_clip,1 + self.policy_clip) * advantage[batch]actor_loss = -torch.min(weighted_probs, weighted_clipped_probs).mean()returns = advantage[batch] + values[batch]critic_loss = (returns - critic_value) ** 2critic_loss = critic_loss.mean()total_loss = actor_loss + 0.5 * critic_lossself.loss = total_lossself.actor_optimizer.zero_grad()self.critic_optimizer.zero_grad()total_loss.backward()self.actor_optimizer.step()self.critic_optimizer.step()self.memory.clear()def save_model(self, path):Path(path).mkdir(parents=True, exist_ok=True)actor_checkpoint = os.path.join(path, 'ppo_actor.pt')critic_checkpoint = os.path.join(path, 'ppo_critic.pt')torch.save(self.actor.state_dict(), actor_checkpoint)torch.save(self.critic.state_dict(), critic_checkpoint)def load_model(self, path):actor_checkpoint = os.path.join(path, 'ppo_actor.pt')critic_checkpoint = os.path.join(path, 'ppo_critic.pt')self.actor.load_state_dict(torch.load(actor_checkpoint))self.critic.load_state_dict(torch.load(critic_checkpoint))# 训练函数
def train(arg_dict, env, agent):# 开始计时startTime = time.time()print(f"环境名: {arg_dict['env_name']}, 算法名: {arg_dict['algo_name']}, Device: {arg_dict['device']}")print("开始训练智能体......")rewards = []  # 记录所有回合的奖励ma_rewards = []  # 记录所有回合的滑动平均奖励steps = 0for i_ep in range(arg_dict['train_eps']):state = env.reset()done = Falseep_reward = 0while not done:# 画图if arg_dict['train_render']:env.render()action, prob, val = agent.choose_action(state)state_, reward, done, _ = env.step(action)steps += 1ep_reward += rewardagent.memory.push(state, action, prob, val, reward, done)if steps % arg_dict['update_fre'] == 0:agent.update()state = state_rewards.append(ep_reward)if ma_rewards:ma_rewards.append(0.9 * ma_rewards[-1] + 0.1 * ep_reward)else:ma_rewards.append(ep_reward)if (i_ep + 1) % 10 == 0:print(f"回合:{i_ep + 1}/{arg_dict['train_eps']},奖励:{ep_reward:.2f}")print('训练结束 , 用时: ' + str(time.time() - startTime) + " s")# 关闭环境env.close()return {'episodes': range(len(rewards)), 'rewards': rewards}# 测试函数
def test(arg_dict, env, agent):startTime = time.time()print("开始测试智能体......")print(f"环境名: {arg_dict['env_name']}, 算法名: {arg_dict['algo_name']}, Device: {arg_dict['device']}")rewards = []  # 记录所有回合的奖励ma_rewards = []  # 记录所有回合的滑动平均奖励for i_ep in range(arg_dict['test_eps']):state = env.reset()done = Falseep_reward = 0while not done:# 画图if arg_dict['test_render']:env.render()action, prob, val = agent.choose_action(state)state_, reward, done, _ = env.step(action)ep_reward += rewardstate = state_rewards.append(ep_reward)if ma_rewards:ma_rewards.append(0.9 * ma_rewards[-1] + 0.1 * ep_reward)else:ma_rewards.append(ep_reward)print('回合:{}/{}, 奖励:{}'.format(i_ep + 1, arg_dict['test_eps'], ep_reward))print("测试结束 , 用时: " + str(time.time() - startTime) + " s")env.close()return {'episodes': range(len(rewards)), 'rewards': rewards}# 创建环境和智能体
def create_env_agent(arg_dict):# 创建环境env = gym.make(arg_dict['env_name'])# 设置随机种子all_seed(env, seed=arg_dict["seed"])# 获取状态数try:n_states = env.observation_space.nexcept AttributeError:n_states = env.observation_space.shape[0]# 获取动作数n_actions = env.action_space.nprint(f"状态数: {n_states}, 动作数: {n_actions}")# 将状态数和动作数加入算法参数字典arg_dict.update({"n_states": n_states, "n_actions": n_actions})# 实例化智能体对象agent = PPO(n_states, n_actions, arg_dict)# 返回环境,智能体return env, agentif __name__ == '__main__':# 防止报错 OMP: Error #15: Initializing libiomp5md.dll, but found libiomp5md.dll already initialized.os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"# 获取当前路径curr_path = os.path.dirname(os.path.abspath(__file__))# 获取当前时间curr_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")# 相关参数设置parser = argparse.ArgumentParser(description="hyper parameters")parser.add_argument('--algo_name', default='PPO', type=str, help="name of algorithm")parser.add_argument('--env_name', default='CartPole-v0', type=str, help="name of environment")parser.add_argument('--continuous', default=False, type=bool,help="if PPO is continuous")  # PPO既可适用于连续动作空间,也可以适用于离散动作空间parser.add_argument('--train_eps', default=200, type=int, help="episodes of training")parser.add_argument('--test_eps', default=20, type=int, help="episodes of testing")parser.add_argument('--gamma', default=0.99, type=float, help="discounted factor")parser.add_argument('--batch_size', default=5, type=int)  # mini-batch SGD中的批量大小parser.add_argument('--n_epochs', default=4, type=int)parser.add_argument('--actor_lr', default=0.0003, type=float, help="learning rate of actor net")parser.add_argument('--critic_lr', default=0.0003, type=float, help="learning rate of critic net")parser.add_argument('--gae_lambda', default=0.95, type=float)parser.add_argument('--policy_clip', default=0.2, type=float)  # PPO-clip中的clip参数,一般是0.1~0.2左右parser.add_argument('--update_fre', default=20, type=int)parser.add_argument('--hidden_dim', default=256, type=int)parser.add_argument('--device', default='cpu', type=str, help="cpu or cuda")parser.add_argument('--seed', default=520, type=int, help="seed")parser.add_argument('--show_fig', default=False, type=bool, help="if show figure or not")parser.add_argument('--save_fig', default=True, type=bool, help="if save figure or not")parser.add_argument('--train_render', default=False, type=bool,help="Whether to render the environment during training")parser.add_argument('--test_render', default=True, type=bool,help="Whether to render the environment during testing")args = parser.parse_args()default_args = {'result_path': f"{curr_path}/outputs/{args.env_name}/{curr_time}/results/",'model_path': f"{curr_path}/outputs/{args.env_name}/{curr_time}/models/",}# 将参数转化为字典 type(dict)arg_dict = {**vars(args), **default_args}print("算法参数字典:", arg_dict)# 创建环境和智能体env, agent = create_env_agent(arg_dict)# 传入算法参数、环境、智能体,然后开始训练res_dic = train(arg_dict, env, agent)print("算法返回结果字典:", res_dic)# 保存相关信息agent.save_model(path=arg_dict['model_path'])save_args(arg_dict, path=arg_dict['result_path'])save_results(res_dic, tag='train', path=arg_dict['result_path'])plot_rewards(res_dic['rewards'], arg_dict, path=arg_dict['result_path'], tag="train")# =================================================================================================# 创建新环境和智能体用来测试print("=" * 300)env, agent = create_env_agent(arg_dict)# 加载已保存的智能体agent.load_model(path=arg_dict['model_path'])res_dic = test(arg_dict, env, agent)save_results(res_dic, tag='test', path=arg_dict['result_path'])plot_rewards(res_dic['rewards'], arg_dict, path=arg_dict['result_path'], tag="test")

4.3 运行结果展示

由于有些输出太长,下面仅展示部分输出

状态数: 4, 动作数: 2
环境名: CartPole-v0, 算法名: PPO, Device: cpu
开始训练智能体......
回合:10/200,奖励:14.00
回合:20/200,奖励:36.00
回合:30/200,奖励:21.00
回合:40/200,奖励:23.00
回合:50/200,奖励:25.00
回合:60/200,奖励:155.00
回合:70/200,奖励:200.00
回合:80/200,奖励:101.00
回合:90/200,奖励:153.00
回合:100/200,奖励:145.00
回合:110/200,奖励:166.00
回合:120/200,奖励:200.00
回合:130/200,奖励:200.00
回合:140/200,奖励:200.00
回合:150/200,奖励:200.00
回合:160/200,奖励:144.00
回合:170/200,奖励:200.00
回合:180/200,奖励:200.00
回合:190/200,奖励:200.00
回合:200/200,奖励:200.00
训练结束 , 用时: 130.60313510894775 s
============================================================================================================================================================================================================================================================================================================
状态数: 4, 动作数: 2
开始测试智能体......
环境名: CartPole-v0, 算法名: PPO, Device: cpu
回合:1/20, 奖励:200.0
回合:2/20, 奖励:200.0
回合:3/20, 奖励:200.0
回合:4/20, 奖励:200.0
回合:5/20, 奖励:200.0
回合:6/20, 奖励:200.0
回合:7/20, 奖励:200.0
回合:8/20, 奖励:200.0
回合:9/20, 奖励:200.0
回合:10/20, 奖励:200.0
回合:11/20, 奖励:200.0
回合:12/20, 奖励:200.0
回合:13/20, 奖励:200.0
回合:14/20, 奖励:200.0
回合:15/20, 奖励:200.0
回合:16/20, 奖励:200.0
回合:17/20, 奖励:200.0
回合:18/20, 奖励:181.0
回合:19/20, 奖励:200.0
回合:20/20, 奖励:125.0
测试结束 , 用时: 31.763733386993408 s

在这里插入图片描述

PPO算法测试

在这里插入图片描述

策略梯度算法测试:【强化学习】Policy Gradient 策略梯度算法求解CartPole倒立摆问题 + Python代码实战

在这里插入图片描述

是不是明显感觉到经过PPO算法训练出来的智能体在测试中表现得更加稳呢!

4.4 关于可视化的设置

如果你觉得可视化比较耗时,你可以进行设置,取消可视化。
或者你想看看训练过程的可视化,也可以进行相关设置

在这里插入图片描述


http://chatgpt.dhexx.cn/article/KiunMOXH.shtml

相关文章

强化学习之PPO

阅读本文前先了解TRPO算法有助于理解,我对此也写过博客:https://blog.csdn.net/tianjuewudi/article/details/120191097 参考李宏毅老师的视频:https://www.bilibili.com/video/BV1Wv411h7kN?p80 PPO,全名Proximal Policy Opti…

【强化学习】PPO:从On-policy到Off-policy(PPO/TRPO/PPO-Penalty/PPO-Clip)

目录 一、为什么要从On- Policy到Off-Policy?二、如何从On- Policy到Off-Policy?三、如何使 p θ ( a t ∣ s t ) p_\theta(a_t|s_t) pθ​(at​∣st​)和 p θ ′ ( a t ∣ s t ) p_{\theta}(a_t|s_t) pθ′​(at​∣st​)不相差太多?3.1 PP…

如何理解PPO算法的核心操作clip

回顾 传统的策略梯度算法以下式作为策略网络的损失: g ^ E ^ t [ ∇ θ log ⁡ π θ ( a t ∣ s t ) A ^ t ] \hat{g}\hat{\mathbb{E}}_{t}\left[\nabla_{\theta} \log \pi_{\theta}\left(a_{t} \mid s_{t}\right) \hat{A}_{t}\right] g^​E^t​[∇θ​logπθ​…

强化学习PPO从理论到代码详解(2)---PPO1和PPO2

在线或离线学习 上一节我们了解了什么是策略梯度,本节开始讲PPO理论之前,我们先提出一个概念,什么在线学习,什么离线学习。 On-policy: Then agent learned and the agent interacting with Environment is the same Off-policy…

强化学习PPO代码讲解

阅读本文前对PPO的基本原理要有概念性的了解,本文基于我的上一篇文章:强化学习之PPO 当然,查看代码对于算法的理解直观重要,这使得你的知识不止停留在概念的层面,而是深入到应用层面。 代码采用了简单易懂的强化学习…

PPO算法(附pytorch代码)

这里写目录标题 一、PPO算法(1)简介(2)On-policy?(3)GAE (Generalized Advantage Estimation) 三、代码代码解析: 一、PPO算法 (1)简介 PPO算法…

论文笔记之PPO

15年OpenAI发表了TRPO算法,一直策略单调提升的算法;17年DeepMind基于TRPO发表了一篇Distributed-PPO,紧接着OpenAI发表了这篇PPO。可以说TRPO是PPO的前身,PPO在TRPO的基础上进行改进,使得算法可读性更高,实…

PPO实战学习总结

PPO used in go-bigger 前段时间一直在学习ppo算法,写了 一点总结,记录一下自己对ppo算法的一些理解与RL实战时候容易遇到的一些问题。代码地址如下,需要的可以自取: https://github.com/FLBa9762/PPO_used_in_Gobigger.git一般…

PPO算法

在线学习和离线学习 在线学习:和环境互动的Agent以及和要学习的Agent是同一个, 同一个Agent,一边和环境做互动,一边在学习。离线学习: 和环境互动及的Agent以和要学习的Agent不是同一个,学习的Agent通过看别人完来学习。 利用新的…

PPO2代码 pytorch框架

PPO2代码玩gym库的Pendulum环境 2022-8-02更新 我发现这篇文章浏览量惨淡啊。 咋滴,是不相信的我代码能用是吗? 所以,我给出reward的收敛曲线图: 开玩笑,出来混,我能卖你生瓜码子吗? ———…

PPO实战

哈哈初学,复现龙龙老师的实例! state:是平衡小车上的杆子,观测状态由 4 个连续的参数组成:推车位置 [-2.4,2.4],车速 [-∞,∞],杆子角度 [~-41.8&#xff0c…

PyTorch实现PPO代码

原理:Proximal Policy Optimization近端策略优化(PPO) 视频:Proximal Policy Optimization (PPO) is Easy With PyTorch | Full PPO Tutorial 代码来自github: Youtube-Code-Repository EasyRL 网站:Neural…

优化PPO

优化PPO 介绍core implementation details1.Vectorized architecture 量化结构Orthogonal Initialization of Weights and Constant Initialization of biases 算法权重的初始化以及恒定偏差的初始化The Adam Optimizer’s Epsilon Parameter Adam优化器的ε参数Adam Learning …

PPO Algorithm

‘‘目录 PPO ALGORITHM 进行看别人文章: 如何直观理解PPO算法?[理论篇] - 知乎 (zhihu.com) 【强化学习8】PPO - 知乎 (zhihu.com) PPO(OpenAI) Proximal Policy Optimization(PPO)算法原理及实现! - 简书 (jianshu.com) 1-Critic的作用与效果.m…

PPO算法实战

原理简介 PPO是一种on-policy算法,具有较好的性能,其前身是TRPO算法,也是policy gradient算法的一种,它是现在 OpenAI 默认的强化学习算法,具体原理可参考PPO算法讲解。PPO算法主要有两个变种,一个是结合K…

Proximal Policy Optimization(近端策略优化)(PPO)原理详解

本节开始笔者针对自己的研究领域进行RL方面的介绍和笔记总结,欢迎同行学者一起学习和讨论。本文笔者来介绍RL中比较出名的算法PPO算法,读者需要预先了解Reinforcement-Learning中几个基础定义才可以阅读,否则不容易理解其中的内容。不过笔者尽…

【强化学习PPO算法】

强化学习PPO算法 一、PPO算法二、伪代码三、相关的简单理论1.ratio2.裁断3.Advantage的计算4.loss的计算 四、算法实现五、效果六、感悟 最近再改一个代码,需要改成PPO方式的,由于之前没有接触过此类算法,因此进行了简单学习,论文…

【深度强化学习】(6) PPO 模型解析,附Pytorch完整代码

大家好,今天和各位分享一下深度强化学习中的近端策略优化算法(proximal policy optimization,PPO),并借助 OpenAI 的 gym 环境完成一个小案例,完整代码可以从我的 GitHub 中获得: https://gith…

autoit连接mysql数据库

原链接点我 一,准备工作 1, 下载mysql.au3(这个点击就下载了) 把mysql.au3放入到autoit的include目录下 2, 下载mysql驱动(根据自己系统选,下载完之后,双击运行会自动安装,一路next就行) 二,使用 #include "mysql.au3" #include <Array.au3> ;弹窗 Func aler…

AutoIt-v3的安装,和robotframework-autoitlibrary的导入

AutoIt 最新是v3版本&#xff0c;这是一个使用类似BASIC脚本语言的免费软件,它设计用于Windows GUI&#xff08;图形用户界面)中进行自动化操作。它利用模拟键盘按键&#xff0c;鼠标移动和窗口/控件的组合来实现自动化任务。而这是其它语言不可能做到或无可靠方法实现的。 Au…