QTansformer强化学习网络

SDQN

SDQN原名为Sequential DQN,它可以把原本多个动作的马尔可夫过程转化为一维度的马尔可夫过程,这样可以构建一个马尔可夫Q方法。

  • 当一个动作完成后,通过马尔科夫链执行动作后得到状态$s_{t+1}$,奖励为$r_t$,并复位a

  • 如果没预测完,奖励为0,动作拼接。

在完成上述过程后,可以进行离散处理,利用Q学习方法得到最终的最优解。

训练

在训练过程中定义两个神经网络$Q^U$和$Q^L$,相关损失函数如下:

网络$Q^L$也是根据Q学习方法,可以直接照抄$Q^U$。

仔细观察发现损失函数与均方损失函数是一样的。

$Q^U$使用传统的MLP结构,无需平滑处理。

$Q^L$可以构造成LSTM网络,因为同层之间可以互相传递参数,不过实验表明,不进行参数传递效果更稳定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# SDQN
class SDQN(nn.Module):
def __init__(self, input_size, output_size):
super(SDQN, self).__init__()
self.input_size = input_size
self.output_size = output_size

self.fc1 = nn.Linear(input_size, 128)
self.fc2 = nn.Linear(128, 128)
self.fc3 = nn.Linear(128, output_size)

def forward(self, x):
x = x.view(-1, self.input_size)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return self.fc3(x)

def save(self, path):
torch.save(self.state_dict(), path)

def load(self, path):
self.load_state_dict(torch.load(path))

# 经验回放
class ReplayMemory(object):
def __init__(self, capacity):
self.capacity = capacity
self.memory = []
self.position = 0

def push(self, state, action, reward, next_state, done):
if len(self.memory) < self.capacity:
self.memory.append(None)

self.memory[self.position] = (state, action, reward, next_state, done)
self.position = (self.position + 1) % self.capacity

def sample(self, batch_size):
return random.sample(self.memory, batch_size)

def __len__(self):
return len(self.memory)

# 根据价值选动作
class Agent(object):
def __init__(self, input_size, output_size, memory_size, batch_size, gamma, lr):
self.input_size = input_size
self.output_size = output_size
self.memory = ReplayMemory(memory_size)
self.batch_size = batch_size
self.gamma = gamma
self.lr = lr

self.model = SDQN(input_size, output_size)
self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

def select_action(self, state):
state = torch.from_numpy(state).float().unsqueeze(0)
q_values = self.model(state)
action = q_values.max(1)[1].item()
return action

def optimize(self):
if len(self.memory) < self.batch_size:
return

transitions = self.memory.sample(self.batch_size)
batch = Transition(*zip(*transitions))

state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
next_state_batch = torch.cat(batch.next_state)

state_action_values = self.model(state_batch).gather(1, action_batch)
next_state_values = self.model(next_state_batch).max(1)[0].detach()
expected_state_action_values = (next_state_values * self.gamma) + reward_batch

loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()

def save(self, path):
self.model.save(path)

def load(self, path):
self.model.load(path)
pass

def push(self, state, action, reward, next_state, done):
self.memory.push(state, action, reward, next_state, done)

def __len__(self):
return len(self.memory)

def __repr__(self):
return self.model.__repr__()

# 评估
import gym
import random
from collections import namedtuple

Transition = namedtuple("Transition", ("state", "action", "reward", "next_state", "done"))

# 训练
def train(env, agent, num_episodes, max_steps):
for episode in range(num_episodes):
state = env.reset()
for step in range(max_steps):
action = agent.select_action(state)
next_state, reward, done, _ = env.step(action)
agent.push(state, action, reward, next_state, done)
agent.optimize()
state = next_state
if done:
break
print("Episode: {}, Step: {}".format(episode, step))
env.close()

# 测试集
def test(env, agent, num_episodes, max_steps):
for episode in range(num_episodes):
state = env.reset()
for step in range(max_steps):
action = agent.select_action(state)
next_state, reward, done, _ = env.step(action)
state = next_state
if done:
break
print("Episode: {}, Step: {}".format(episode, step))
env.close()

if __name__ == "__main__":
# Parameters
env_name = "CartPole-v0"
num_episodes = 1000
max_steps = 1000
memory_size = 10000
batch_size = 128
gamma = 0.99
lr = 1e-3
save_path = "model.pt"

# Environment
env = gym.make(env_name)
input_size = env.observation_space.shape[0]
output_size = env.action_space.n

# Agent
agent = Agent(input_size, output_size, memory_size, batch_size, gamma, lr)

# Train
train(env, agent, num_episodes, max_steps)

# Save
agent.save(save_path)

# Test
agent.load(save_path)
test(env, agent, num_episodes, max_steps)

QTransformer

QTransformer翻译为:强化自回归Q学习,这是一种对人类的模仿学习方法,相关流程如下:

1

在时间步t上更新每个动作的价值,对于给定历史状态,我们更新所有动作状态所有价值。价值通过Bellman更新(绿框)进行训练。未知动作在红色框归零。除最后一个操作维度外,所有操作维度的价值目标都是在同一时间步内使用下一个操作维度的最大化来计算的。最后一个动作维度的Q-target是使用蒙特卡洛最大化来计算的。

下式为Bellman算子,通过蒙特卡洛变形而来。

2

网络结构如图,结构在奖励方面进行改进,采用乐单纯的0和1进行奖励,同时为了避免Q函数学习过拟合,也采用了相应的乘法策略。对于不属于的行为a,下推Q值使它在合理的范围内。

QTransformer使用了Transformer模型进行q学习,主要有三个部分:

  • 使用离散化和自回归使Transformer能够完成TD学习。
  • q函数正则器
  • 蒙特卡罗和N步回归

不会强化学习基础的可以阅读我早期博客:强化学习基础

我们定义$\tau$为样本集,我们通过下面的公式进行更新:

3

我们发现对于奖励只适用于最后一个维度(等式中的第二行),所以在执行动作前没有任何奖励和价值。此外,我们只在时间区间对q值进行折现,并在每个时间区间的最后将除最后一个维度外的所有维度的$\gamma$保持在1.0,以确保与蒙特卡洛中的折现相同。

我们的训练目标是让一个函数的值变小,这个函数是提出的保守Q学习函数,如下:

4

函数包括TD的价值部分和保守Q学习的折扣,这样可以获得尽可能小的价值,同时提高了正则化的强度吗,调节折扣系数$\alpha$改变保守的强度。最终我们收敛了Q函数

项目地址:https://q-transformer.github.io/


QTansformer强化学习网络
https://blog.minloha.cn/posts/13291113bcc7f42023092907.html
作者
Minloha
发布于
2023年9月29日
更新于
2024年9月15日
许可协议