Getting Started with Reinforcement Learning: Object-Oriented Programming + A Continuous-State Q-learning Algorithm in Tensorflow

Preface

This is the second article in this series. In the previous article, we showed how to build a custom simulation environment on top of gym and train an agent with off-the-shelf open-source algorithms. Although ready-made open-source implementations exist for almost every mainstream algorithm, we still recommend that newcomers to reinforcement learning implement one representative algorithm themselves. In this article we therefore go into algorithmic detail and show how to implement a continuous-state Q-learning algorithm with Tensorflow, organizing the code in an object-oriented style.

Prerequisites

  1. Complete a custom simulation environment following the first article in this series
  2. Basic Python object-oriented programming: classes and instances, inheritance, initialization, attributes, etc.
  3. Basic Tensorflow: tensors and computation graphs, artificial neural networks (ANN)
  4. Basic reinforcement learning: value functions, the Bellman equation, etc.

Pseudocode for Tabular Q-learning

Initialize Q(S,A); set parameters α, γ
Repeat (for each episode):
    Initialize state S
    Repeat (for each step until the episode ends):
        In state S(t), choose action A(t) according to the ε-greedy policy
        Take one simulation step, observe reward r(t) and the next state S(t+1)
        Q(S(t),A(t)) = Q(S(t),A(t)) + α[r(t) + γmax_a(Q(S(t+1),a)) - Q(S(t),A(t))]
    Until S(t+1) is a terminal state or the episode reaches the maximum number of simulation steps
Until Q(S,A) converges or the total simulation-step limit is reached
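
To make the pseudocode concrete, here is a minimal NumPy sketch of the tabular version. It assumes a hypothetical discrete gym-style environment whose reset() and step(action) return integer state indices; the function name and hyperparameters are illustrative and are not part of the code developed below.

import numpy as np

def tabular_q_learning(env, n_states, n_actions, alpha=0.1, gamma=0.99,
                       epsilon=0.1, n_episodes=500, max_steps=200):
    Q = np.zeros((n_states, n_actions))  # the (state x action) table
    for _ in range(n_episodes):
        s = env.reset()
        for _ in range(max_steps):
            # ε-greedy action selection
            if np.random.rand() < epsilon:
                a = np.random.randint(n_actions)
            else:
                a = np.argmax(Q[s])
            s_next, r, done, _ = env.step(a)
            # the TD update from the pseudocode
            Q[s, a] += alpha * (r + gamma * np.max(Q[s_next]) - Q[s, a])
            s = s_next
            if done:
                break
    return Q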

Q-learning with Continuous States

The pseudocode above is normally applied to a tabular state-action space, i.e. the value function Q(S,A) is stored in a two-dimensional (state × action) table. When the state is continuous, a discretized tabular state space becomes enormous. Fortunately, a neural network can provide a good parameterized fit of an ordinary function. When the action is also continuous, the term max_a(Q(S(t+1),a)) in the pseudocode is hard to evaluate and policy-gradient methods are needed instead; this article therefore does not cover that case and only considers discrete action spaces.

So the target is clear: continuous state space + discrete action space. Since the action space is finite, the value function Q(S,A) can be designed as a vector [Q(S,A1), Q(S,A2), Q(S,A3), …]: feed in the state vector and get back a set of Q values, one per action. This value vector corresponds one-to-one with the action space and records the value of each action in that state. Computing max_a(Q(S(t+1),a)) then becomes trivial: just take the maximum of the vector.
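
As a small illustration (the numbers are made up), once the network returns such a vector for a state, both the greedy action and the max needed for the Q-learning target come straight out of NumPy:

import numpy as np

q_values = np.array([1.2, -0.5, 0.8])  # [Q(S,A1), Q(S,A2), Q(S,A3)] from the network
greedy_action = np.argmax(q_values)    # best action index -> 0
max_q = np.max(q_values)               # max_a Q(S,a) used in the TD target -> 1.2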

Object-Oriented Programming

The code skeleton looks like this:

import tensorflow as tf
import numpy as np

class Model():
    def __init__(self):
        def add_layer():
            pass

        self.l1 = add_layer()   # hidden layer
        self.l2 = add_layer()   # output layer
        self.loss = None        # loss tensor
        self.train_step = None  # training op
        self.init = None        # variable initializer
        self.sess = None        # Tensorflow session

    
    def init_train(self):
        self.sess.run(self.init)
    
    def learn(self, currentstate, qupdate):
        pass
    
    def get_Q(self, currentstate):
        pass

class RL_Agent():
    def __init__(self):
        self.model = Model()

    def act(self, currentstate):          # greedy policy
        return action
    
    def act_epsilon(self, currentstate):  # ε-greedy policy
        return action

class RL_QLearning(RL_Agent):
    def __init__(self):
        super().__init__()
        
    def learn(self):
        pass

There are three classes here: Model, RL_Agent and RL_QLearning. Model builds the neural network with one hidden layer that fits the value function, and most of the Tensorflow-related code lives in it. RL_Agent represents the agent and provides two action-selection policies, greedy and ε-greedy. RL_QLearning is a subclass of RL_Agent and implements the Q-learning algorithm used to train the agent.

Let's walk through the code, starting with Model:

class Model():
    def __init__(self, in_size, net_size, out_size, activation):
        def add_layer(inputs,in_size,out_size,n_layer,activation_function=None): # activation_function=None means a linear (identity) layer
            layer_name="layer%s" % n_layer
            with tf.name_scope(layer_name):
                with tf.name_scope('weights'):
                    self.Weights = tf.Variable(tf.random_normal([out_size,in_size])) # weights start as random values
                with tf.name_scope('biases'):
                    self.biases = tf.Variable(tf.zeros([out_size,1])+0.1) # a small non-zero initial bias is recommended
                with tf.name_scope('Wx_plus_b'):
                    self.Wx_plus_b = tf.matmul(self.Weights,inputs)+self.biases # Weights * inputs + biases
                if activation_function is None:
                    self.outputs = self.Wx_plus_b
                else:
                    self.outputs = activation_function(self.Wx_plus_b)
                return self.outputs
        
        tf.reset_default_graph()

        with tf.name_scope('inputs'): # input placeholders
            self.xs = tf.placeholder(tf.float32,[None,1],name='x_input')
            self.ys = tf.placeholder(tf.float32,[None,1],name='y_input')
        
        self.l1 = add_layer(self.xs,in_size,net_size,n_layer=1,activation_function=activation) # hidden layer
        self.l2 = add_layer(self.l1,net_size,out_size,n_layer=2,activation_function=None) # output layer

        with tf.name_scope('loss'):
            self.loss = tf.reduce_mean(tf.reduce_sum(tf.square(self.ys-self.l2),reduction_indices=[1])) # mean squared error: square, sum, then average
        with tf.name_scope('train'):
            self.train_step = tf.train.GradientDescentOptimizer(0.1).minimize(self.loss) # gradient descent with learning rate 0.1, minimizing the loss

        self.init = tf.global_variables_initializer() # op that initializes all network variables
        self.sess = tf.Session()
    
    def init_train(self):
        self.sess.run(self.init)
    
    def learn(self, currentstate, qupdate):
        self.sess.run(self.train_step,feed_dict={self.xs:currentstate,self.ys:qupdate})
    
    def get_loss(self, currentstate, qupdate):
        mse = self.sess.run(self.loss,feed_dict={self.xs:currentstate,self.ys:qupdate})
        return mse
    
    def get_Q(self, currentstate):
        qcurrentstate = self.sess.run(self.l2,feed_dict={self.xs:currentstate})
        return qcurrentstate

The inputs scope holds two tensors, self.xs and self.ys, which receive the state input and the target output used to fit the value function. The hidden layer and the output layer are built by calling add_layer. Note that all of the network's nodes are deliberately exposed as attributes of the class, which makes debugging easy: for example, self.sess.run(self.l1, feed_dict=...) lets you inspect the hidden layer's response to any state. The loss is a mean squared error, and training uses gradient descent with a learning rate of 0.1. The session is held in self.sess. Note that init_train must be called to initialize all variables before learn is used for training.
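
A quick smoke test of Model might look like the sketch below. The two-dimensional state, net_size of 10 and three actions are made-up values, and it assumes the imports (import tensorflow as tf, import numpy as np) and the Model class from the listing are already in scope. The state is fed as a column vector of shape (in_size, 1), matching the placeholder layout discussed further below.

model = Model(in_size=2, net_size=10, out_size=3, activation=tf.nn.relu)
model.init_train()                 # must be called before learn() or get_Q()

state = np.array([[0.5], [-1.0]])  # column vector, shape (in_size, 1)
print(model.get_Q(state))          # action-value vector, shape (out_size, 1)
# inspect the hidden layer's response to the same state
print(model.sess.run(model.l1, feed_dict={model.xs: state}))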

Next, RL_Agent:

class RL_Agent():
    def __init__(self, env, net_size, max_timestep, learn_rate=0.01, gamma=1.0):
        self.env = env
        self.max_timestep = max_timestep
        self.learn_rate = learn_rate
        self.gamma = gamma
        self.model = Model(env.observation_space.shape[0], net_size, env.action_space.n, tf.nn.relu) # state dimension in, one Q value per discrete action out

    def act(self, currentstate):
        # greedy policy: pick the action with the largest predicted Q value
        action = np.argmax(self.model.get_Q(currentstate))
        return action
    
    def act_epsilon(self, currentstate):
        # ε-greedy policy with ε = 0.01: explore occasionally, otherwise act greedily
        if np.random.rand(1) < 0.01:
            action = self.env.action_space.sample()
        else:
            action = self.act(currentstate)
        return action

Note that RL_Agent takes the simulation environment env in its initializer. The value-function model is held as self.model. RL_Agent provides two policies, the greedy policy act and the ε-greedy policy act_epsilon; both take the current state as input and return the corresponding action.
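
The exploration rate is hard-coded to 0.01 here. A common refinement is to anneal ε over time; a possible helper (not part of the original code, names and schedule are illustrative) could look like this:

import numpy as np

def act_epsilon_decay(agent, currentstate, step,
                      eps_start=1.0, eps_end=0.01, decay_steps=5000):
    # linearly anneal ε from eps_start to eps_end over decay_steps environment steps
    frac = min(step / decay_steps, 1.0)
    epsilon = eps_start + frac * (eps_end - eps_start)
    if np.random.rand() < epsilon:
        return agent.env.action_space.sample()  # explore
    return agent.act(currentstate)              # exploit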

Finally, RL_QLearning:

class RL_QLearning(RL_Agent):
    def __init__(self, env, net_size, max_timestep, learn_rate=0.01, gamma=1.0):
        super().__init__(env, net_size, max_timestep, learn_rate, gamma)
        
    def learn(self):
        self.model.init_train()
        i = 0
        while True:
            obs, done = self.env.reset(), False
            episode_reward = 0
            while not done:
                self.env.render()
                currentstate = self.env.state
                q_currentstate = self.model.get_Q(currentstate)
                
                action = self.act_epsilon(currentstate)
                
                obs, reward, done, _ = self.env.step(action)
                
                q_newstate = self.model.get_Q(obs)
                
                # Q-learning TD update, applied only to the entry of the chosen action
                q_currentstate_action = q_currentstate[action] + self.learn_rate*(reward+self.gamma*np.max(q_newstate)-q_currentstate[action])
                q_update = q_currentstate
                q_update[action] = q_currentstate_action
                self.model.learn(currentstate,q_update)
                
                i = i + 1
                episode_reward += reward
            print(['Train', episode_reward, self.env.counts])
            if i >= self.max_timestep:
                break

RL_QLearning inherits from RL_Agent. self.env.reset() resets the simulation environment, self.env.state returns the agent's current state, and self.model.get_Q(currentstate) returns the vector of Q values over the action space for that state. Note that q_update differs from the network's current prediction only at the position of the chosen action, where the value is replaced by q_currentstate_action; the other entries equal the current predictions and therefore contribute no error to the loss. Finally, episode_reward is accumulated to help debug the algorithm.
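
A tiny numeric example of how the training target is built (made-up numbers, with gamma = 1.0 and learn_rate = 0.01 as in the defaults):

import numpy as np

q_currentstate = np.array([[0.50], [0.20], [0.10]])  # network output for S(t)
q_newstate     = np.array([[0.40], [0.70], [0.00]])  # network output for S(t+1)
action, reward, gamma, learn_rate = 1, 1.0, 1.0, 0.01

target = q_currentstate[action] + learn_rate * (
    reward + gamma * np.max(q_newstate) - q_currentstate[action])
# target = 0.20 + 0.01 * (1.0 + 0.70 - 0.20) = 0.215

q_update = q_currentstate
q_update[action] = target  # only the chosen action's entry changes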

One more note about Tensorflow. When building the input and output placeholders, we set their shape to (None, 1); the extra 1 turns the state into a column vector so that it can be matrix-multiplied with the weight matrix. This can trigger shape-mismatch errors when feeding training data. To fix this, Car2D.py from the previous article was modified in one place: the state returned by env.reset() gets a second dimension added with np.expand_dims. The modified Car2D.py file can be downloaded here.
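
The change in Car2D.py amounts to something like the following sketch (the exact variable names inside Car2D.py may differ):

import numpy as np

state = np.array([3.0, -1.5])          # raw state, shape (2,)
state = np.expand_dims(state, axis=1)  # column vector, shape (2, 1)
# state now matches the (None, 1) placeholders and can be fed directly, e.g.
# sess.run(..., feed_dict={xs: state})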

The complete code follows; the source file can be downloaded:

import tensorflow as tf
import numpy as np

class Model():
    def __init__(self, in_size, net_size, out_size, activation):
        def add_layer(inputs,in_size,out_size,n_layer,activation_function=None): # activation_function=None means a linear (identity) layer
            layer_name="layer%s" % n_layer
            with tf.name_scope(layer_name):
                with tf.name_scope('weights'):
                    self.Weights = tf.Variable(tf.random_normal([out_size,in_size])) # weights start as random values
                with tf.name_scope('biases'):
                    self.biases = tf.Variable(tf.zeros([out_size,1])+0.1) # a small non-zero initial bias is recommended
                with tf.name_scope('Wx_plus_b'):
                    self.Wx_plus_b = tf.matmul(self.Weights,inputs)+self.biases # Weights * inputs + biases
                if activation_function is None:
                    self.outputs = self.Wx_plus_b
                else:
                    self.outputs = activation_function(self.Wx_plus_b)
                return self.outputs
        
        tf.reset_default_graph()

        with tf.name_scope('inputs'): # input placeholders
            self.xs = tf.placeholder(tf.float32,[None,1],name='x_input')
            self.ys = tf.placeholder(tf.float32,[None,1],name='y_input')
        
        self.l1 = add_layer(self.xs,in_size,net_size,n_layer=1,activation_function=activation) # hidden layer
        self.l2 = add_layer(self.l1,net_size,out_size,n_layer=2,activation_function=None) # output layer

        with tf.name_scope('loss'):
            self.loss = tf.reduce_mean(tf.reduce_sum(tf.square(self.ys-self.l2),reduction_indices=[1])) # mean squared error: square, sum, then average
        with tf.name_scope('train'):
            self.train_step = tf.train.GradientDescentOptimizer(0.1).minimize(self.loss) # gradient descent with learning rate 0.1, minimizing the loss

        self.init = tf.global_variables_initializer() # op that initializes all network variables
        self.sess = tf.Session()
    
    def init_train(self):
        self.sess.run(self.init)
    
    def learn(self, currentstate, qupdate):
        self.sess.run(self.train_step,feed_dict={self.xs:currentstate,self.ys:qupdate})
    
    def get_loss(self, currentstate, qupdate):
        mse = self.sess.run(self.loss,feed_dict={self.xs:currentstate,self.ys:qupdate})
        return mse
    
    def get_Q(self, currentstate):
        qcurrentstate = self.sess.run(self.l2,feed_dict={self.xs:currentstate})
        return qcurrentstate

class RL_Agent():
    def __init__(self, env, net_size, max_timestep, learn_rate=0.01, gamma=1.0):
        self.env = env
        self.max_timestep = max_timestep
        self.learn_rate = learn_rate
        self.gamma = gamma
        self.model = Model(env.observation_space.shape[0], net_size, env.action_space.n, tf.nn.relu) # state dimension in, one Q value per discrete action out

    def act(self, currentstate):
        # greedy policy: pick the action with the largest predicted Q value
        action = np.argmax(self.model.get_Q(currentstate))
        return action
    
    def act_epsilon(self, currentstate):
        # ε-greedy policy with ε = 0.01: explore occasionally, otherwise act greedily
        if np.random.rand(1) < 0.01:
            action = self.env.action_space.sample()
        else:
            action = self.act(currentstate)
        return action

class RL_QLearning(RL_Agent):
    def __init__(self, env, net_size, max_timestep, learn_rate=0.01, gamma=1.0):
        super().__init__(env, net_size, max_timestep, learn_rate, gamma)
        
    def learn(self):
        self.model.init_train()
        i = 0
        while True:
            obs, done = self.env.reset(), False
            episode_reward = 0
            while not done:
                self.env.render()
                currentstate = self.env.state
                q_currentstate = self.model.get_Q(currentstate)
                
                action = self.act_epsilon(currentstate)
                
                obs, reward, done, _ = self.env.step(action)
                
                q_newstate = self.model.get_Q(obs)
                
                # Q-learning TD update, applied only to the entry of the chosen action
                q_currentstate_action = q_currentstate[action] + self.learn_rate*(reward+self.gamma*np.max(q_newstate)-q_currentstate[action])
                q_update = q_currentstate
                q_update[action] = q_currentstate_action
                self.model.learn(currentstate,q_update)
                
                i = i + 1
                episode_reward += reward
            print(['Train', episode_reward, self.env.counts])
            if i >= self.max_timestep:
                break

if __name__ == '__main__':
    from Car2D import Car2DEnv
    env = Car2DEnv()
    
    RL = RL_QLearning(env,10,10000)
    RL.learn()
    
    print('======================Done!=====================')
    
    # evaluate the trained agent with the greedy policy (runs until interrupted)
    while True:
        obs, done = env.reset(), False
        episode_reward = 0
        steps = 0
        while not done:
            env.render()
            obs, reward, done, _ = env.step(RL.act(obs))
            episode_reward += reward
            steps = steps + 1
            if steps > 100:
                done = True
        print(['Test', episode_reward, env.counts])

Original author: Orion Nebula
Original article: https://zhuanlan.zhihu.com/p/34162759