Preface
This is the second article in the series. In the previous article we showed how to build a custom simulation environment on top of gym and train an agent with an off-the-shelf open-source algorithm. Although the mainstream algorithms almost all have ready-made open-source implementations, we still recommend that readers new to reinforcement learning implement a representative algorithm themselves. In this article we therefore go into the details of a concrete algorithm and show how to implement Q-learning with continuous states in TensorFlow, organizing the code in an object-oriented style.
Prerequisites
- A custom simulation environment built as described in the first article of this series
- Basic Python object-oriented programming: classes and instances, inheritance, initialization, attributes, etc.
- TensorFlow basics: tensors and the computation graph, artificial neural networks (ANN)
- Reinforcement learning basics: value functions, the Bellman equation, etc.
Pseudocode for tabular Q-learning
Initialize Q(S,A); choose parameters α, γ
Repeat (for each episode):
    Initialize state S
    Repeat (for each step of this episode):
        In state S(t), choose action A(t) according to an ε-greedy policy
        Take one simulation step, obtaining reward r(t) and next state S(t+1)
        Q(S(t),A(t)) = Q(S(t),A(t)) + α[r(t) + γ·max_a Q(S(t+1),a) - Q(S(t),A(t))]
    Until S(t+1) is a terminal state or this episode reaches the maximum number of steps
Until Q(S,A) converges or the total simulation-step limit is reached
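If you prefer working code to pseudocode, the inner update can be written in a few lines of NumPy. This is only a sketch for a small discrete problem; the sizes and parameters below are made up for illustration:
import numpy as np

n_states, n_actions = 10, 3             # illustrative sizes, not from this article
alpha, gamma = 0.1, 0.95                # α and γ from the pseudocode
Q = np.zeros((n_states, n_actions))     # tabular Q(S, A)

def q_learning_step(s, a, r, s_next):
    # one application of the update rule in the pseudocode above
    Q[s, a] += alpha * (r + gamma * np.max(Q[s_next]) - Q[s, a])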
Q-learning with continuous states
The pseudocode above applies to a tabular state-action space, i.e. the value function Q(S,A) is stored in a two-dimensional (state × action) table. When the state is continuous, a discretized tabular state space becomes prohibitively large. Fortunately, a neural network can provide a reasonably good parametric approximation of such a function. When the action is also continuous, the term max_a Q(S(t+1),a) in the pseudocode becomes hard to evaluate and policy-gradient methods are required instead; this article therefore considers only discrete action spaces.
Our target, then, is a continuous state space with a discrete action space. Because the action space is finite, the value function Q(S,A) can be designed as a vector [Q(S,A1), Q(S,A2), Q(S,A3), …]: the network takes a state vector as input and outputs one value per action. This value vector is in one-to-one correspondence with the action space, so evaluating max_a Q(S(t+1),a) reduces to taking the maximum of the output vector.
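For example, suppose the network outputs the following (made-up) value vector for the next state; the maximization and the greedy action are then single NumPy calls:
import numpy as np

q_values = np.array([0.2, 1.5, -0.3])   # [Q(S,A1), Q(S,A2), Q(S,A3)], made-up numbers
max_q = np.max(q_values)                # max_a Q(S(t+1), a) = 1.5, used in the update rule
greedy_action = np.argmax(q_values)     # index 1, the action a greedy policy would pick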
Object-oriented programming
The code is structured as follows:
import tensorflow as tf
import numpy as np

class Model():
    def __init__(self):
        def add_layer():
            pass
        self.l1 = add_layer()   # hidden layer
        self.l2 = add_layer()   # output layer
        self.loss = None        # loss tensor
        self.train_step = None  # training operation
        self.init = None        # variable initializer
        self.sess = None        # TensorFlow session
    def init_train(self):
        self.sess.run(self.init)
    def learn(self, currentstate, qupdate):
        pass
    def get_Q(self, currentstate):
        pass

class RL_Agent():
    def __init__(self):
        self.model = Model()
    def act(self, currentstate):
        return action           # greedy action (placeholder)
    def act_epsilon(self, currentstate):
        return action           # ε-greedy action (placeholder)

class RL_QLearning(RL_Agent):
    def __init__(self):
        super().__init__()
    def learn(self):
        pass
There are three classes: Model, RL_Agent and RL_QLearning. Model builds a neural network with one hidden layer to approximate the value function; most of the TensorFlow-specific code lives there. RL_Agent represents the agent and provides two action-selection policies: a greedy one for exploitation and an ε-greedy one used to generate exploratory behavior during (off-policy) training. RL_QLearning is a subclass of RL_Agent that trains the agent with the Q-learning algorithm.
Let us walk through the code, starting with Model:
class Model():
    def __init__(self, in_size, net_size, out_size, activation):
        def add_layer(inputs, in_size, out_size, n_layer, activation_function=None):  # activation_function=None builds a linear layer
            layer_name = "layer%s" % n_layer
            with tf.name_scope(layer_name):
                with tf.name_scope('weights'):
                    # note: these attributes are overwritten on each call, so they end up pointing at the last layer built
                    self.Weights = tf.Variable(tf.random_normal([out_size, in_size]))  # randomly initialized weights
                with tf.name_scope('biases'):
                    self.biases = tf.Variable(tf.zeros([out_size, 1]) + 0.1)  # biases are best initialized away from zero
                with tf.name_scope('Wx_plus_b'):
                    self.Wx_plus_b = tf.matmul(self.Weights, inputs) + self.biases  # Weights @ inputs + biases
                if activation_function is None:
                    self.outputs = self.Wx_plus_b
                else:
                    self.outputs = activation_function(self.Wx_plus_b)
            return self.outputs
        tf.reset_default_graph()
        with tf.name_scope('inputs'):  # group the input placeholders in the graph
            self.xs = tf.placeholder(tf.float32, [None, 1], name='x_input')
            self.ys = tf.placeholder(tf.float32, [None, 1], name='y_input')
        self.l1 = add_layer(self.xs, in_size, net_size, n_layer=1, activation_function=activation)  # hidden layer
        self.l2 = add_layer(self.l1, net_size, out_size, n_layer=2, activation_function=None)  # output layer
        with tf.name_scope('loss'):
            self.loss = tf.reduce_mean(tf.reduce_sum(tf.square(self.ys - self.l2), reduction_indices=[1]))  # mean squared error
        with tf.name_scope('train'):
            self.train_step = tf.train.GradientDescentOptimizer(0.1).minimize(self.loss)  # gradient descent with learning rate 0.1
        self.init = tf.global_variables_initializer()  # replaces the deprecated tf.initialize_all_variables()
        self.sess = tf.Session()
    def init_train(self):
        self.sess.run(self.init)
    def learn(self, currentstate, qupdate):
        self.sess.run(self.train_step, feed_dict={self.xs: currentstate, self.ys: qupdate})
    def get_loss(self, currentstate, qupdate):
        mse = self.sess.run(self.loss, feed_dict={self.xs: currentstate, self.ys: qupdate})
        return mse
    def get_Q(self, currentstate):
        qcurrentstate = self.sess.run(self.l2, feed_dict={self.xs: currentstate})
        return qcurrentstate
The inputs scope contains two placeholders, self.xs and self.ys, which receive the state input and the target values used to fit the value function. The hidden layer and the output layer are built by calling add_layer. Note that all network nodes are exposed as class attributes, which makes debugging easier; for example, self.sess.run(self.l1, feed_dict=...) shows the hidden layer's response to any state. The loss is the mean squared error, and training uses gradient descent with a learning rate of 0.1. The session is held in self.sess. Note that init_train must be called to initialize all variables before learn is used for training.
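As a quick sanity check, Model can be exercised on its own. The sketch below assumes a 2-dimensional state and 3 discrete actions; these sizes are arbitrary and only for illustration:
model = Model(in_size=2, net_size=10, out_size=3, activation=tf.nn.relu)
model.init_train()                            # initialize variables before any learn()/get_Q() call

state = np.array([[0.5], [-1.0]])             # a state as a column vector of shape (in_size, 1)
print(model.get_Q(state))                     # one value per discrete action, shape (out_size, 1)
print(model.sess.run(model.l1, feed_dict={model.xs: state}))  # hidden-layer response to this state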
Next is RL_Agent:
class RL_Agent():
    def __init__(self, env, net_size, max_timestep, learn_rate=0.01, gamma=1.0):
        self.env = env
        self.max_timestep = max_timestep
        self.learn_rate = learn_rate
        self.gamma = gamma
        self.model = Model(env.observation_space.shape[0], net_size, env.action_space.n, tf.nn.relu)
    def act(self, currentstate):
        action = np.argmax(self.model.get_Q(currentstate))  # greedy action
        return action
    def act_epsilon(self, currentstate):
        if np.random.rand(1) < 0.01:  # explore with probability ε = 0.01
            action = self.env.action_space.sample()
        else:
            action = self.act(currentstate)
        return action
Note that RL_Agent is initialized with the simulation environment env. The value-function approximator is the instance self.model. RL_Agent provides two policies, the greedy policy act and the ε-greedy policy act_epsilon; both take the current state as input and return the corresponding action.
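A minimal usage sketch, assuming env is the Car2DEnv from the first article (so the state it returns already has the column-vector shape the network expects):
from Car2D import Car2DEnv            # environment from the first article
env = Car2DEnv()

agent = RL_Agent(env, net_size=10, max_timestep=10000)
agent.model.init_train()              # the value network must be initialized before acting

state = env.reset()
print(agent.act(state))               # greedy action: argmax of the Q vector for this state
print(agent.act_epsilon(state))       # usually the same, but a random action with probability 0.01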
Finally, RL_QLearning:
class RL_QLearning(RL_Agent):
    def __init__(self, env, net_size, max_timestep, learn_rate=0.01, gamma=1.0):
        super().__init__(env, net_size, max_timestep, learn_rate, gamma)
    def learn(self):
        self.model.init_train()
        i = 0
        while True:
            obs, done = self.env.reset(), False
            episode_reward = 0
            while not done:
                self.env.render()
                currentstate = self.env.state
                q_currentstate = self.model.get_Q(currentstate)
                action = self.act_epsilon(currentstate)
                obs, reward, done, _ = self.env.step(action)
                q_newstate = self.model.get_Q(obs)
                q_currentstate_action = q_currentstate[action] + self.learn_rate*(reward + self.gamma*np.max(q_newstate) - q_currentstate[action])
                q_update = q_currentstate
                q_update[action] = q_currentstate_action
                self.model.learn(currentstate, q_update)
                i = i + 1
                episode_reward += reward
            print(['Train', episode_reward, self.env.counts])
            if i >= self.max_timestep:
                break
RL_QLearning inherits from RL_Agent. self.env.reset() resets the simulation environment, self.env.state reads the agent's current state, and self.model.get_Q(currentstate) returns the vector of action values for the current state. Note that q_update differs from the current estimate only at the position of the chosen action, where the value is replaced by q_currentstate_action. Finally, episode_reward is accumulated for debugging the algorithm.
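To make the target construction concrete, here is the same computation on made-up numbers (3 actions, learn_rate=0.01, gamma=1.0):
q_currentstate = np.array([[0.2], [1.0], [0.5]])   # current Q estimates, shape (3, 1)
q_newstate = np.array([[0.3], [0.8], [0.1]])       # Q estimates for the next state
action, reward = 1, -1.0
learn_rate, gamma = 0.01, 1.0

q_currentstate_action = q_currentstate[action] + learn_rate*(reward + gamma*np.max(q_newstate) - q_currentstate[action])
q_update = q_currentstate                           # same vector; only the chosen action's entry is overwritten
q_update[action] = q_currentstate_action            # 1.0 + 0.01*(-1.0 + 0.8 - 1.0) = 0.988
Fitting the network to q_update therefore only nudges the value of the chosen action towards the TD target, while the other actions keep their current estimates as regression targets.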
One more word about TensorFlow. When setting up the input/output structure, we gave the placeholders shape (None, 1); the extra dimension of 1 is needed for the matrix multiplication with the weight matrix, which can cause a dimension-mismatch error when feeding training data. To resolve this, the Car2D.py from the previous article was modified in one place: the state returned by env.reset() is given a second dimension via np.expand_dims. The modified Car2D.py can be downloaded here.
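The effect of that modification can be shown in isolation; the 2-dimensional state below is just an example, the real dimension comes from Car2DEnv:
state = np.array([0.0, 1.0])            # shape (2,): cannot be fed into a (None, 1) placeholder
state = np.expand_dims(state, axis=1)   # shape (2, 1): a column vector, as the network expects
print(state.shape)                      # (2, 1)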
The complete code is listed below; the source file can also be downloaded.
import tensorflow as tf
import numpy as np
class Model():
    def __init__(self, in_size, net_size, out_size, activation):
        def add_layer(inputs, in_size, out_size, n_layer, activation_function=None):  # activation_function=None builds a linear layer
            layer_name = "layer%s" % n_layer
            with tf.name_scope(layer_name):
                with tf.name_scope('weights'):
                    self.Weights = tf.Variable(tf.random_normal([out_size, in_size]))  # randomly initialized weights
                with tf.name_scope('biases'):
                    self.biases = tf.Variable(tf.zeros([out_size, 1]) + 0.1)  # biases are best initialized away from zero
                with tf.name_scope('Wx_plus_b'):
                    self.Wx_plus_b = tf.matmul(self.Weights, inputs) + self.biases  # Weights @ inputs + biases
                if activation_function is None:
                    self.outputs = self.Wx_plus_b
                else:
                    self.outputs = activation_function(self.Wx_plus_b)
            return self.outputs
        tf.reset_default_graph()
        with tf.name_scope('inputs'):  # group the input placeholders in the graph
            self.xs = tf.placeholder(tf.float32, [None, 1], name='x_input')
            self.ys = tf.placeholder(tf.float32, [None, 1], name='y_input')
        self.l1 = add_layer(self.xs, in_size, net_size, n_layer=1, activation_function=activation)  # hidden layer
        self.l2 = add_layer(self.l1, net_size, out_size, n_layer=2, activation_function=None)  # output layer
        with tf.name_scope('loss'):
            self.loss = tf.reduce_mean(tf.reduce_sum(tf.square(self.ys - self.l2), reduction_indices=[1]))  # mean squared error
        with tf.name_scope('train'):
            self.train_step = tf.train.GradientDescentOptimizer(0.1).minimize(self.loss)  # gradient descent with learning rate 0.1
        self.init = tf.global_variables_initializer()  # replaces the deprecated tf.initialize_all_variables()
        self.sess = tf.Session()
    def init_train(self):
        self.sess.run(self.init)
    def learn(self, currentstate, qupdate):
        self.sess.run(self.train_step, feed_dict={self.xs: currentstate, self.ys: qupdate})
    def get_loss(self, currentstate, qupdate):
        mse = self.sess.run(self.loss, feed_dict={self.xs: currentstate, self.ys: qupdate})
        return mse
    def get_Q(self, currentstate):
        qcurrentstate = self.sess.run(self.l2, feed_dict={self.xs: currentstate})
        return qcurrentstate
class RL_Agent():
    def __init__(self, env, net_size, max_timestep, learn_rate=0.01, gamma=1.0):
        self.env = env
        self.max_timestep = max_timestep
        self.learn_rate = learn_rate
        self.gamma = gamma
        self.model = Model(env.observation_space.shape[0], net_size, env.action_space.n, tf.nn.relu)
    def act(self, currentstate):
        action = np.argmax(self.model.get_Q(currentstate))  # greedy action
        return action
    def act_epsilon(self, currentstate):
        if np.random.rand(1) < 0.01:  # explore with probability ε = 0.01
            action = self.env.action_space.sample()
        else:
            action = self.act(currentstate)
        return action
class RL_QLearning(RL_Agent):
    def __init__(self, env, net_size, max_timestep, learn_rate=0.01, gamma=1.0):
        super().__init__(env, net_size, max_timestep, learn_rate, gamma)
    def learn(self):
        self.model.init_train()
        i = 0
        while True:
            obs, done = self.env.reset(), False
            episode_reward = 0
            while not done:
                self.env.render()
                currentstate = self.env.state
                q_currentstate = self.model.get_Q(currentstate)
                action = self.act_epsilon(currentstate)
                obs, reward, done, _ = self.env.step(action)
                q_newstate = self.model.get_Q(obs)
                q_currentstate_action = q_currentstate[action] + self.learn_rate*(reward + self.gamma*np.max(q_newstate) - q_currentstate[action])
                q_update = q_currentstate
                q_update[action] = q_currentstate_action
                self.model.learn(currentstate, q_update)
                i = i + 1
                episode_reward += reward
            print(['Train', episode_reward, self.env.counts])
            if i >= self.max_timestep:
                break
if __name__ == '__main__':
    from Car2D import Car2DEnv
    env = Car2DEnv()
    RL = RL_QLearning(env, 10, 10000)   # hidden-layer size 10, at most 10000 training steps
    RL.learn()
    print('======================Done!=====================')
    while True:
        obs, done = env.reset(), False
        episode_reward = 0
        steps = 0
        while not done:
            env.render()
            obs, reward, done, _ = env.step(RL.act(obs))
            episode_reward += reward
            steps = steps + 1
            if steps > 100:
                done = True   # cap test episodes at 100 steps
        print(['Test', episode_reward, env.counts])