Deep Q-Learning
DQN has been covered many, many times already, but for this post I decided to go through it one more time.
From the previous post we know that DQN is off-policy Q-learning. Without further ado, let's assume our policy is a fully connected neural network:
```python
# imports for the whole DQNModel module (also used by OffDQNAgent below)
import math
import random
from collections import deque

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


class DQN(nn.Module):
    def __init__(self, status_dim, hidden_dim, action_dim, gamma=0.9, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.status_dim = status_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.fn1 = nn.Linear(status_dim, hidden_dim * 64)
        self.fn2 = nn.Linear(hidden_dim * 64, hidden_dim * 64)
        self.fn3 = nn.Linear(hidden_dim * 64, action_dim)

    def forward(self, x):
        x = self.fn1(x)
        x = F.relu(x)
        x = self.fn2(x)
        x = F.relu(x)
        x = self.fn3(x)
        return x
```
With the policy network defined, we need two sets of network parameters, one for each of the two policies in the off-policy setup:
```python
class OffDQNAgent(nn.Module):
    def __init__(self, status_dim, action_dim, gamma=0.99, epsilon=0.9,
                 learn_rate=0.001, device='cuda', *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.status_dim = status_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.learn_rate = learn_rate
        self.device = device

        self.steps_done = 0
        self.curious_limit = 0

        self.policy_net = DQN(status_dim, 16 * status_dim, action_dim, gamma).to(device=self.device)
        self.target_net = DQN(status_dim, 16 * status_dim, action_dim, gamma).to(device=self.device)

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learn_rate, amsgrad=True)

        self.memory = deque(maxlen=2000)
```
Let's walk through this class. gamma is the discount factor γ used when computing returns, and learn_rate is the learning rate, the α in the theoretical update. steps_done counts how many update steps have been performed: because the target network must periodically copy its parameters from the policy network, we have to track this counter so the sync never lags too far behind. policy_net is the policy network that interacts with the environment, and target_net is the target network whose parameters we refresh at fixed intervals.
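For reference, the Q-learning update these two networks implement together is the standard one:

$$
Q(s, a) \leftarrow Q(s, a) + \alpha\Big[r + \gamma \max_{a'} Q(s', a') - Q(s, a)\Big],
$$

where `learn_rate` plays the role of $\alpha$ (through the optimizer) and `gamma` is the discount factor $\gamma$; in DQN the bootstrap term $\max_{a'} Q(s', a')$ is evaluated with target_net while the value being updated comes from policy_net.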
Here we need ε-greedy action selection, so we define a choice_action method inside the class:
```python
    def choice_action(self, status, random_try=True):
        status_tensor = torch.FloatTensor(status).to(device=self.device)
        self.curious_limit = 0.05 + (self.epsilon - 0.05) * \
            math.exp(-1. * self.steps_done / 2000)
        if random_try and random.random() < self.curious_limit:
            return random.randint(0, self.action_dim - 1)
        else:
            with torch.no_grad():
                return torch.argmax(self.policy_net(status_tensor)).item()
```
Here ε decays exponentially with the step count, so the agent does not stay so bold that it turns what it has already learned into garbage. A random draw against this threshold then decides whether we take a random action or the action output by the policy network.
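Written out, the exploration threshold computed in `choice_action` is

$$
\varepsilon_t = 0.05 + (\varepsilon_0 - 0.05)\, e^{-t / 2000},
$$

which starts at $\varepsilon_0 = 0.9$ and decays exponentially toward a floor of 0.05 as `steps_done` grows.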
To see how well the target network has learned, we also implement a method that picks actions from the target network:
```python
    def eval_choise(self, status):
        status_tensor = torch.FloatTensor(status).to(device=self.device)
        with torch.no_grad():
            return torch.argmax(self.target_net(status_tensor)).item()
```
Not much to say about this method: it is just a simple forward pass followed by an argmax.
We also need a way to store transitions in the replay buffer, again written inside the class:
```python
    def append_memory(self, s, a, r, next_s, done):
        self.memory.append((s, a, r, next_s, done))
```
Then comes the crucial update step:
```python
    def update(self, batch_size):
        if len(self.memory) < batch_size:
            return

        random_sample = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*random_sample)

        states = torch.FloatTensor(states).to(device=self.device)
        actions = torch.LongTensor(actions).to(device=self.device)
        rewards = torch.FloatTensor(rewards).to(device=self.device)
        next_states = torch.FloatTensor(next_states).to(device=self.device)
        dones = torch.FloatTensor(dones).to(device=self.device)

        # TD target from the target network: r + gamma * max_a Q_target(s', a)
        with torch.no_grad():
            target_values = rewards + self.gamma * torch.max(self.target_net(next_states), dim=1)[0] * (1 - dones)

        # Q(s, a) from the policy network for the actions actually taken
        q_values = self.policy_net(states).gather(1, actions.unsqueeze(1)).squeeze()

        criterion = nn.SmoothL1Loss()
        loss = criterion(q_values, target_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # periodically sync the target network with the policy network
        self.steps_done += 1
        if self.steps_done % 10 == 0:
            self.target_net.load_state_dict(self.policy_net.state_dict())
```
As an aside, I also tried computing the gradient with importance sampling, but the results were disappointing: the cart-pole turned into a windmill. There is no precedent for using importance sampling with DQN, but we gave it a try anyway.
The gradient computation here follows the formula:
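$$
y = r + \gamma\,(1 - \text{done})\,\max_{a'} Q_{\text{target}}(s', a'), \qquad
L = \text{SmoothL1}\big(Q_{\text{policy}}(s, a),\, y\big),
$$

which is exactly the `target_values` / `q_values` pair fed into the loss in `update` above.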
As for SmoothL1Loss, the smooth L1 (Huber-style) loss measures the difference between the two. (If it is unfamiliar, look up the L1 norm: the sum of the absolute values of a vector's components.)
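For reference, PyTorch's `nn.SmoothL1Loss` with its default `beta = 1` applies the following function elementwise to the difference $x = Q_{\text{policy}}(s, a) - y$:

$$
\text{SmoothL1}(x) =
\begin{cases}
0.5\,x^{2}, & |x| < 1,\\
|x| - 0.5, & \text{otherwise},
\end{cases}
$$

so it behaves like MSE near zero and like L1 for large errors, which keeps the gradients bounded.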
Finally, we write a main function and watch the result through gym:
```python
import gym
import torch
import time
import DQNModel

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def main():
    env = gym.make('CartPole-v1')
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    num_episodes = 40

    agent = DQNModel.OffDQNAgent(state_size, action_size, device=device)

    for episode in range(num_episodes):
        state, _ = env.reset()
        done = False
        total_reward = 0

        while not done:
            action = agent.choice_action(state, random_try=True)
            next_state, reward, done, _, _ = env.step(action)
            agent.append_memory(state, action, reward, next_state, done)
            agent.update(batch_size=64)
            state = next_state
            total_reward += reward

        print(f"Episode: {episode}, Total Reward: {total_reward}")

    env.close()

    # render the greedy target-network policy
    env2 = gym.make('CartPole-v1', render_mode="human")
    for _ in range(100):
        state, _ = env2.reset()
        while True:
            env2.render()
            action = agent.eval_choise(state)
            next_state, reward, done, _, _ = env2.step(action)
            state = next_state
            time.sleep(0.1)
            if done:  # start a fresh episode once the pole falls
                break

    env2.close()


if __name__ == "__main__":
    main()
```
As you can see, it only takes putting these pieces together. With just 40 episodes, sampling mini-batches of 64 transitions per update, training converges and the final result is very good:
I gave the recording a quick pass in Premiere, just enough to be watchable. I did not upload a GIF because it was too large to view.
Actor-Critic Networks
The so-called actor-critic approach is the AC method. From the previous post we know that AC is a policy gradient algorithm. Speaking of policy gradients, there are also A3C (asynchronous advantage actor-critic), TRPO (trust region policy optimization), and methods such as PPO, all of which we will implement step by step. Since we are dealing with actor-critic, we need an Actor network and a Critic network:
```python
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim


class Actor(nn.Module):
    def __init__(self, status_dim, action_dim, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.forwardConnectNet = nn.Sequential(
            nn.Linear(status_dim, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        return self.forwardConnectNet(x)


class Critic(nn.Module):
    def __init__(self, status_dim, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.forwardConnectNet = nn.Sequential(
            nn.Linear(status_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, x):
        return self.forwardConnectNet(x)
```
Policy gradient methods are on-policy, so to make sure the two networks can give each other useful feedback even though they are trained on different quantities, we compute the advantage function, i.e., the difference between the target value and the current value estimate. Since AC is a policy gradient method, we need the gradient of the objective, namely:
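$$
\nabla_\theta J(\theta) = \mathbb{E}\big[\nabla_\theta \log \pi_\theta(a \mid s)\, A(s, a)\big],
\qquad
A(s, a) = r + \gamma\, V(s') - V(s),
$$

where $V$ is the Critic's value estimate. Note that in the code below the discount is effectively $\gamma = 1$: `target_value` is just `reward + (1 - done) * critic(next_state)`.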
For the policy's output, the vector of action probabilities, we simply take the log of the Actor's output:
```python
class Actor_Critic(nn.Module):
    def __init__(self, status_dim, action_dim, lr=0.001, device='cuda', *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.status_dim = status_dim
        self.action_dim = action_dim
        self.device = device
        self.lr = lr
        self.actor = Actor(status_dim, action_dim).to(self.device)
        self.critic = Critic(status_dim).to(self.device)

        self.actor_optim = optim.Adam(params=self.actor.parameters(), lr=lr)
        self.critic_optim = optim.Adam(params=self.critic.parameters(), lr=lr)

    def choice_action(self, status):
        status = torch.FloatTensor(status).unsqueeze(0).to(self.device)
        probs = self.actor(status)
        action = np.random.choice(len(probs.detach().cpu().numpy()[0]),
                                  p=probs.detach().cpu().numpy()[0])
        return action

    def update(self, status, action, reward, next_status, done):
        status = torch.FloatTensor(status).unsqueeze(0).to(self.device)
        next_state = torch.FloatTensor(next_status).unsqueeze(0).to(self.device)

        target_value = reward + (1 - done) * self.critic(next_state)
        advantage = target_value - self.critic(status)

        action_prob = self.actor(status)[0, action]
        actor_loss = -torch.log(action_prob) * advantage.detach()

        self.actor_optim.zero_grad()
        actor_loss.backward()
        self.actor_optim.step()

        critic_loss = (target_value - self.critic(status)).pow(2)
        self.critic_optim.zero_grad()
        critic_loss.backward()
        self.critic_optim.step()
```
That is a fair amount of code. Note that the AC method needs no explicit random exploration: we treat the Actor's output as a vector over actions, interpret its entries as probabilities, and sample an action according to those probabilities. To see why the entries really are probabilities, look at the last layer of the Actor network, the Softmax layer.
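As a toy illustration (separate from the agent, with a made-up probability vector), this is how sampling from the Actor's softmax output works with `np.random.choice`:

```python
import numpy as np

# hypothetical softmax output of the Actor for one CartPole state (2 actions)
probs = np.array([0.7, 0.3])

# sample an action index according to these probabilities,
# just like choice_action does with probs.detach().cpu().numpy()[0]
action = np.random.choice(len(probs), p=probs)
print(action)  # 0 roughly 70% of the time, 1 roughly 30% of the time
```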
The update method is quite simple: it works by computing a target value from the reward and the Critic's estimate of the next state (in a terminal-reward setting, the reward is only granted when the episode ends). Now we give the training routine:
```python
import time

import gym
import torch

import model  # the file holding the Actor / Critic / Actor_Critic classes above

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def train_PG(net, env, num_episodes=400):
    for episode in range(num_episodes):
        status, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            action = net.choice_action(status)
            next_state, reward, done, _, _ = env.step(action)
            net.update(status, action, reward, next_state, done)
            status = next_state
            total_reward += reward

        print(f'Episode {episode + 1}, Total Reward: {total_reward}')
```
Finally, here is the code that runs it in the environment:
```python
def exampleShow(net, env):
    # demo only: run the learned policy without further updates
    # (this also lets agents with a different update signature, e.g. PPO1 later on, reuse this helper)
    status, _ = env.reset()
    while True:
        env.render()
        action = net.choice_action(status)
        next_state, reward, done, _, _ = env.step(action)
        status = next_state
        time.sleep(0.1)
        if done:  # start a new episode once the pole falls
            status, _ = env.reset()


def main():
    env = gym.make("CartPole-v1")
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    net = model.Actor_Critic(state_dim, action_dim, device=device)
    train_PG(net, env)
    env.close()

    env2 = gym.make("CartPole-v1", render_mode="human")
    exampleShow(net, env2)


if __name__ == "__main__":
    main()
```
This method needs quite a few more episodes before it becomes really stable; of course, that is partly a matter of chance, since the policy is stochastic.
Proximal Policy Optimization
Proximal policy optimization is a further refinement of the AC method: on top of the advantage, we weight the update with an importance sampling ratio between the new policy and the old one:
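In terms of the log-probabilities the code works with, the ratio is

$$
r(\theta) = \frac{\pi_\theta(a \mid s)}{\pi_{\theta_{\text{old}}}(a \mid s)}
= \exp\big(\log \pi_\theta(a \mid s) - \log \pi_{\theta_{\text{old}}}(a \mid s)\big),
$$

which is exactly the `ratios` tensor computed in `update` below.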
So we only need to compute:
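Matching the code below, the surrogate loss and the KL estimate are

$$
L_{\text{actor}} = -\,\mathbb{E}\Big[\big(\log \pi_\theta(a \mid s) - \log \pi_{\theta_{\text{old}}}(a \mid s)\big)\, A(s, a)\, r(\theta)\Big],
\qquad
\hat{D}_{\mathrm{KL}} \approx \mathbb{E}\big[\log \pi_{\theta_{\text{old}}}(a \mid s) - \log \pi_\theta(a \mid s)\big].
$$

Standard PPO would clip the ratio or add an explicit KL penalty; this PPO1 variant simply uses the KL estimate to decide whether the actor update is applied at all (the `kl_target` check in `update`).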
Note that we work with the natural log here: the Actor's output has already passed through a softmax, so taking its log gives us log π directly, and the ratio becomes an exponential of a difference of logs. Next come the actor and critic classes:
```python
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim


class Actor(nn.Module):
    def __init__(self, status_dim, action_dim, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.forwardConnectNet = nn.Sequential(
            nn.Linear(status_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        return self.forwardConnectNet(x)


class Critic(nn.Module):
    def __init__(self, status_dim, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.forwardConnectNet = nn.Sequential(
            nn.Linear(status_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )

    def forward(self, x):
        return self.forwardConnectNet(x)
```
Now the PPO1 class:
```python
class PPO1(nn.Module):
    def __init__(self, status_dim, action_dim, lr=0.001, kl_target=3,
                 device=torch.device("cuda"), *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.device = device
        self.lr = lr
        self.kl_target = kl_target
        self.actor = Actor(status_dim, action_dim).to(self.device)
        self.critic = Critic(status_dim).to(self.device)

        self.actor_optim = optim.AdamW(params=self.actor.parameters(), lr=lr)
        self.critic_optim = optim.AdamW(params=self.critic.parameters(), lr=lr)

    def choice_action(self, status):
        status = torch.FloatTensor(status).unsqueeze(0).to(self.device)
        probs = self.actor(status)
        action = np.random.choice(len(probs.detach().cpu().numpy()[0]),
                                  p=probs.detach().cpu().numpy()[0])
        return action

    def update(self, states, actions, rewards, next_states, dones, old_log_probs):
        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).to(self.device)

        # squeeze the critic outputs to shape (batch,) so they broadcast
        # correctly against the 1-D reward / done tensors
        with torch.no_grad():
            target_values = rewards + (1 - dones) * self.critic(next_states).squeeze(-1)

        advantages = target_values - self.critic(states).squeeze(-1)

        new_probs = self.actor(states)
        new_log_probs = new_probs.gather(1, actions.unsqueeze(1)).log().squeeze(-1)
        old_log_probs_tensor = torch.tensor(old_log_probs, device=self.device)

        # importance sampling ratio pi_new / pi_old
        ratios = (new_log_probs - old_log_probs_tensor).exp()

        # rough KL estimate between the old and new policies
        kl_divergence = (old_log_probs_tensor - new_log_probs).mean().item()

        actor_loss = -((new_log_probs - old_log_probs_tensor) * advantages * ratios).mean()

        if kl_divergence > self.kl_target:
            self.actor_optim.zero_grad()
            actor_loss.backward()
            self.actor_optim.step()

        critic_loss = (target_values - self.critic(states).squeeze(-1)).pow(2).mean()
        self.critic_optim.zero_grad()
        critic_loss.backward()
        self.critic_optim.step()
```
The code is straightforward. Plugging it back into the functions from before, we can run it and get a cart-pole that finally stays stable.
```python
def train_ppo(env_name, num_episodes=500, max_timesteps=1024):
    env = gym.make(env_name)
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    ppo = PPO1(state_dim, action_dim).to(device=device)

    for episode in range(num_episodes):
        states, actions, rewards, next_states, dones, old_log_probs = [], [], [], [], [], []
        state, _ = env.reset()
        total_reward = 0

        for t in range(max_timesteps):
            action = ppo.choice_action(state)
            next_state, reward, done, _, _ = env.step(action)

            states.append(state)
            actions.append(action)
            rewards.append(reward)
            next_states.append(next_state)
            dones.append(float(done))
            old_log_probs.append(
                ppo.actor(torch.FloatTensor(state).unsqueeze(0).to(device=device))
                .gather(1, torch.LongTensor([[action]]).to(device=device))
                .log().item()
            )

            state = next_state
            total_reward += reward

            if done:
                break

        ppo.update(states, actions, rewards, next_states, dones, old_log_probs)
        print(f'Episode {episode + 1}, Total Reward: {total_reward}')

    env.close()

    env22 = gym.make(env_name, render_mode="human")
    exampleShow(ppo, env22)
```
As you can see, the pole stays upright very stably in every run.
Summary
Unfortunately, model-free reinforcement learning covers far more material than this; we will continue in the next post.