Reinforcement Learning Networks and Robot Control: Multidimensional Learning

Note: It has been a long time since the last update. If the second half of the year turns out to be relatively free, I will keep posting; if not, updates will stay at roughly one every six months.

QR-DQN

The key to QR-DQN is the QR part: its full name is Quantile Regression DQN. When we hear "regression" we usually think of linear regression, and an MLP is essentially a stack of similar (generalized) linear models. Ordinary and generalized least squares give unbiased estimates of the conditional mean, while ridge regression and Lasso deliberately trade a little bias for lower variance, so they are biased estimators. Quantile regression, by contrast, gives unbiased estimates of specific quantiles. Standard regression asks whether the predicted mean matches the target; the QR approach does not care about the mean and instead estimates quantiles (such as the median or the quartiles), which also lets you analyse how the covariates affect each quantile of the response.
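As a small aside (not from the original post), the loss behind quantile regression, often called the pinball loss, can be sketched in a few lines of NumPy; minimising it over a constant prediction recovers the corresponding quantile of the data:

import numpy as np

def pinball_loss(y_true, y_pred, tau):
    # Asymmetric penalty: under-prediction is weighted by tau, over-prediction by (1 - tau)
    diff = y_true - y_pred
    return np.mean(np.maximum(tau * diff, (tau - 1) * diff))

# Minimising the loss over a constant prediction approximates the tau-quantile of the samples
samples = np.random.normal(loc=0.0, scale=1.0, size=10000)
candidates = np.linspace(-3.0, 3.0, 601)
best = candidates[np.argmin([pinball_loss(samples, c, tau=0.9) for c in candidates])]
print(best, np.quantile(samples, 0.9))  # the two values should be close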

As the paper describes, this model does not learn the value function itself but the distribution of the return. The return comes from the interaction between states and the environment, and when we sample stochastically, the state transitions, rewards, and actions all carry some randomness. Distributional methods generally learn the value distribution with unbiased estimators, and this model in particular chooses an unbiased quantile estimator.

The model builds on C51: instead of learning the probabilities of N fixed support points, it fixes N equally weighted quantile fractions and learns the corresponding quantile locations.

In the paper, the authors first introduce the concept of a quantile projection. They note that under function approximation the distributional Bellman update can diverge, so they propose projecting onto a fixed set of quantiles, which constrains the approximation and keeps the quantile estimates monotonically non-decreasing.

Putting this into an equation: Z denotes the target's quantiles (an N-quantile representation). The Bellman operator is applied to iterate these quantiles, which then need to be sorted in ascending order to guarantee monotonicity; if several quantiles come out equal they are averaged to reduce error. The final loss, as given in the QR-DQN paper, is computed as follows:
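With $\theta_i(s,a)$ the predicted quantile values and $\hat\tau_i = \frac{2i-1}{2N}$ the quantile midpoints, the Bellman target and the loss are

$$\mathcal{T}\theta_j = r + \gamma\,\theta_j\Big(s',\ \arg\max_{a'}\tfrac{1}{N}\textstyle\sum_{k=1}^{N}\theta_k(s',a')\Big), \qquad \mathcal{L}(\theta) = \frac{1}{N}\sum_{i=1}^{N}\sum_{j=1}^{N}\rho^{\kappa}_{\hat\tau_i}\big(\mathcal{T}\theta_j - \theta_i(s,a)\big)$$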

Here $\rho$ is the quantile loss function. The paper applies the Huber loss (a classic robust loss that was proposed long ago) with threshold $\kappa$, which has the following form:
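$$\mathcal{L}_\kappa(u) = \begin{cases} \frac{1}{2}u^{2}, & |u| \le \kappa \\ \kappa\big(|u| - \frac{1}{2}\kappa\big), & |u| > \kappa \end{cases}$$

With $\kappa = 1$ this is exactly PyTorch's F.smooth_l1_loss (at its default setting), which the code below relies on.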

and $\rho$ can then be written as:
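$$\rho^{\kappa}_{\tau}(u) = \big|\tau - \mathbb{1}\{u < 0\}\big|\,\frac{\mathcal{L}_\kappa(u)}{\kappa}$$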

Finally, the network parameters are updated by gradient descent on this loss:
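For example, with learning rate $\alpha$ a plain gradient step would be

$$\theta \leftarrow \theta - \alpha\,\nabla_\theta \mathcal{L}(\theta)$$

(the implementation below uses Adam rather than raw SGD).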

The implementation is as follows. First, define a fully connected network as the quantile Q-network:

import random
from collections import deque

import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


class QRDQN(nn.Module):
    def __init__(self, state_dim, action_dim, num_quantiles=200):
        super(QRDQN, self).__init__()
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.num_quantiles = num_quantiles

        # Network layers
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim * num_quantiles)  # quantile values for every action

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        x = x.view(-1, self.action_dim, self.num_quantiles)  # reshape to (batch_size, action_dim, num_quantiles)
        return x

Then define an agent that handles training and action selection:

class QRDQNAgent:
    def __init__(self, state_dim, action_dim, num_quantiles=200, gamma=0.99, lr=0.001, epsilon=1.0, epsilon_min=0.01,
                 epsilon_decay=0.995):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.num_quantiles = num_quantiles
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay

        # Online network, target network and optimizer
        self.model = QRDQN(state_dim, action_dim, num_quantiles)
        self.target_model = QRDQN(state_dim, action_dim, num_quantiles)
        self.target_model.load_state_dict(self.model.state_dict())
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

        # Experience replay buffer
        self.replay_buffer = deque(maxlen=10000)

    def get_action(self, state):
        if random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)  # random exploration
        else:
            with torch.no_grad():
                quantiles = self.model(torch.FloatTensor(state))
                q_values = quantiles.mean(dim=2)  # expected Q value = mean over the quantiles
            return torch.argmax(q_values).item()

    def update_target_model(self):
        self.target_model.load_state_dict(self.model.state_dict())

    def train(self, batch_size=64):
        if len(self.replay_buffer) < batch_size:
            return

        # Sample a batch from the replay buffer
        batch = random.sample(self.replay_buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(np.array(next_states))
        dones = torch.FloatTensor(dones)

        # Target quantiles (greedy action chosen by the mean of the target network's quantiles)
        with torch.no_grad():
            next_quantiles = self.target_model(next_states)
            next_q_values = next_quantiles.mean(dim=2)
            next_actions = torch.argmax(next_q_values, dim=1)
            next_quantiles = next_quantiles[range(batch_size), next_actions]
            target_quantiles = rewards.unsqueeze(1) + self.gamma * (1 - dones.unsqueeze(1)) * next_quantiles

        # Current quantiles for the actions that were actually taken
        current_quantiles = self.model(states)
        current_quantiles = current_quantiles[range(batch_size), actions]

        # Quantile Huber loss, using the quantile midpoints tau_i = (2i - 1) / (2N)
        tau = (torch.arange(self.num_quantiles, dtype=torch.float32) + 0.5) / self.num_quantiles
        diff = target_quantiles.t().unsqueeze(-1) - current_quantiles.unsqueeze(0)  # pairwise TD errors
        huber_loss = F.smooth_l1_loss(diff, torch.zeros_like(diff), reduction="none")  # Huber loss (kappa = 1)
        quantile_loss = (tau - (diff.detach() < 0).float()).abs() * huber_loss  # quantile Huber loss
        quantile_loss = quantile_loss.mean()

        # Optimize the online network
        self.optimizer.zero_grad()
        quantile_loss.backward()
        self.optimizer.step()

        # Decay the exploration rate
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def test(self, env, num_episodes=10):
        for episode in range(num_episodes):
            state, _ = env.reset()
            done = False
            episode_reward = 0
            while not done:
                action = self.get_action(state)
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                episode_reward += reward
                state = next_state

                # Render the environment (render_mode="human" already draws every step)
                env.render()

            print(f"Test Episode {episode + 1}, Reward: {episode_reward}")

Finally, a main function is all that is left:

def main():
    env = gym.make('CartPole-v0', render_mode="human")  # "human" render mode for visualization
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    # Create the QR-DQN agent
    agent = QRDQNAgent(state_dim, action_dim)

    # Training
    num_episodes = 1000
    for episode in range(num_episodes):
        state, _ = env.reset()
        done = False
        episode_reward = 0
        while not done:
            action = agent.get_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            agent.replay_buffer.append((state, action, reward, next_state, done))
            episode_reward += reward
            state = next_state

            # Train the model once per environment step
            agent.train()

        # Periodically sync the target network
        if episode % 10 == 0:
            agent.update_target_model()

        print(f"Episode {episode + 1}, Reward: {episode_reward}, Epsilon: {agent.epsilon:.4f}")

    # Evaluation
    agent.test(env, num_episodes=10)


if __name__ == "__main__":
    main()

After running for a little while you can see that the agent keeps the pole balanced for a long time:

Episode 53, Reward: 158.0, Epsilon: 0.0100
Episode 54, Reward: 293.0, Epsilon: 0.0100
Episode 55, Reward: 211.0, Epsilon: 0.0100
Episode 56, Reward: 124.0, Epsilon: 0.0100
Episode 57, Reward: 223.0, Epsilon: 0.0100
Episode 58, Reward: 114.0, Epsilon: 0.0100
Episode 59, Reward: 167.0, Epsilon: 0.0100
Episode 60, Reward: 157.0, Epsilon: 0.0100
Episode 61, Reward: 210.0, Epsilon: 0.0100

(Screenshot of the rendered CartPole window.) You can run the code yourself to see the effect.

ICM

In real tasks, reward functions are usually tied to a handful of designated states. In an episodic task, for example, a reward is typically given only upon reaching the goal, while during exploration there is no reward, or it is hard to specify one; the agent then explores at random, so it may take a very long time to converge (or never converge at all). Real environments, moreover, have continuous state spaces where reward functions are often hard to design. We therefore need a way to densify the reward, and a curiosity module solves exactly this problem.

ICM stands for Intrinsic Curiosity Module. It is attached to an ordinary RL agent and, by predicting the next state from the current state and action, produces an additional learning signal; the overall flow is shown in the figure below:

(Figure: overall structure of the ICM, showing the inverse and forward models.)

As the figure shows, the ICM consists of two parts: predicting the action (the inverse model, which outputs $\hat a_t$) and predicting the next state's encoding under the feature encoder $\phi$ (the forward model). Both parts are learned by the module itself and, in the paper's notation, can be written as:
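$$\hat a_t = g\big(\phi(s_t), \phi(s_{t+1});\,\theta_I\big), \qquad \hat\phi(s_{t+1}) = f\big(\phi(s_t), a_t;\,\theta_F\big)$$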

The update rule is just as simple. As the module diagram shows, it takes the states at two consecutive time steps together with the current action, and the two losses are written as:
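$$L_I = \mathrm{CrossEntropy}\big(\hat a_t,\, a_t\big), \qquad L_F = \frac{1}{2}\big\|\hat\phi(s_{t+1}) - \phi(s_{t+1})\big\|_2^2$$

The ICM itself is trained on the weighted sum $(1-\beta)L_I + \beta L_F$, which is the beta that appears in the code below.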

and the intrinsic reward is built from the same prediction error:
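$$r^{i}_t = \frac{\eta}{2}\big\|\hat\phi(s_{t+1}) - \phi(s_{t+1})\big\|_2^2, \qquad r_t = r^{e}_t + r^{i}_t$$

where $r^{e}_t$ is the ordinary (extrinsic) environment reward and $\eta$ is a scaling factor.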

Nothing too difficult here, so here is the code:

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
from collections import deque


# A plain Q-network serves as the base learner
class QNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(QNetwork, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )

    def forward(self, x):
        return self.fc(x)


class ICM(nn.Module):
    def __init__(self, state_dim, action_dim, feature_dim=128):
        super(ICM, self).__init__()
        # State encoder phi
        self.feature = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, feature_dim)
        )
        # Inverse model (just a small fully connected network here)
        self.inverse_model = nn.Sequential(
            nn.Linear(feature_dim * 2, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim)
        )
        # Forward model (another small fully connected network)
        self.forward_model = nn.Sequential(
            nn.Linear(feature_dim + action_dim, 256),
            nn.ReLU(),
            nn.Linear(256, feature_dim)
        )

    def forward(self, state, next_state, action_onehot):
        phi_s = self.feature(state)
        phi_next = self.feature(next_state)

        pred_action = self.inverse_model(torch.cat([phi_s, phi_next], dim=1))
        pred_phi_next = self.forward_model(torch.cat([phi_s, action_onehot], dim=1))

        return phi_s, phi_next, pred_action, pred_phi_next


class DQNAgent:
    def __init__(self, state_dim, action_dim, device="cuda"):
        self.device = torch.device(device if torch.cuda.is_available() else "cpu")

        self.state_dim = state_dim
        self.action_dim = action_dim
        self.q_net = QNetwork(state_dim, action_dim).to(self.device)
        self.target_q_net = QNetwork(state_dim, action_dim).to(self.device)
        self.target_q_net.load_state_dict(self.q_net.state_dict())

        self.icm = ICM(state_dim, action_dim).to(self.device)

        self.q_optimizer = optim.Adam(self.q_net.parameters(), lr=1e-3)
        self.icm_optimizer = optim.Adam(self.icm.parameters(), lr=1e-3)

        self.memory = deque(maxlen=100000)
        self.batch_size = 64
        self.gamma = 0.99
        self.beta = 0.2   # forward loss weight in the ICM loss
        self.eta = 0.01   # intrinsic reward scale (eta in the ICM formulation; value chosen here)

        self.epsilon = 1.0
        self.epsilon_min = 0.05
        self.epsilon_decay = 0.995

    def select_action(self, state):
        if random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            return self.q_net(state).argmax(dim=1).item()

    def store(self, transition):
        self.memory.append(transition)

    def train(self):
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(np.array(states)).to(self.device)
        next_states = torch.FloatTensor(np.array(next_states)).to(self.device)
        actions = torch.LongTensor(actions).unsqueeze(1).to(self.device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)

        # ----------- ICM loss -----------
        action_onehot = F.one_hot(actions.squeeze(1), self.action_dim).float().to(self.device)
        phi_s, phi_next_s, pred_action, pred_phi_next = self.icm(states, next_states, action_onehot)

        forward_loss = F.mse_loss(pred_phi_next, phi_next_s.detach())
        inverse_loss = F.cross_entropy(pred_action, actions.squeeze(1))
        icm_loss = (1 - self.beta) * inverse_loss + self.beta * forward_loss

        self.icm_optimizer.zero_grad()
        icm_loss.backward()
        self.icm_optimizer.step()

        # Intrinsic reward = scaled forward-model prediction error (no gradient flows through it)
        with torch.no_grad():
            intrinsic_reward = self.eta * 0.5 * (pred_phi_next - phi_next_s).pow(2).sum(dim=1, keepdim=True)
        total_rewards = rewards + intrinsic_reward

        # ----------- DQN loss (extrinsic + intrinsic reward) -----------
        q_values = self.q_net(states).gather(1, actions)
        with torch.no_grad():
            max_next_q = self.target_q_net(next_states).max(1, keepdim=True)[0]
            target_q = total_rewards + (1 - dones) * self.gamma * max_next_q
        q_loss = F.mse_loss(q_values, target_q)

        self.q_optimizer.zero_grad()
        q_loss.backward()
        self.q_optimizer.step()

        # epsilon update
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def update_target(self):
        self.target_q_net.load_state_dict(self.q_net.state_dict())

The main function is just as easy to write:

import gym
from model import DQNAgent


def main():
    env = gym.make("CartPole-v1", render_mode="human")
    agent = DQNAgent(state_dim=env.observation_space.shape[0],
                     action_dim=env.action_space.n,
                     device="cuda")

    episodes = 500

    for episode in range(episodes):
        state, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            action = agent.select_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            agent.store((state, action, reward, next_state, done))
            agent.train()

            state = next_state
            total_reward += reward

        agent.update_target()
        print(f"Episode {episode}: Reward = {total_reward:.2f}, Epsilon = {agent.epsilon:.3f}")

    env.close()


if __name__ == "__main__":
    main()

The training output eventually looks like this:

Episode 18: Reward = 30.00, Epsilon = 0.253
Episode 19: Reward = 108.00, Epsilon = 0.147
Episode 20: Reward = 162.00, Epsilon = 0.065
Episode 21: Reward = 193.00, Epsilon = 0.050
Episode 22: Reward = 104.00, Epsilon = 0.050
Episode 23: Reward = 160.00, Epsilon = 0.050
Episode 24: Reward = 211.00, Epsilon = 0.050


TD3

In the previous post we implemented DDPG. For continuous spaces the method has to respect continuity, unlike tabular Q-learning (whose hallmark is a discrete state set and a discrete Q-table): DDPG uses the actor-critic framework and function approximation, fitting both the state-action Q-function and the policy with neural networks. TD3, short for Twin Delayed DDPG, is an improved version of DDPG, so the two share quite a lot:

| Aspect | DDPG | TD3 |
| --- | --- | --- |
| Base framework | Actor-critic with a deterministic policy gradient | Improved DDPG; keeps the actor-critic framework |
| Key contribution | One of the first deep RL algorithms for continuous control | Targets three key weaknesses of DDPG: overestimation bias, policy update frequency, and variance |
| Policy type | Deterministic policy outputs the action directly | Deterministic policy, with delayed updates and target noise to mitigate overfitting |
| Target network update | Soft update | Twin critic networks + delayed updates |

DDPG uses a single critic network, which tends to overestimate Q values (especially under sparse rewards) and can end up learning spurious actions. TD3 adds a second critic and takes the minimum of the two as the target Q value, which is then plugged into the Bellman backup:
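$$y = r + \gamma\,\min_{i=1,2} Q_{\theta_i'}\big(s',\ \pi_{\phi'}(s') + \epsilon\big)$$

where $\pi_{\phi'}$ is the target actor and $Q_{\theta_1'}, Q_{\theta_2'}$ are the two target critics.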

TD3 also adds noise clipping (the OU noise used by DDPG is replaced with clipped Gaussian noise added to the target action), which is the $\epsilon$ in the equation above:
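$$\epsilon \sim \mathrm{clip}\big(\mathcal{N}(0,\sigma),\ -c,\ c\big)$$

In the code below, $\sigma$ corresponds to policy_noise and $c$ to noise_clip.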

In practice, TD3 does turn out to work better than DDPG. Here is the code:

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np


# Actor network
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super().__init__()
        self.l1 = nn.Linear(state_dim, 400)
        self.l2 = nn.Linear(400, 300)
        self.l3 = nn.Linear(300, action_dim)
        self.max_action = max_action

    def forward(self, state):
        a = torch.relu(self.l1(state))
        a = torch.relu(self.l2(a))
        return self.max_action * torch.tanh(self.l3(a))


# Critic network (twin Q functions)
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        # Q1
        self.q1_l1 = nn.Linear(state_dim + action_dim, 400)
        self.q1_l2 = nn.Linear(400, 300)
        self.q1_l3 = nn.Linear(300, 1)
        # Q2
        self.q2_l1 = nn.Linear(state_dim + action_dim, 400)
        self.q2_l2 = nn.Linear(400, 300)
        self.q2_l3 = nn.Linear(300, 1)

    def forward(self, state, action):
        sa = torch.cat([state, action], dim=1)

        q1 = torch.relu(self.q1_l1(sa))
        q1 = torch.relu(self.q1_l2(q1))
        q1 = self.q1_l3(q1)

        q2 = torch.relu(self.q2_l1(sa))
        q2 = torch.relu(self.q2_l2(q2))
        q2 = self.q2_l3(q2)

        return q1, q2

    def Q1(self, state, action):
        sa = torch.cat([state, action], dim=1)
        q1 = torch.relu(self.q1_l1(sa))
        q1 = torch.relu(self.q1_l2(q1))
        return self.q1_l3(q1)


# TD3 wrapper
class TD3:
    def __init__(
        self,
        state_dim,
        action_dim,
        max_action,
        policy_noise=0.2,
        noise_clip=0.5,
        policy_freq=2,
        tau=0.005,
        gamma=0.99,
        actor_lr=3e-4,
        critic_lr=3e-4
    ):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.actor = Actor(state_dim, action_dim, max_action).to(self.device)
        self.actor_target = Actor(state_dim, action_dim, max_action).to(self.device)
        self.actor_target.load_state_dict(self.actor.state_dict())

        self.critic = Critic(state_dim, action_dim).to(self.device)
        self.critic_target = Critic(state_dim, action_dim).to(self.device)
        self.critic_target.load_state_dict(self.critic.state_dict())

        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)

        self.max_action = max_action
        self.policy_noise = policy_noise
        self.noise_clip = noise_clip
        self.policy_freq = policy_freq
        self.tau = tau
        self.gamma = gamma
        self.total_it = 0

    def select_action(self, state):
        state = torch.FloatTensor(state.reshape(1, -1)).to(self.device)
        return self.actor(state).cpu().data.numpy().flatten()

    def train(self, replay_buffer, batch_size):
        self.total_it += 1

        state, action, reward, next_state, done = replay_buffer.sample(batch_size)

        # ---------- Target policy smoothing ----------
        with torch.no_grad():
            noise = (
                torch.randn_like(action) * self.policy_noise
            ).clamp(-self.noise_clip, self.noise_clip)
            next_action = (
                self.actor_target(next_state) + noise
            ).clamp(-self.max_action, self.max_action)

            # Compute the target Q as the minimum of the two target critics
            target_Q1, target_Q2 = self.critic_target(next_state, next_action)
            target_Q = torch.min(target_Q1, target_Q2)
            target_Q = reward + (1.0 - done) * self.gamma * target_Q

        # Current Q estimates (note: both critics)
        current_Q1, current_Q2 = self.critic(state, action)

        critic_loss = nn.MSELoss()(current_Q1, target_Q) + nn.MSELoss()(current_Q2, target_Q)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # ---------- Delayed policy update ----------
        if self.total_it % self.policy_freq == 0:
            actor_loss = -self.critic.Q1(state, self.actor(state)).mean()

            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            # ---------- Polyak (soft) target updates ----------
            for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

            for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

For the main function, any continuous-control environment will do:

import gym
import numpy as np
import torch
from collections import deque
from model import TD3
import random


# Experience replay buffer
class ReplayBuffer:
    def __init__(self, max_size=int(1e6)):
        self.buffer = deque(maxlen=max_size)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((
            np.array(state, dtype=np.float32).flatten(),
            np.array(action, dtype=np.float32).flatten(),
            float(reward),
            np.array(next_state, dtype=np.float32).flatten(),
            float(done)
        ))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = map(np.stack, zip(*batch))
        return (
            torch.FloatTensor(state).to(device),
            torch.FloatTensor(action).to(device),
            torch.FloatTensor(reward).unsqueeze(1).to(device),
            torch.FloatTensor(next_state).to(device),
            torch.FloatTensor(done).unsqueeze(1).to(device)
        )

    def size(self):
        return len(self.buffer)


# Device selection
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if __name__ == "__main__":
    env = gym.make("Pendulum-v1", render_mode="human")
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]
    max_action = float(env.action_space.high[0])

    agent = TD3(state_dim, action_dim, max_action)
    replay_buffer = ReplayBuffer()

    max_timesteps = 500000
    start_timesteps = 10000
    batch_size = 256
    expl_noise = 0.1

    episode_timesteps = 0
    episode_reward = 0
    episode_num = 0
    state, _ = env.reset()
    state = np.array(state, dtype=np.float32).flatten()

    for t in range(int(max_timesteps)):
        episode_timesteps += 1

        # Pure random exploration for the first start_timesteps steps
        if t < start_timesteps:
            action = env.action_space.sample()
        else:
            action = (
                agent.select_action(state) + np.random.normal(0, expl_noise, size=action_dim)
            ).clip(env.action_space.low, env.action_space.high)

        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        next_state = np.array(next_state, dtype=np.float32).flatten()

        # Do not treat a pure time-limit truncation as a terminal state for bootstrapping
        done_bool = float(done) if episode_timesteps < env._max_episode_steps else 0
        replay_buffer.add(state, action, reward, next_state, done_bool)

        state = next_state
        episode_reward += reward

        if episode_timesteps >= env._max_episode_steps:
            print(f"Episode {episode_num} | Steps: {episode_timesteps} | Reward: {episode_reward:.2f}")
            state, _ = env.reset()
            state = np.array(state, dtype=np.float32).flatten()
            episode_timesteps = 0
            episode_reward = 0
            episode_num += 1

        if replay_buffer.size() >= batch_size:
            agent.train(replay_buffer, batch_size)

The final output is as follows:

Episode 0 | Steps: 200 | Reward: -787.64
Episode 1 | Steps: 200 | Reward: -1170.20
Episode 2 | Steps: 200 | Reward: -763.55
Episode 3 | Steps: 200 | Reward: -968.67
Episode 4 | Steps: 200 | Reward: -1166.24
...
Episode 61 | Steps: 200 | Reward: -236.65
Episode 62 | Steps: 200 | Reward: -231.83
Episode 63 | Steps: 200 | Reward: -120.32
Episode 64 | Steps: 200 | Reward: -126.92
Episode 65 | Steps: 200 | Reward: -2.13
Episode 66 | Steps: 200 | Reward: -238.26
Episode 67 | Steps: 200 | Reward: -127.60
Episode 68 | Steps: 200 | Reward: -117.58
Episode 69 | Steps: 200 | Reward: -2.06
Episode 70 | Steps: 200 | Reward: -126.09
Episode 71 | Steps: 200 | Reward: -116.53
Episode 72 | Steps: 200 | Reward: -124.41
Episode 73 | Steps: 200 | Reward: -127.44

(Screenshot of the Pendulum-v1 environment during the trained run.)

References

Paper links:

QR-DQN: https://arxiv.org/abs/1710.10044

ICM: https://arxiv.org/abs/1705.05363

TD3: https://arxiv.org/abs/1802.09477

This is where the reinforcement learning series pauses for now; I will only pick it up again if the mood strikes.

