强化学习网络与机器人控制——无模型强化学习

深度Q学习

DQN已经说过很多很多次了,但是为了本期博客我还是决定再做一遍。

根据我们上一篇博客的知识,我们知道DQN使用的是Q学习的离轨策略,话不多说,我们直接假设我们的策略是一个全连接神经网络:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 这里我用简简单单的全连接作为我们的策略网络
class DQN(nn.Module):
# 状态空间到动作空间
def __init__(self, status_dim, hidden_dim, action_dim, gamma=0.9, *args, **kwargs):
super().__init__(*args, **kwargs)
self.status_dim = status_dim
self.action_dim = action_dim
self.gamma = gamma
self.fn1 = nn.Linear(status_dim, hidden_dim * 64)
self.fn2 = nn.Linear(hidden_dim * 64, hidden_dim * 64)
self.fn3 = nn.Linear(hidden_dim * 64, action_dim)

def forward(self, x):
x = self.fn1(x)
x = F.relu(x)
x = self.fn2(x)
x = F.relu(x)
x = self.fn3(x)
return x

有了策略网络我们就需要两个策略对应离轨策略的两个网络参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class OffDQNAgent(nn.Module):
def __init__(self, status_dim, action_dim, gamma=0.99, epsilon=0.9, learn_rate=0.001, device='cuda', *args,
**kwargs):
super().__init__(*args, **kwargs)
self.status_dim = status_dim
self.action_dim = action_dim
self.gamma = gamma
self.epsilon = epsilon
# 这里的学习率是TD方法的alpha,换个名字罢了
self.learn_rate = learn_rate
self.device = device

self.steps_done = 0
self.curious_limit = 0

# 这里采用离轨策略
self.policy_net = DQN(status_dim, 16 * status_dim, action_dim, gamma).to(device=self.device)
self.target_net = DQN(status_dim, 16 * status_dim, action_dim, gamma).to(device=self.device)

self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learn_rate, amsgrad=True)

# 经验池选择队列
self.memory = deque(maxlen=2000)

我们再这个类的基础上进行说明,其中的gamma就是我们lambda回报计算的参数lambda,learn_rate就是学习率,也是我们理论公式中的alpha,steps_done意味循环次数,因为我们的目标网络需要从策略网络中定时更新参数,所以我们必须记录一下,避免太久不保存,而policy_net就是策略网络,用于在环境中摸爬滚打。target_net是我们的目标网络,用于我们定时更新参数,

这里我们需要进行epsilon贪心的计算,我们直接定义一个方法choice_action,写在类里:

1
2
3
4
5
6
7
8
9
10
11
def choice_action(self, status, random_try=True):
status_tensor = torch.FloatTensor(status).to(device=self.device)
# 这里加入了一个随机试探的参数,用于前期的数据收集
# epsilon贪心需要逐步下降,越学越烦
self.curious_limit = 0.05 + (self.epsilon - 0.05) * \
math.exp(-1. * self.steps_done / 2000)
if random_try and random.random() < self.curious_limit:
return random.randint(0, self.action_dim - 1) # 随便一个动作回去试
else:
with torch.no_grad():
return torch.argmax(self.policy_net(status_tensor)).item()

这里我们需要对epsilon进行sigmoid下降,避免胆子太大造成学好的内容变成一堆垃圾。这里就按照随机概率进行选择是随机动作还是策略网络输出动作。

为了能看清目标网络的效果,我们再实现一个从目标网络获取动作的方法:

1
2
3
4
def eval_choise(self, status):
status_tensor = torch.FloatTensor(status).to(device=self.device)
with torch.no_grad():
return torch.argmax(self.target_net(status_tensor)).item()

这个方法就不多赘述了,就是非常简单的前向传播。

我们还需要一个向经验池保存数据的办法,我们直接写在类内:

1
2
3
def append_memory(self, s, a, r, next_s, done):
# 当一个元组保存在经验池里
self.memory.append((s, a, r, next_s, done))

然后就是我们关键的update步骤:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def update(self, batch_size):
if len(self.memory) < batch_size:
# 没有通过随机试探学到足够多的知识
return

# 随机采样
random_sample = random.sample(self.memory, batch_size)
# 元组解压,也可以直接用
states, actions, rewards, next_states, dones = zip(*random_sample)

# 类型转换
states = torch.FloatTensor(states).to(device=self.device)
actions = torch.LongTensor(actions).to(device=self.device)
rewards = torch.FloatTensor(rewards).to(device=self.device)
next_states = torch.FloatTensor(next_states).to(device=self.device)
dones = torch.FloatTensor(dones).to(device=self.device)

# lambda回报计算G_t
with torch.no_grad():
target_values = rewards + self.gamma * torch.max(self.target_net(next_states), dim=1)[0] * (1 - dones)

# 利用策略网络估计Q
q_values = self.policy_net(states).gather(1, actions.unsqueeze(1)).squeeze()

# 重要性估计
# 计算行为策略的概率
# behavior_probs = np.full(batch_size, self.curious_limit / self.action_dim) # 默认概率
# 直接使用一维索引更新行为策略的概率
# actions_np = actions.cpu().numpy()
# 将 actions 转换为 NumPy 数组
# behavior_probs[actions_np] = (1 - self.curious_limit) + (self.curious_limit / self.action_dim)

# 重要性向量
# behavior_probs = torch.FloatTensor(behavior_probs).to(device=self.device)
# importance_weights = 1.0 / behavior_probs

# 计算加权损失(结果不尽人意,变成一个大傻逼)
# loss = (importance_weights * (q_values - target_values)).mean()
criterion = nn.SmoothL1Loss()
loss = criterion(q_values, target_values)

# 更新网络
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()

self.steps_done += 1
if self.steps_done % 10 == 0:
self.target_net.load_state_dict(self.policy_net.state_dict())

正如注释所说,我也尝试了重要性采样进行梯度计算,但是结果不尽人意,给我们的倒立摆学成了大风车,尽管并没有DQN使用重要性采样的先例,但是我们依然进行了尝试。

这里面的梯度计算就是利用公式所说的:

至于SmoothL1Loss也就是平滑L1损失函数用于计算两者的差(不知道可以看看L1范数的定义,就是矢量的点差)

最后我们写一下main方法,并通过gym进行观察状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import gym
import torch
import time
import DQNModel

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# 主程序
def main():
env = gym.make('CartPole-v1') # 创建 Gym 环境
state_size = env.observation_space.shape[0] # 状态空间大小
action_size = env.action_space.n # 动作空间大小
num_episodes = 40

agent = DQNModel.OffDQNAgent(state_size, action_size, device=device)

# 训练过程
for episode in range(num_episodes):
state, _ = env.reset() # 初始化状态
done = False
total_reward = 0

while not done:
action = agent.choice_action(state, random_try=True)
next_state, reward, done, _, _ = env.step(action) # 执行动作并获取下一个状态和奖励
# 存储经验
agent.append_memory(state, action, reward, next_state, done)
# 更新Q值
agent.update(batch_size=64)
state = next_state
total_reward += reward

print(f"Episode: {episode}, Total Reward: {total_reward}")

env.close() # 关闭一个环境
# 新建一个自己看的环境
env2 = gym.make('CartPole-v1', render_mode="human")
for _ in range(100):
state, _ = env2.reset()
while True:
env2.render()
action = agent.eval_choise(state) # 使用目标策略
next_state, reward, done, _, _ = env2.step(action)
state = next_state
time.sleep(0.1)

env2.close()


if __name__ == "__main__":
main()

正如所见,只需要简单的组合以下就可以,我们仅仅只需要执行40轮,每轮仅需64个数据就完成了收敛,最终实现了一个非常好的效果:

1

我用PR简单的处理了一下,对付看哈。没有上传Gif动图原因是太大了,没法查看。

演说家-评论家网络

所谓的演说家评论家就是AC方法,通过上一篇文章我们知道AC方法是一种策略梯度算法,提及到策略梯度我们可能会有A3C异步演说家方法,TRPO信任区域策略算法以及一些诸如PPO等方法,我们都会逐步实现,既然涉及到了演说家-评论家,我们就必须要有一个Actor网络和Cirttic网络:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 策略梯度(演说家)
class Actor(nn.Module):
def __init__(self, status_dim, action_dim, *args, **kwargs):
super().__init__(*args, **kwargs)
self.forwardConnectNet = nn.Sequential(
nn.Linear(status_dim, 128),
nn.ReLU(),
nn.Linear(128, action_dim),
# AC网络的A输出动作概率
nn.Softmax(dim=-1)
)
pass

def forward(self, x):
return self.forwardConnectNet(x)


# 策略梯度(评论家)
class Critic(nn.Module):
def __init__(self, status_dim, *args, **kwargs):
super().__init__(*args, **kwargs)
self.forwardConnectNet = nn.Sequential(
nn.Linear(status_dim, 128),
nn.ReLU(),
# 批评网络输出Q
nn.Linear(128, 1)
)

def forward(self, x):
return self.forwardConnectNet(x)

策略梯度不是离轨策略,所以为了确保两层网络在数据参数不统一的情况下可以正常反馈并训练网络,我们必须要进行优势函数的计算,也就是理论价值与实际价值的差向量。我们的AC算法是策略梯度,我们就必须求解一下梯度,也就是:

对策略的输出也就是动作概率的向量,我们简单的把Actor输出取一个对数即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class Actor_Critic(nn.Module):
def __init__(self, status_dim, action_dim, lr=0.001, device='cuda', *args, **kwargs):
super().__init__(*args, **kwargs)
self.status_dim = status_dim
self.action_dim = action_dim
self.device = device
self.lr = lr
self.actor = Actor(status_dim, action_dim).to(self.device)
self.critic = Critic(status_dim).to(self.device)

self.actor_optim = optim.Adam(params=self.actor.parameters(), lr=lr)
self.critic_optim = optim.Adam(params=self.critic.parameters(), lr=lr)

def choice_action(self, status):
status = torch.FloatTensor(status).unsqueeze(0).to(self.device) # 增加batch维度
probs = self.actor(status)
# 策略梯度系列方法都需要把动作的输出当作动作的概率
action = np.random.choice(len(probs.detach().cpu().numpy()[0]), p=probs.detach().cpu().numpy()[0])
return action

def update(self, status, action, reward, next_status, done):
status = torch.FloatTensor(status).unsqueeze(0).to(self.device)
next_state = torch.FloatTensor(next_status).unsqueeze(0).to(self.device)

# 计算目标值
target_value = reward + (1 - done) * self.critic(next_state)
advantage = target_value - self.critic(status)

# 更新Actor
action_prob = self.actor(status)[0, action]
# 策略梯度方法一定要用到的就是优势函数,或者说是优势向量,与重要性采样的目的是一样的
actor_loss = -torch.log(action_prob) * advantage.detach()

self.actor_optim.zero_grad()
actor_loss.backward()
self.actor_optim.step()

# 更新Critic
critic_loss = (target_value - self.critic(status)).pow(2) # 均方误差
self.critic_optim.zero_grad()
critic_loss.backward()
self.critic_optim.step()

这里的代码比较多,其中我们的AC方法并不需要进行随机试探,我们直接把Actor的输出当作动作向量,按照向量的数值大小当作概率,根据概率选择动作即可。至于如何确认概率,读者可以观察一下Actor网络的最后一层,也就是SoftMAX层。

我们的更新方法处很简单,通过计算目标价值(最终奖励模式即只有游戏结束才会基于奖励),我们给出训练方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def train_PG(net, env, num_episodes=400):
for episode in range(num_episodes):
status, _ = env.reset()
total_reward = 0
done = False

while not done:
action = net.choice_action(status)
next_state, reward, done, _, _ = env.step(action)
net.update(status, action, reward, next_state, done)
status = next_state
total_reward += reward

print(f'Episode {episode + 1}, Total Reward: {total_reward}')

我们最后给出在env实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def exampleShow(net, env):
status, _ = env.reset()
while True:
env.render()
action = net.choice_action(status)
next_state, reward, done, _, _ = env.step(action)
net.update(status, action, reward, next_state, done)
status = next_state
time.sleep(0.1)


def main():
env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
net = model.Actor_Critic(state_dim, action_dim, device=device)
# 训练完成后直接可视化运行
train_PG(net, env)
env.close()
env2 = gym.make("CartPole-v1", render_mode="human")
exampleShow(net, env2)


if __name__ == "__main__":
main()

2

这个方法需要的轮次比较多才可以非常稳定,当然这是一个概率性问题

近端优化策略

近端优化是AC方法的进一步优化,我们不需要计算优势函数而是选择计算重要性采样:

所以我们只需要计算:

注意我们这里采用了ln函数,原因是我们的Actor输出的概率经过了softmax层,近似于取对数,接下来就给出AC对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 策略梯度
class Actor(nn.Module):
def __init__(self, status_dim, action_dim, *args, **kwargs):
super().__init__(*args, **kwargs)
self.forwardConnectNet = nn.Sequential(
nn.Linear(status_dim, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, action_dim),
# AC网络的A输出动作概率
nn.Softmax(dim=-1)
)
pass

def forward(self, x):
return self.forwardConnectNet(x)


# 评论家
class Critic(nn.Module):
def __init__(self, status_dim, *args, **kwargs):
super().__init__(*args, **kwargs)
self.forwardConnectNet = nn.Sequential(
nn.Linear(status_dim, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, 1)
)

def forward(self, x):
return self.forwardConnectNet(x)

我们给出PPO1方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class PPO1(nn.Module):
def __init__(self, status_dim, action_dim, lr=0.001, kl_target=3, device=torch.device("cuda"), *args, **kwargs):
super().__init__(*args, **kwargs)
self.device = device
self.lr = lr
self.kl_target = kl_target
self.actor = Actor(status_dim, action_dim).to(self.device)
self.critic = Critic(status_dim).to(self.device)

self.actor_optim = optim.AdamW(params=self.actor.parameters(), lr=lr)
self.critic_optim = optim.AdamW(params=self.critic.parameters(), lr=lr)

def choice_action(self, status):
status = torch.FloatTensor(status).unsqueeze(0).to(self.device)
probs = self.actor(status)
action = np.random.choice(len(probs.detach().cpu().numpy()[0]), p=probs.detach().cpu().numpy()[0])
return action

def update(self, states, actions, rewards, next_states, dones, old_log_probs):
states = torch.FloatTensor(states).to(self.device)
actions = torch.LongTensor(actions).to(self.device)
rewards = torch.FloatTensor(rewards).to(self.device)
next_states = torch.FloatTensor(next_states).to(self.device)
dones = torch.FloatTensor(dones).to(self.device)

# 计算目标值和优势
with torch.no_grad():
target_values = rewards + (1 - dones) * self.critic(next_states)

# 策略梯度
advantages = target_values - self.critic(states)


# 更新Actor
new_probs = self.actor(states)
new_log_probs = new_probs.gather(1, actions.unsqueeze(1)).log()
old_log_probs_tensor = torch.tensor(old_log_probs, device=self.device) # 将列表转换为张量并移动到设备

# 计算重要性采样比率
ratios = (new_log_probs - old_log_probs_tensor).exp() # 比率的计算

# 计算KL散度
kl_divergence = (old_log_probs_tensor - new_log_probs).mean().item()

# 计算损失
actor_loss = -((new_log_probs - old_log_probs_tensor) * advantages * ratios).mean()

if kl_divergence > self.kl_target:
self.actor_optim.zero_grad()
actor_loss.backward()
self.actor_optim.step()

# 更新Critic
critic_loss = (target_values - self.critic(states)).pow(2).mean()
self.critic_optim.zero_grad()
critic_loss.backward()
self.critic_optim.step()

代码很简单,我们再次放入我们之前的函数内,就可以运行得到最终的稳定倒立摆了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def train_ppo(env_name, num_episodes=500, max_timesteps=1024):
env = gym.make(env_name)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
ppo = PPO1(state_dim, action_dim).to(device=device)

for episode in range(num_episodes):
states, actions, rewards, next_states, dones, old_log_probs = [], [], [], [], [], []
state, _ = env.reset()
total_reward = 0

for t in range(max_timesteps):
action = ppo.choice_action(state)
next_state, reward, done, _, _ = env.step(action)

# 记录数据
states.append(state)
actions.append(action)
rewards.append(reward)
next_states.append(next_state)
dones.append(float(done))
old_log_probs.append(
ppo.actor(torch.FloatTensor(state).unsqueeze(0).to(device=device))
.gather(1, torch.LongTensor([[action]]).to(device=device))
.log().item()
)

state = next_state
total_reward += reward

if done:
break

# 更新策略
ppo.update(states, actions, rewards, next_states, dones, old_log_probs)
print(f'Episode {episode + 1}, Total Reward: {total_reward}')
env.close()

env22 = gym.make(env_name, render_mode="human")
exampleShow(ppo, env22)

3

可以看到都可以非常稳定的站立。


总结

很遗憾无模型强化学习内容极多,我们下期博客继续更新


强化学习网络与机器人控制——无模型强化学习
https://blog.minloha.cn/posts/182823f35858a52024102812.html
作者
Minloha
发布于
2024年10月28日
更新于
2024年11月6日
许可协议