Reinforcement Learning Networks and Robot Control: Exploration and Learning

Rainbow

Rainbow is a relatively recent model that folds almost all of the major model-free improvements into a single agent. Starting from the DQN baseline, it combines the following components:

1. Double DQN

The paper uses two networks for exploration: the online network selects the action with the highest estimated value and the target network evaluates that choice, and the loss is computed against this decoupled target.

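As a reminder of the standard Double DQN target (my notation, not the exact formula image from the post), action selection and action evaluation are decoupled:

$$y_t = r_{t+1} + \gamma\, Q_{\theta^-}\!\left(s_{t+1},\ \arg\max_{a} Q_{\theta}(s_{t+1}, a)\right)$$

and the loss is the squared (or Huber) error between $Q_{\theta}(s_t, a_t)$ and $y_t$; the online network $\theta$ picks the action, the target network $\theta^-$ scores it.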

2. Prioritized experience replay

A plain replay buffer samples experience uniformly at random. We would rather sample transitions in order of how much there is to learn from them, so the paper uses the TD error as the priority.

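As a reminder of the standard prioritized-replay quantities (my notation): the TD error, the priority, and the resulting sampling probability are

$$\delta_i = r + \gamma \max_{a'} Q_{\theta^-}(s', a') - Q_{\theta}(s, a),\qquad p_i = |\delta_i| + \epsilon,\qquad P(i) = \frac{p_i^{\alpha}}{\sum_k p_k^{\alpha}}$$

so transitions with larger TD error are replayed more often.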

3. An actor-critic-like structure (dueling streams)

Rainbow uses a structure reminiscent of an actor-critic network: a state-value stream and an advantage stream that share the same convolutional layers.

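For reference, the standard dueling aggregation (my notation) combines the two streams as

$$Q(s,a) = V(s) + A(s,a) - \frac{1}{|\mathcal{A}|}\sum_{a'} A(s,a')$$

which corresponds to the `q = v + a - a.mean(1, keepdim=True)` line in the code below.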

4. Noisy networks

This is another distinguishing feature of Rainbow: it drops the plain Linear layers in favour of noisy layers, whose weights and biases carry learnable noise parameters.

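For reference, a noisy linear layer (NoisyNet, my notation) computes

$$y = (\mu^w + \sigma^w \odot \varepsilon^w)\,x + (\mu^b + \sigma^b \odot \varepsilon^b)$$

where the means $\mu$ and standard deviations $\sigma$ are learned and the factorised Gaussian noise $\varepsilon$ is resampled from time to time.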

Combining all of the pieces above gives us the Rainbow model.

The code follows:

# NoisyLinear: a linear layer with factorised Gaussian noise
import math

import torch
import torch.nn as nn
import torch.nn.functional as F


class NoisyLinear(nn.Module):
    def __init__(self, in_features, out_features, std_init=0.5, device=torch.device("cuda")):
        super(NoisyLinear, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.std_init = std_init
        self.device = device
        # Create the parameters directly on the target device so they stay registered as nn.Parameter
        self.weight_mu = nn.Parameter(torch.empty(out_features, in_features, device=device))
        self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features, device=device))
        self.register_buffer('weight_epsilon', torch.empty(out_features, in_features, device=device))
        self.bias_mu = nn.Parameter(torch.empty(out_features, device=device))
        self.bias_sigma = nn.Parameter(torch.empty(out_features, device=device))
        self.register_buffer('bias_epsilon', torch.empty(out_features, device=device))
        self.reset_parameters()
        self.reset_noise()

    def reset_parameters(self):
        mu_range = 1 / math.sqrt(self.in_features)
        self.weight_mu.data.uniform_(-mu_range, mu_range)
        self.weight_sigma.data.fill_(self.std_init / math.sqrt(self.in_features))
        self.bias_mu.data.uniform_(-mu_range, mu_range)
        self.bias_sigma.data.fill_(self.std_init / math.sqrt(self.out_features))

    def _scale_noise(self, size):
        # f(x) = sign(x) * sqrt(|x|), as in the NoisyNet paper
        x = torch.randn(size, device=self.weight_mu.device)
        return x.sign().mul_(x.abs().sqrt_())

    def reset_noise(self):
        # Factorised Gaussian noise: outer product of an input and an output noise vector
        epsilon_in = self._scale_noise(self.in_features)
        epsilon_out = self._scale_noise(self.out_features)
        self.weight_epsilon.copy_(epsilon_out.ger(epsilon_in))
        self.bias_epsilon.copy_(epsilon_out)

    def forward(self, input):
        if input.dim() > 2:
            input = input.reshape(-1, self.in_features)

        input_cuda = input.to(self.device)

        if self.training:
            return F.linear(input_cuda, self.weight_mu + self.weight_sigma * self.weight_epsilon,
                            self.bias_mu + self.bias_sigma * self.bias_epsilon)
        else:
            return F.linear(input_cuda, self.weight_mu, self.bias_mu)

The noisy layer keeps a Gaussian mean and standard deviation for every weight and bias; factorised Gaussian noise is then applied to w and b, which injects exploration noise directly into the parameters.
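Here is a small usage sketch I added (not part of the original code); it only assumes the NoisyLinear class above and shows that fresh noise changes the output, while evaluation mode falls back to the mean weights:

# Hypothetical usage sketch of NoisyLinear (CPU, purely for illustration)
layer = NoisyLinear(4, 8, std_init=0.5, device=torch.device("cpu"))

x = torch.randn(2, 4)        # a batch of two 4-dimensional states
layer.train()                # training mode: noisy weights are active
y1 = layer(x)

layer.reset_noise()          # draw a fresh factorised-noise sample
y2 = layer(x)                # same input, different output because of the new noise

layer.eval()                 # evaluation mode: only the mean weights are used
y3 = layer(x)
print(y1.shape, torch.allclose(y1, y2))  # torch.Size([2, 8]) False (almost surely)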

# DQN network (dueling, distributional, with noisy layers)
class DQN(nn.Module):
    def __init__(self, args, action_space):
        super(DQN, self).__init__()
        self.atoms = args.atoms
        self.action_space = action_space

        # Shared convolutional layers
        if args.architecture == 'canonical':
            self.convs = nn.Sequential(nn.Conv2d(args.history_length, 32, 8, stride=4, padding=0), nn.ReLU(),
                                       nn.Conv2d(32, 64, 4, stride=2, padding=0), nn.ReLU(),
                                       nn.Conv2d(64, 64, 3, stride=1, padding=0), nn.ReLU())
            self.conv_output_size = 3136
        # A lighter-weight alternative trunk
        elif args.architecture == 'data-efficient':
            self.convs = nn.Sequential(nn.Conv2d(args.history_length, 32, 5, stride=5, padding=0), nn.ReLU(),
                                       nn.Conv2d(32, 64, 5, stride=5, padding=0), nn.ReLU())
            self.conv_output_size = 576

        self.fc_h_v = NoisyLinear(self.conv_output_size, args.hidden_size, std_init=args.noisy_std)
        self.fc_h_a = NoisyLinear(self.conv_output_size, args.hidden_size, std_init=args.noisy_std)
        self.fc_z_v = NoisyLinear(args.hidden_size, self.atoms, std_init=args.noisy_std)
        self.fc_z_a = NoisyLinear(args.hidden_size, action_space * self.atoms, std_init=args.noisy_std)

    def forward(self, x, log=False):
        x = self.convs(x)
        x = x.view(-1, self.conv_output_size)
        v = self.fc_z_v(F.relu(self.fc_h_v(x)))  # Value stream
        a = self.fc_z_a(F.relu(self.fc_h_a(x)))  # Advantage stream
        v, a = v.view(-1, 1, self.atoms), a.view(-1, self.action_space, self.atoms)
        q = v + a - a.mean(1, keepdim=True)  # Combine streams
        if log:  # Use log softmax for numerical stability
            q = F.log_softmax(q, dim=2)  # Log probabilities with action over second dimension
        else:
            q = F.softmax(q, dim=2)  # Probabilities with action over second dimension
        return q

    def reset_noise(self):
        for name, module in self.named_children():
            if 'fc' in name:
                module.reset_noise()

The forward pass here outputs a probability distribution over value atoms rather than a single Q-value, which gives the learner much richer information. To fit our low-dimensional state input, I removed the shared convolutional layers:

# DQN for low-dimensional (vector) states
class DQN(nn.Module):
    def __init__(self, status_space, action_space, atoms=32, hidden_size=128, noisy_std=0.5):
        super(DQN, self).__init__()
        self.atoms = atoms
        self.action_space = action_space

        # history_length can be thought of as the stacked-frame depth fed to the conv trunk.
        # Rainbow is an off-policy method and relies on a large replay memory to extract
        # information from, so a good replay buffer matters; see the prioritized replay below.
        '''
        self.conv = nn.Sequential(nn.Conv2d(history_length, 32, 3, stride=5, padding=0), nn.ReLU(),
                                  nn.Conv2d(32, 64, 3, stride=5, padding=0), nn.ReLU())
        self.conv_output_size = 576
        '''

        self.fc_h_v = NoisyLinear(status_space, hidden_size, std_init=noisy_std)
        self.fc_z_v = NoisyLinear(hidden_size, self.atoms, std_init=noisy_std)

        self.fc_h_a = NoisyLinear(status_space, hidden_size, std_init=noisy_std)
        self.fc_z_a = NoisyLinear(hidden_size, action_space * self.atoms, std_init=noisy_std)

    def forward(self, x, log=False):
        # x = self.conv(x)
        # x = x.view(-1, self.conv_output_size)
        # Value stream
        v = self.fc_z_v(F.relu(self.fc_h_v(x)))
        # Advantage stream
        a = self.fc_z_a(F.relu(self.fc_h_a(x)))
        v, a = v.view(-1, 1, self.atoms), a.view(-1, self.action_space, self.atoms)
        # Combine streams
        q = v + a - a.mean(1, keepdim=True)
        # Use log softmax for numerical stability
        if log:
            # Log probabilities with action over second dimension
            q = F.log_softmax(q, dim=2)
        else:
            # Probabilities with action over second dimension
            q = F.softmax(q, dim=2)
        return q

    def reset_noise(self):
        for name, module in self.named_children():
            if 'fc' in name:
                module.reset_noise()
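One detail worth recalling before the full agent (standard distributional-RL bookkeeping, my notation): with atoms $z_i$ spaced evenly over $[V_{\min}, V_{\max}]$, scalar action values are recovered as the expectation

$$Q(s,a) = \sum_{i} z_i\, p_i(s,a)$$

and the greedy action is $\arg\max_a Q(s,a)$; this is what `(self.online_net(state) * self.support).sum(2)` computes in the `act` and `evaluate_q` methods of the agent below.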

Finally, here is the network that ties the Rainbow pieces together:

# Rainbow agent
import numpy as np
import torch
import torch.nn as nn
from torch import optim
from torch.nn.utils import clip_grad_norm_


class RainBow(nn.Module):
    def __init__(self, status_space, action_space, V_min=-10, epsilon=1.5e-4, lr=0.001, V_max=10, atoms=32,
                 device=torch.device("cuda"), batch_size=32, discount=0.99, norm_clip=0.3, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.Vmin = V_min
        self.Vmax = V_max
        self.atoms = atoms
        self.action_space = action_space
        self.offset_net = DQN(status_space, action_space)  # target network
        self.online_net = DQN(status_space, action_space)
        self.support = torch.linspace(V_min, V_max, atoms).to(device=device)  # Support (range) of z
        self.delta_z = (V_max - V_min) / (atoms - 1)
        self.norm_clip = norm_clip
        self.batch_size = batch_size
        self.discount = discount
        self.n = 3  # n-step return horizon; must match the replay memory's multi_step
        self.lr = lr
        self.device = device
        self.epsilon = epsilon
        self.optimiser = optim.Adam(self.online_net.parameters(), lr=lr, eps=epsilon)
        self.update_target_net()  # start the target network as a copy of the online network

    def reset_noise(self):
        self.online_net.reset_noise()
        self.offset_net.reset_noise()

    def update_target_net(self):
        # Copy the online network weights into the target (offset) network
        self.offset_net.load_state_dict(self.online_net.state_dict())

    def act(self, state):
        state = torch.Tensor(state).view(1, -1)
        with torch.no_grad():
            return (self.online_net(state) * self.support).sum(2).argmax(1).item()

    def act_e_greedy(self, state, epsilon=0.001):  # epsilon-greedy wrapper around the noisy policy
        return np.random.randint(0, self.action_space) if np.random.random() < epsilon else self.act(state)

    def learn(self, mem):
        # Sample transitions
        idxs, states, actions, returns, next_states, nonterminals, weights = mem.sample(self.batch_size)

        # Calculate current state probabilities (online network noise already sampled)
        log_ps = self.online_net(states, log=True)  # Log probabilities log p(s_t, ·; θonline)
        log_ps_a = log_ps[range(self.batch_size), actions]  # log p(s_t, a_t; θonline)

        with torch.no_grad():
            # Next-state distribution under the online network
            pns = self.online_net(next_states)  # Probabilities p(s_t+n, ·; θonline), shape (batch, actions, atoms)
            dns = self.support.expand_as(pns) * pns  # Distribution d_t+n = (z, p(s_t+n, ·; θonline))
            # Perform argmax action selection using online network: argmax_a[(z, p(s_t+n, a; θonline))]
            argmax_indices_ns = dns.sum(2).argmax(1)  # Shape: (batch_size,)

            self.offset_net.reset_noise()  # Sample new target net noise
            pns = self.offset_net(next_states)  # Probabilities p(s_t+n, ·; θtarget)

            # Double-Q probabilities p(s_t+n, argmax_a[(z, p(s_t+n, a; θonline))]; θtarget)
            batch_indices = torch.arange(self.batch_size, device=pns.device)  # Shape: (batch_size,)
            pns_a = pns[batch_indices, argmax_indices_ns]  # Shape: (batch_size, atoms)

            # Compute Tz (Bellman operator T applied to z): Tz = R^n + (γ^n)z (accounting for terminal states)
            Tz = returns.unsqueeze(1) + nonterminals * (self.discount ** self.n) * self.support.unsqueeze(0)
            Tz = Tz.clamp(min=self.Vmin, max=self.Vmax)  # Clamp between supported values
            # Compute L2 projection of Tz onto fixed support z
            b = (Tz - self.Vmin) / self.delta_z  # b = (Tz - Vmin) / Δz
            b = b.float().to(device=self.device)  # Ensure `b` is a floating-point tensor

            l = b.floor().to(torch.int64)
            u = b.ceil().to(torch.int64)

            # Fix disappearing probability mass when l = b = u (b is int)
            l[(u > 0) * (l == u)] -= 1
            u[(l < (self.atoms - 1)) * (l == u)] += 1

            # Distribute probability of Tz onto the neighbouring atoms m
            m = states.new_zeros(self.batch_size, self.atoms)
            offset = torch.linspace(0, (self.batch_size - 1) * self.atoms, self.batch_size).unsqueeze(1).expand(
                self.batch_size, self.atoms).to(actions)
            m.view(-1).index_add_(0, (l + offset).view(-1),
                                  (pns_a * (u.float() - b)).view(-1))  # m_l = m_l + p(s_t+n, a*)(u - b)
            m.view(-1).index_add_(0, (u + offset).view(-1),
                                  (pns_a * (b - l.float())).view(-1))  # m_u = m_u + p(s_t+n, a*)(b - l)

        loss = -torch.sum(m * log_ps_a, 1)  # Cross-entropy loss (minimises DKL(m||p(s_t, a_t)))
        print(f'loss: {loss.detach().cpu().numpy()}')

        self.online_net.zero_grad()
        (weights * loss).mean().backward()  # Backpropagate importance-weighted minibatch loss
        clip_grad_norm_(self.online_net.parameters(), self.norm_clip)  # Clip gradients by L2 norm
        self.optimiser.step()

        mem.update_priorities(idxs, loss.detach().cpu().numpy())  # Update priorities of sampled transitions

    def evaluate_q(self, state):
        with torch.no_grad():
            return (self.online_net(state.unsqueeze(0)) * self.support).sum(2).max(1)[0].item()

    def train(self):
        self.online_net.train()

    def eval(self):
        self.online_net.eval()
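For reference, the target distribution that `learn` builds is the usual C51 projection (my notation, assuming the n-step return $R^{(n)}$ with $n = 3$ as in the replay memory):

$$\hat{\mathcal{T}} z_j = \operatorname{clip}\!\left(R^{(n)} + \gamma^{n} z_j,\; V_{\min},\; V_{\max}\right),\qquad b_j = \frac{\hat{\mathcal{T}} z_j - V_{\min}}{\Delta z},\quad l_j = \lfloor b_j \rfloor,\; u_j = \lceil b_j \rceil$$

$$m_{l_j} \mathrel{+}= p_j(s_{t+n}, a^{*})\,(u_j - b_j),\qquad m_{u_j} \mathrel{+}= p_j(s_{t+n}, a^{*})\,(b_j - l_j)$$

The loss is then the cross-entropy $-\sum_j m_j \log p_j(s_t, a_t)$, weighted by the importance-sampling weights returned by the replay memory.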

Next we implement the replay memory with prioritized (segment-tree based) sampling:

# -*- coding: utf-8 -*-
from __future__ import division
import numpy as np
import torch

Transition_dtype = np.dtype(
    [('timestep', np.int32), ('state', np.uint8, (84, 84)), ('action', np.int32), ('reward', np.float32),
     ('nonterminal', np.bool_)])
blank_trans = (0, np.zeros((84, 84), dtype=np.uint8), 0, 0.0, False)


# Segment tree data structure where parent node values are sum/max of children node values
class SegmentTree:
    def __init__(self, size):
        size = int(size)
        self.index = 0
        self.size = size
        self.full = False  # Used to track actual capacity
        self.tree_start = 2 ** int(size - 1).bit_length() - 1  # Put all used node leaves on last tree level
        self.sum_tree = np.zeros((self.tree_start + self.size,), dtype=np.float32)
        self.data = np.array([blank_trans] * size, dtype=Transition_dtype)  # Build structured array
        self.max = 1  # Initial max value to return (1 = 1^ω)

    # Updates nodes values from current tree
    def _update_nodes(self, indices):
        children_indices = indices * 2 + np.expand_dims([1, 2], axis=1)
        self.sum_tree[indices] = np.sum(self.sum_tree[children_indices], axis=0)

    # Propagates changes up tree given tree indices
    def _propagate(self, indices):
        parents = (indices - 1) // 2
        unique_parents = np.unique(parents)
        self._update_nodes(unique_parents)
        if parents[0] != 0:
            self._propagate(parents)

    # Propagates single value up tree given a tree index for efficiency
    def _propagate_index(self, index):
        parent = (index - 1) // 2
        left, right = 2 * parent + 1, 2 * parent + 2
        self.sum_tree[parent] = self.sum_tree[left] + self.sum_tree[right]
        if parent != 0:
            self._propagate_index(parent)

    # Updates values given tree indices
    def update(self, indices, values):
        self.sum_tree[indices] = values  # Set new values
        self._propagate(indices)  # Propagate values
        current_max_value = np.max(values)
        self.max = max(current_max_value, self.max)

    # Updates single value given a tree index for efficiency
    def _update_index(self, index, value):
        self.sum_tree[index] = value  # Set new value
        self._propagate_index(index)  # Propagate value
        self.max = max(value, self.max)

    def append(self, data, value):
        self.data[self.index] = data  # Store data in underlying data structure
        self._update_index(self.index + self.tree_start, value)  # Update tree
        self.index = (self.index + 1) % self.size  # Update index
        self.full = self.full or self.index == 0  # Save when capacity reached
        self.max = max(value, self.max)

    # Searches for the location of values in sum tree
    def _retrieve(self, indices, values):
        children_indices = (indices * 2 + np.expand_dims([1, 2], axis=1))  # Make matrix of children indices
        # If indices correspond to leaf nodes, return them
        if children_indices[0, 0] >= self.sum_tree.shape[0]:
            return indices
        # If children indices correspond to leaf nodes, bound rare outliers in case total slightly overshoots
        elif children_indices[0, 0] >= self.tree_start:
            children_indices = np.minimum(children_indices, self.sum_tree.shape[0] - 1)
        left_children_values = self.sum_tree[children_indices[0]]
        successor_choices = np.greater(values, left_children_values).astype(
            np.int32)  # Classify which values are in left or right branches
        successor_indices = children_indices[
            successor_choices, np.arange(indices.size)]  # Use classification to index into the indices matrix
        successor_values = values - successor_choices * left_children_values  # Subtract the left branch values when searching in the right branch
        return self._retrieve(successor_indices, successor_values)

    # Searches for values in sum tree and returns values, data indices and tree indices
    def find(self, values):
        indices = self._retrieve(np.zeros(values.shape, dtype=np.int32), values)
        data_index = indices - self.tree_start
        return (self.sum_tree[indices], data_index, indices)  # Return values, data indices, tree indices

    # Returns data given a data index
    def get(self, data_index):
        return self.data[data_index % self.size]

    def total(self):
        return self.sum_tree[0]


class ReplayMemory:
    def __init__(self, priority_weight=0.4, priority_exponent=0.5, multi_step=3, capacity=1e6,
                 device=torch.device('cuda'), history_length=4, discount=0.99):
        self.device = device
        self.capacity = capacity
        self.history = history_length
        self.discount = discount
        self.n = multi_step
        # Initial importance sampling weight β, annealed to 1 over course of training
        self.priority_weight = priority_weight
        self.priority_exponent = priority_exponent
        self.t = 0  # Internal episode timestep counter
        self.n_step_scaling = torch.tensor([self.discount ** i for i in range(self.n)], dtype=torch.float32,
                                           device=self.device)  # Discount-scaling vector for n-step returns
        self.transitions = SegmentTree(capacity)
        # Store transitions in a wrap-around cyclic buffer within a sum tree for querying priorities

    # Adds state and action at time t, reward and terminal at time t + 1
    def append(self, state, action, reward, terminal):
        state = state[-1].mul(255).to(dtype=torch.uint8,
                                      device=torch.device('cpu'))  # Only store last frame and discretise to save memory
        self.transitions.append((self.t, state, action, reward, not terminal),
                                self.transitions.max)  # Store new transition with maximum priority
        self.t = 0 if terminal else self.t + 1  # Start new episodes with t = 0

    # Returns the transitions with blank states where appropriate
    def _get_transitions(self, idxs):
        transition_idxs = np.arange(-self.history + 1, self.n + 1) + np.expand_dims(idxs, axis=1)
        transitions = self.transitions.get(transition_idxs)
        transitions_firsts = transitions['timestep'] == 0
        blank_mask = np.zeros_like(transitions_firsts, dtype=np.bool_)
        for t in range(self.history - 2, -1, -1):  # e.g. 2 1 0
            blank_mask[:, t] = np.logical_or(blank_mask[:, t + 1],
                                             transitions_firsts[:, t + 1])  # True if future frame has timestep 0
        for t in range(self.history, self.history + self.n):  # e.g. 4 5 6
            blank_mask[:, t] = np.logical_or(blank_mask[:, t - 1],
                                             transitions_firsts[:, t])  # True if current or past frame has timestep 0
        transitions[blank_mask] = blank_trans
        return transitions

    # Returns a valid sample from each segment
    def _get_samples_from_segments(self, batch_size, p_total):
        segment_length = p_total / batch_size  # Batch size number of segments, based on sum over all probabilities
        segment_starts = np.arange(batch_size) * segment_length
        valid = False
        while not valid:
            samples = np.random.uniform(0.0, segment_length,
                                        [batch_size]) + segment_starts  # Uniformly sample from within all segments
            probs, idxs, tree_idxs = self.transitions.find(
                samples)  # Retrieve samples from tree with un-normalised probability
            if np.all((self.transitions.index - idxs) % self.capacity > self.n) and np.all(
                    (idxs - self.transitions.index) % self.capacity >= self.history) and np.all(probs != 0):
                valid = True  # Note that conditions are valid but extra conservative around buffer index 0
        # Retrieve all required transition data (from t - h to t + n)
        transitions = self._get_transitions(idxs)
        # Create un-discretised states and nth next states
        all_states = transitions['state']
        states = torch.tensor(all_states[:, :self.history], device=self.device, dtype=torch.float32).div_(255)
        next_states = torch.tensor(all_states[:, self.n:self.n + self.history], device=self.device,
                                   dtype=torch.float32).div_(255)
        # Discrete actions to be used as index
        actions = torch.tensor(np.copy(transitions['action'][:, self.history - 1]), dtype=torch.int64,
                               device=self.device)
        # Calculate truncated n-step discounted returns R^n = Σ_k=0->n-1 (γ^k)R_t+k+1 (note that invalid nth next states have reward 0)
        rewards = torch.tensor(np.copy(transitions['reward'][:, self.history - 1:-1]), dtype=torch.float32,
                               device=self.device)
        R = torch.matmul(rewards, self.n_step_scaling)
        # Mask for non-terminal nth next states
        nonterminals_numpy = np.expand_dims(transitions['nonterminal'][:, self.history + self.n - 1], axis=1)
        nonterminals = torch.tensor(nonterminals_numpy, dtype=torch.float32, device=self.device)
        return probs, idxs, tree_idxs, states, actions, R, next_states, nonterminals

    def sample(self, batch_size):
        # Retrieve sum of all priorities (used to create a normalised probability distribution)
        p_total = self.transitions.total()
        probs, idxs, tree_idxs, states, actions, returns, next_states, nonterminals = (
            self._get_samples_from_segments(batch_size, p_total))  # Get batch of valid samples
        probs = probs / p_total  # Calculate normalised probabilities
        capacity = self.capacity if self.transitions.full else self.transitions.index
        weights = (capacity * probs) ** -self.priority_weight  # Compute importance-sampling weights w
        weights = torch.tensor(weights / weights.max(), dtype=torch.float32,
                               device=self.device)  # Normalise by max importance-sampling weight from batch
        return tree_idxs, states, actions, returns, next_states, nonterminals, weights

    def update_priorities(self, idxs, priorities):
        priorities = np.power(priorities, self.priority_exponent)
        self.transitions.update(idxs, priorities)

    # Set up internal state for iterator
    def __iter__(self):
        self.current_idx = 0
        return self

    # Return valid states for validation
    def __next__(self):
        if self.current_idx == self.capacity:
            raise StopIteration
        transitions = self.transitions.data[np.arange(self.current_idx - self.history + 1, self.current_idx + 1)]
        transitions_firsts = transitions['timestep'] == 0
        blank_mask = np.zeros_like(transitions_firsts, dtype=np.bool_)
        for t in reversed(range(self.history - 1)):
            blank_mask[t] = np.logical_or(blank_mask[t + 1],
                                          transitions_firsts[t + 1])  # If future frame has timestep 0
        transitions[blank_mask] = blank_trans
        state = torch.tensor(transitions['state'], dtype=torch.float32, device=self.device).div_(
            255)  # Agent will turn into batch
        self.current_idx += 1
        return state

    next = __next__  # Alias __next__ for Python 2 compatibility
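As a quick note on what `sample()` returns (my notation): the `weights` are the prioritized-replay importance-sampling corrections $w_i = (N \cdot P(i))^{-\beta} / \max_j w_j$, with $\alpha$ = `priority_exponent`, $\beta$ = `priority_weight` (annealed toward 1 in the main loop) and $N$ the number of stored transitions. Below is a small smoke test I added, not part of the original post; it only assumes the ReplayMemory class above and uses fake Atari-sized frame stacks:

# Hypothetical smoke test of the replay memory (CPU, fake data)
mem = ReplayMemory(capacity=1000, device=torch.device("cpu"))
for t in range(64):
    frame_stack = torch.rand(4, 84, 84)                      # fake stacked observation; only the last frame is stored
    mem.append(frame_stack, action=0, reward=1.0, terminal=(t % 20 == 19))

tree_idxs, states, actions, returns, next_states, nonterminals, weights = mem.sample(8)
print(states.shape, returns.shape, weights.shape)            # torch.Size([8, 4, 84, 84]) torch.Size([8]) torch.Size([8])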

So far we have covered all of the network components; here is the main routine:

import time
from datetime import datetime

import numpy as np
import torch
import gym
from tqdm import trange

from Rainbow.memory import ReplayMemory
from Rainbow.model import RainBow

# Use CUDA when available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Size of the validation memory
evaluation_size = int(500)
T_max = int(8e4)
replay_frequency = int(4)
reward_clip = int(1)
learn_start = int(2e3)  # must be smaller than T_max so that learning actually starts
evaluation_interval = int(100000)
target_update = int(8e3)
priority_weight = 0.4  # initial β for importance sampling (keep it a float)
priority_weight_increase = (1 - priority_weight) / (T_max - learn_start)


def log(s):
    print('[' + str(datetime.now().strftime('%Y-%m-%dT%H:%M:%S')) + '] ' + s)


def exampleShow(net, env):
    status, _ = env.reset()
    while True:
        env.render()
        action = net.act(status)
        status, reward, done, _, _ = env.step(action)  # Step
        time.sleep(0.1)


def main():
    env = gym.make('CartPole-v1')  # can be swapped for another environment
    action_space = env.action_space.n
    status_space = env.observation_space.shape[0]

    dqn = RainBow(status_space, action_space).to(device=device)
    mem = ReplayMemory()
    val_mem = ReplayMemory(capacity=evaluation_size)

    T, done = 0, True
    while T < evaluation_size:
        if done:
            state, _ = env.reset()

        next_state, reward, done, _, _ = env.step(np.random.randint(0, action_space))  # take a random action and observe the next state and reward

        state = torch.Tensor(state)
        val_mem.append(state, -1, 0.0, done)
        state = next_state
        T += 1

    dqn.train()
    done = True
    for T in trange(1, T_max + 1):
        if done:
            state, _ = env.reset()

        if T % replay_frequency == 0:
            dqn.reset_noise()  # Draw a new set of noisy weights

        action = dqn.act(state)  # Choose an action greedily (with noisy weights)
        next_state, reward, done, _, _ = env.step(action)  # Step
        if reward_clip > 0:
            reward = max(min(reward, reward_clip), -reward_clip)  # Clip rewards
        state = torch.Tensor(state)
        mem.append(state, action, reward, done)  # Append transition to memory

        # Train and test
        if T >= learn_start:
            mem.priority_weight = min(mem.priority_weight + priority_weight_increase, 1)  # Anneal β to 1

            if T % replay_frequency == 0:
                dqn.learn(mem)

            # Update target network
            if T % target_update == 0:
                dqn.update_target_net()

        state = next_state

    env.close()
    env2 = gym.make('CartPole-v1', render_mode="human")
    exampleShow(env=env2, net=dqn)


if __name__ == "__main__":
    main()

Because CartPole's action space is discrete, it cannot fully exercise the Rainbow model, and removing the shared convolutional layers costs some performance as well, but training becomes much faster:

(Figure: Rainbow training output on CartPole-v1.)

Running it with rendering enabled, we can see the pole stays upright nicely.

DDPG

DDPG, also known as Deep Deterministic Policy Gradient, is another policy network and a somewhat more involved model; its structure is shown in the figure below:

(Figure: hand-drawn sketch of the DDPG structure, with the target-network soft-update formula written underneath.)

I could not manage to draw the whole thing, so part of it is hand-drawn. The target networks update their actor and critic parameters proportionally (soft updates). Let's implement this network:

import random
from collections import deque

import torch
import torch.nn as nn


# Define the Actor network
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, 400)
        self.fc2 = nn.Linear(400, 300)
        self.fc3 = nn.Linear(300, action_dim)
        self.max_action = max_action

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        action = torch.tanh(self.fc3(x))  # Tanh to ensure actions are within [-1, 1]
        return action * self.max_action  # Scale to action range


# Define the Critic network
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_dim, 400)
        self.fc2 = nn.Linear(400 + action_dim, 300)
        self.fc3 = nn.Linear(300, 1)

    def forward(self, state, action):
        x = torch.relu(self.fc1(state))
        x = torch.cat([x, action], dim=1)  # Concatenate state features and action
        x = torch.relu(self.fc2(x))
        q_value = self.fc3(x)
        return q_value


# Simple uniform replay buffer
class ReplayBuffer:
    def __init__(self, max_size=1000000):
        self.buffer = deque(maxlen=max_size)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def size(self):
        return len(self.buffer)

    def __len__(self):
        return len(self.buffer)

The target networks are soft-updated toward the online networks (this is the formula written below the figure above). The transitions collected during exploration are stored in the replay buffer, and the online networks learn from samples drawn from it.
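For reference, the updates implemented below are the standard DDPG rules (my notation): the critic regresses onto the target

$$y = r + \gamma\,(1-d)\,Q_{\theta^{-}}\!\bigl(s',\, \mu_{\phi^{-}}(s')\bigr),\qquad L_{\text{critic}} = \bigl(Q_{\theta}(s,a) - y\bigr)^{2},\qquad L_{\text{actor}} = -\,Q_{\theta}\bigl(s,\, \mu_{\phi}(s)\bigr)$$

and each target network is soft-updated as

$$\theta^{-} \leftarrow \tau\,\theta + (1-\tau)\,\theta^{-}$$

which is exactly what `soft_update` does with `tau`.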

import numpy as np
import torch.optim as optim


class DDPG:
    def __init__(self, state_dim, action_dim, max_action, device=torch.device('cuda')):
        self.device = device

        # Actor and Critic networks
        self.actor = Actor(state_dim, action_dim, max_action).to(device)
        self.critic = Critic(state_dim, action_dim).to(device)

        # Target networks
        self.target_actor = Actor(state_dim, action_dim, max_action).to(device)
        self.target_critic = Critic(state_dim, action_dim).to(device)

        # Copy weights from actor and critic to their target networks
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.target_critic.load_state_dict(self.critic.state_dict())

        # Optimizers
        self.actor_optimizer = optim.AdamW(self.actor.parameters(), lr=1e-4)
        self.critic_optimizer = optim.AdamW(self.critic.parameters(), lr=1e-3)

        # Replay buffer
        self.replay_buffer = ReplayBuffer()

        # Hyperparameters
        self.batch_size = 512
        self.gamma = 0.99  # Discount factor
        self.tau = 0.6  # Soft target update factor (much larger than the usual ~0.005, so the targets track quickly)

    def update(self):
        if len(self.replay_buffer) < self.batch_size:
            return

        # Sample a batch of experiences from the replay buffer
        batch = self.replay_buffer.sample(self.batch_size)
        state, action, reward, next_state, done = zip(*batch)

        state = torch.tensor(np.array(state)).float().to(self.device)
        action = torch.tensor(np.array(action)).float().to(self.device)
        reward = torch.tensor(np.array(reward)).float().unsqueeze(1).to(self.device)  # shape (batch, 1)
        next_state = torch.tensor(np.array(next_state)).float().to(self.device)
        done = torch.tensor(np.array(done)).float().unsqueeze(1).to(self.device)  # shape (batch, 1)

        # Update Critic (Q-value network)
        target_action = self.target_actor(next_state)
        target_q_value = self.target_critic(next_state, target_action)
        target_q_value = reward + (1 - done) * self.gamma * target_q_value.detach()

        current_q_value = self.critic(state, action)
        critic_loss = nn.MSELoss()(current_q_value, target_q_value)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Update Actor (Policy network)
        actor_loss = -self.critic(state, self.actor(state)).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Soft update of target networks
        self.soft_update(self.actor, self.target_actor)
        self.soft_update(self.critic, self.target_critic)

    def soft_update(self, local, target):
        # θ_target ← τ θ_local + (1 - τ) θ_target
        for target_param, local_param in zip(target.parameters(), local.parameters()):
            target_param.data.copy_((1.0 - self.tau) * target_param.data + self.tau * local_param.data)

    def select_action(self, state):
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)
        return self.actor(state).cpu().data.numpy().flatten()

(Figure: illustration from the original post.)

The figure above is quite nicely drawn and worth a look. Finally, here are the training routine and the main function:

import time

import gym


def train(env, agent, num_episodes=100, max_timesteps=200):
    for episode in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0

        for t in range(max_timesteps):
            # Select action using the current policy
            action = agent.select_action(state)

            # Perform action and observe the next state and reward
            next_state, reward, done, _, _ = env.step(action)
            agent.replay_buffer.push(state, action, reward, next_state, done)

            # Update the agent (DDPG)
            agent.update()

            state = next_state
            total_reward += reward

            if done:
                break

        print(f"Episode {episode + 1}/{num_episodes}, Reward: {total_reward}")


def test(agent):
    env2 = gym.make('Pendulum-v1', render_mode="human")  # Create the rendering environment
    state, _ = env2.reset()  # Reset the environment and get the initial state

    while True:
        env2.render()  # Render the environment

        # Choose an action by running the trained agent's policy
        action = agent.select_action(state)

        # Execute the action and observe the next state, reward and termination flag
        next_state, reward, done, _, _ = env2.step(action)  # use env2, not env

        state = next_state  # select_action converts the state to a tensor itself

        # Slow the loop down a little so the rendering is watchable
        time.sleep(0.1)


if __name__ == "__main__":
    env = gym.make('Pendulum-v1')  # You can replace this with any continuous action space environment
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]
    max_action = float(env.action_space.high[0])

    device = torch.device('cuda')

    agent = DDPG(state_dim, action_dim, max_action)

    train(env, agent)
    test(agent)

(Figure: the trained DDPG agent on Pendulum-v1.)

Here we used a system with a continuous action space, and you can see the inverted pendulum balances upright very well.

Summary

This post covered DDPG and Rainbow. Debugging all of this took me quite a while, so future posts will come at a slower pace; I still have robotics material to write up as well.

