TensorFlow学习笔记(9):分布式TensorFlow

简介

TensorFlow支持使用多台机器的设备进行计算。本文基于官方教程,实践了分布式TensorFlow搭建的过程。

TensorFlow入门教程

基本概念

TensorFlow集群

A TensorFlow “cluster” is a set of “tasks” that participate in the distributed execution of a TensorFlow graph. Each task is associated with a TensorFlow “server”, which contains a “master” that can be used to create sessions, and a “worker” that executes operations in the graph.

从上面的定义可以看出,所谓的TensorFlow集群就是一组任务,每个任务就是一个服务。服务由两个部分组成,第一部分是master,用于创建session,第二部分是worker,用于执行具体的计算。

TensorFlow一般将任务分为两类job:一类叫参数服务器,parameter server,简称为ps,用于存储tf.Variable;一类就是普通任务,称为worker,用于执行具体的计算。

首先来理解一下参数服务器的概念。一般而言,机器学习的参数训练过程可以划分为两个类别:第一个是根据参数算算梯度,第二个是根据梯度更新参数。对于小规模训练,数据量不大,参数数量不多,一个CPU就足够了,两类任务都交给一个CPU来做。对于普通的中等规模的训练,数据量比较大,参数数量不多,计算梯度的任务负荷较重,参数更新的任务负荷较轻,所以将第一类任务交给若干个CPU或GPU去做,第二类任务交给一个CPU即可。对于超大规模的训练,数据量大、参数多,不仅计算梯度的任务要部署到多个CPU或GPU上,而且更新参数的任务也要部署到多个CPU。如果计算量足够大,一台机器能搭载的CPU和GPU数量有限,就需要多台机器来进行计算能力的扩展了。参数服务器是一套分布式存储,用于保存参数,并提供参数更新的操作。

我们来看一下怎么创建一个TensorFlow集群。每个任务用一个ip:port表示。TensorFlow用tf.train.ClusterSpec表示一个集群信息,举例如下:

import tensorflow as tf

# Configuration of cluster 
ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

上面的语句提供了一个TensorFlow集群信息,集群有两类任务,称为job,一个job是ps,一个job是worker;ps由2个任务组成,worker由3个任务组成。

定义完集群信息后,使用tf.train.Server创建每个任务:

tf.app.flags.DEFINE_string("job_name", "worker", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")

FLAGS = tf.app.flags.FLAGS

def main(_):
    server = tf.train.Server(cluster,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index)
    server.join()

if __name__ == "__main__":
    tf.app.run()

对于本例而言,我们需要在ip:port对应的机器上运行每个任务,共需执行五次代码,生成五个任务。

python worker.py --job_name=ps --task_index=0
python worker.py --job_name=ps --task_index=1
python worker.py --job_name=worker --task_index=0
python worker.py --job_name=worker --task_index=1
python worker.py --job_name=worker --task_index=2

我们找到集群的某一台机器,执行下面的代码:

# -*- coding=utf-8 -*-

import tensorflow as tf
import numpy as np

train_X = np.random.rand(100).astype(np.float32)
train_Y = train_X * 0.1 + 0.3

# 选择变量存储位置和op执行位置,这里全部放在worker的第一个task上
with tf.device("/job:worker/task:0"):
    X = tf.placeholder(tf.float32)
    Y = tf.placeholder(tf.float32)
    w = tf.Variable(0.0, name="weight")
    b = tf.Variable(0.0, name="reminder")
    y = w * X + b
    loss = tf.reduce_mean(tf.square(y - Y))

    init_op = tf.global_variables_initializer()
    train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)

# 选择创建session使用的master
with tf.Session("grpc://xx.xxx.xx.xxxx:oooo") as sess:
    sess.run(init_op)
    for i in range(500):
        sess.run(train_op, feed_dict={X: train_Y, Y: train_Y})
        if i % 50 == 0:
            print i, sess.run(w), sess.run(b)

    print sess.run(w)
    print sess.run(b)

执行结果如下:

0 0.00245265 0.00697793
50 0.0752466 0.213145
100 0.0991397 0.279267
150 0.107308 0.30036
200 0.110421 0.306972
250 0.111907 0.308929
300 0.112869 0.309389
350 0.113663 0.309368
400 0.114402 0.309192
450 0.115123 0.308967
0.115824
0.30873

其实ps和worker本质上是一个东西,就是名字不同,我们将上例中的with tf.device("/job:worker/task:0"):改为with tf.device("/job:psr/task:0"):,一样能够执行。之所以在创建集群时要分为两个类别的任务,是因为TensorFlow提供了一些工具函数,会根据名字不同赋予task不同的任务,ps的用于存储变量,worker的用于计算。

同步与异步更新

  • 同步更新:将数据拆分成多份,每份基于参数计算出各自部分的梯度;当每一份的部分梯度计算完成后,收集到一起算出总梯度,再用总梯度去更新参数。

  • 异步更新:同步更新模式下,每次都要等各个部分的梯度计算完后才能进行参数更新操作,处理速度取决于计算梯度最慢的那个部分,其他部分存在大量的等待时间浪费;异步更新模式下,所有的部分只需要算自己的梯度,根据自己的梯度更新参数,不同部分之间不存在通信和等待。

分布式训练案例

import tensorflow as tf
import numpy as np

# Configuration of cluster 
ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS

def main(_):
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
        
        x_data = tf.placeholder(tf.float32, [100])
        y_data = tf.placeholder(tf.float32, [100])

        W = tf.Variable(tf.random_uniform([1], -1.0, 1.0))
        b = tf.Variable(tf.zeros([1]))
        y = W * x_data + b
        loss = tf.reduce_mean(tf.square(y - y_data))
        
        global_step = tf.Variable(0, name="global_step", trainable=False)
        optimizer = tf.train.GradientDescentOptimizer(0.1)
        train_op = optimizer.minimize(loss, global_step=global_step)
        
        tf.summary.scalar('cost', loss)
        summary_op = tf.summary.merge_all()
        init_op = tf.global_variables_initializer()
    # The StopAtStepHook handles stopping after running given steps.
    hooks = [ tf.train.StopAtStepHook(last_step=1000000)]
    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(master="grpc://" + worker_hosts[FLAGS.task_index],
                                           is_chief=(FLAGS.task_index==0), # 我们制定task_index为0的任务为主任务,用于负责变量初始化、做checkpoint、保存summary和复原
                                           checkpoint_dir="/tmp/tf_train_logs",
                                           save_checkpoint_secs=None,
                                           hooks=hooks) as mon_sess:
        while not mon_sess.should_stop():
            # Run a training step asynchronously.
            # See `tf.train.SyncReplicasOptimizer` for additional details on how to
            # perform *synchronous* training.
            # mon_sess.run handles AbortedError in case of preempted PS.
            train_x = np.random.rand(100).astype(np.float32)
            train_y = train_x * 0.1 + 0.3
            _, step, loss_v, weight, biase = mon_sess.run([train_op, global_step, loss, W, b], feed_dict={x_data: train_x, y_data: train_y})
            if step % 100 == 0:
                print "step: %d, weight: %f, biase: %f, loss: %f" %(step, weight, biase, loss_v)
        print "Optimization finished."

if __name__ == "__main__":
    tf.app.run()

代码中,tf.train.replica_device_setter()会根据job名,将with内的Variable op放到ps tasks,将其他计算op放到worker tasks。默认分配策略是轮询。

在属于集群的一台机器中执行上面的代码,屏幕会开始输出每轮迭代的训练参数和损失

python train.py --task_index=0

在另一台机器上执行下面你的代码,再启动一个任务,会看到屏幕开始输出每轮迭代的训练参数和损失,注意,step不再是从0开始,而是在启动时刻上一个启动任务的step后继续。此时观察两个任务,会发现他们同时在对同一参数进行更新。

python train.py --task_index=2

思考

分布式TensorFlow与Spark对比:

  • 分布式的级别不同:TensorFlow的Tensor、Variable和Op不是分布式的,分布式执行的是subgraph. Spark的op和变量都是构建在RDD上,RDD本身是分布式的。

  • 异步训练:TensorFlow支持同步和异步的分布式训练;Spark原生的API只支持同步训练

  • 分布式存储:Spark在底层封装好了worker和分布式数据之间的关系;TensorFlow需要自行维护。

  • Parameter Server:TensorFlow支持,Spark暂不支持。

  • TF分布式部署起来还是比较繁琐的,需要定义好每个任务的ip:port,手工启动每个task,不提供一个界面可以对集群进行维护。

参考资料

    原文作者:丹追兵
    原文地址: https://segmentfault.com/a/1190000008376957
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞