TensorFlowOnSpark
项目是由Yahoo
开源的一个软件包,实现TensorFlow
集群服务部署在Spark
平台之上。
大家好,这次我将分享TensorFlow On Spark
的解决方案,将TensorFlow
集群部署在Spark
平台之上,实现了TensorFlow
与Spark
的无缝连接,更好地解决了两者数据传递的问题。
tfos.part2.2_2.pdf.jpg
这次分享的主要内容包括TensorFlowOnSpark
架构设计,探讨其工作原理,通过理解其设计,更好地理解TensorFlow
集群在Spark
平台上的运行机制。
tfos.part3.3_3.pdf.jpg
首先,探讨TensorFlowOnSpark
的架构与设计。主要包括如下两个基本内容:
- 架构分析
- 生命周期
tfos.part4.4_4.pdf.jpg
在开始之前,先探讨一下TensorFlowOnSpark
的背景,及其它需要解决的问题。为了实现Spark
利用TensorFlow
深度学习,及其GPU
加速的能力,最常见的解决方案如上图所示。
搭建TensorFlow
集群,并通过利用既有的Spark
集群的数据完成模型的训练,最种再将训练好的模型部署在Spark
集群上,实现数据的预测。
该方案虽然实现了Spark
集群的深度学习,及其GPU
加速的能力,但需要Spark
集群与TensorFlow
集群之间的数据传递,造成冗余的系统复杂度。
tfos.part5.5_5.pdf.jpg
很容易想到,可以将TensorFlow
集群部署在Spark
之上,用于解决集群间数据传递的问题。
依次类同,该方案可实现Caffe
部署在Spark
集群之上,实现Spark
集群对多种深度学习框架的支持能力,并兼容既有Spark
组件的完整性,包括Spark MLLib, Spark Streaming, Spark SQL
等。
tfos.part6.6_6.pdf.jpg
TensorFlowOnSpark
的架构较为简单,Spark Driver
程序并不会参与TensorFlow
内部相关的计算和处理。其设计思路像是将一个TensorFlow
集群运行在了Spark
上,其在每个Spark Executor
中启动TensorFlow
应用程序,然后通过gRPC
或RDMA
方式进行数据传递与交互。
tfos.part7.7_7.pdf.jpg
TensorFlowOnSpark
的Spark
应用程序包括4
个基本过程。
- Reserve:组建
TensorFlow
集群,并在每个Executor
进程上预留监听端口,启动“数据/控制”消息的监听程序。 - Start:在每个
Executor
进程上启动TensorFlow
应用程序; - Train/Inference:在
TensorFlow
集群上完成模型的训练或推理 - Shutdown:关闭
Executor
进程上的TensorFlow
应用程序,释放相应的系统资源(消息队列)。
tfos.part8.8_8.pdf.jpg
用户直接通过spark-submit
的方式提交Spark
应用程序(mnist_spark.py
)。其中通过--py_files
选项附带TensorFlowOnSpark
框架(tfspark.zip
),及其TensorFlow
应用程序(mnist_dist.py
),从而实现TensorFlow
集群在Spark
平台上的部署。
tfos.part9.9_9.pdf.jpg
首先看看TensorFlow
集群的建立过程。首先根据spark-submit
传递的num_executor
参数,通过调用cluster = sc.parallelize(num_executor)
建立一个ParllelCollectionRDD
,其中分区数为num_executor
。也就是说,此时分区数等于Executor
数。
然后再调用cluster.mapPartitions(TFParkNode.reserve)
将ParllelCollectionRDD
变换(transformation)为MapPartitionsRDD
,在每个分区上回调TRSparkNode.reserve
。
TRSparkNode.reserve
将会在该节点上预留一个端口,并驻留一个Manager
服务。Manager持有一个队列,用于完成进程间的同步,实现该节点的“数据/控制”消息的服务。
数据消息启动了两个队列:Input
与Output
,分别用于RDD
与Executor
进程之间的数据交换。
控制消息启动了一个队列:Control
,用于Driver
进程控制PS
任务的生命周期,当模型训练完成之后,通过Driver
发送Stop
的控制消息结束PS
任务。
tfos.part10.10_10.pdf.jpg
这是从分区的角度看待TensorFlow
集群建立的过程,横轴表示RDD
。这里存在两个RDD
,第一个为ParllelCollectionRDD
,然后变换为MapPartitionsRDD
。
纵轴表示同一个分区(Partition),并在每个分区上启动一个Executor
进程 。在Spark
中,分区数等于最终在TaskScheduler
上调度的Task数目。
此处,sc.parallelize(num_executor)
生成一个分区数为num_executor
的ParllelCollectionRDD
。也就是说,此时分区数等于num_executor
数目。
在本例中,num_executor
为3
,包括1
个PS
任务,2
个Worker
任务。
tfos.part11.11_11.pdf.jpg
TensorFlow
集群建立后,将生成上图所示的领域模型。其中,一个TFCluster
将持有num_executor
个TFSparkNode
节点;在每个TFSparkNode
上驻留一个Manager
服务,并预留一个监听端口,用于监听“数据/控制”消息。
实际上,TFSparkNode
节点承载于Spark Executor
进程之上。
tfos.part12.12_12.pdf.jpg
TensorFlow
集群建立后,通过调用cluster.start
启动集群服务。其结果将在每个Executor
进程上启动TensorFlow
应用程序。
此处,需要对原生的TensorFlow
应用程序进行适配修改,包括2
个部分:
-
Feeding
与Fetching
: 数据输入/输出机制修改 -
ClusterSpec
:TF
集群的构造描述
其余代码都将保留,最小化TensorFlow
应用程序的修改。
tfos.part13.13_13.pdf.jpg
在cluster
上调用foreachPartition(TFSparkNode.start(map_func))
,将在每个分区(Executor
进程)上回调TFSparkNode.start(map_func)
。其中,map_func
是对应TF
应用程序的包装。
通过上述过程,将在Spark
上拉起了一个TF
的集群服务。从而使得Spark
集群拥有了深度学习和GPU
加速的能力。
tfos.part14.14_14.pdf.jpg
当Spark
平台上已经拉起了TF
集群服务之后,便可以启动模型的训练或推理过程了。在训练或推理过程中,最重要的是解决数据的Feeding
和Fetching
问题。
TFoS
上提供了两种方案:
-
TensorFlow QueueRunner
:利用TensorFlow
提供的FileReader
和QueueRunner
机制。Spark
未参与任何工作,请查阅TensorFlow
官方相关文档。 -
Spark Feeding
:首先从RDD
读取分区数据(通过HadoopRDD.compute
),然后将其放在Input
队列中,Executor
进程再从该队列中取出,并进一步通过feed_dict
,调用session.run
将分区数据供给给TensorFlow Graph
中。
tfos.part15.15_15.pdf.jpg
Feeding
过程,就是通过Input Queue
同步实现的。当RDD
读取分区数据后,阻塞式地将分区数据put
到Input
队列中;TFGraph
在session.run
获取Next Batch
时,也是阻塞式地等待数据的到来。
tfos.part16.16_16.pdf.jpg
同样的道理,Fetching
过程与Feeding
过程类同,只是使用Output Queue
,并且数据流方向相反。
session.run
返回的数据,通过put
阻塞式地放入Output Queue
,RDD
也是阻塞式地等待数据到来。
tfos.part17.17_17.pdf.jpg
以模型训练过程为例,讲解RDD
的变换过程。此处以Mnist
手写识别为例,左边表示X
,右边表示Y
。分别通过HadoopRDD
读取分区数据,然后通过MapPartititionRDD
变换分区的数据格式;然后通过zip
算子,实现两个RDD
的折叠,生成ZipPartitionsRDD
。
然后,根据Epochs
超级参数的配置,将该RDD
重复执行Epochs
次,最终将结果汇总,生成UnionRDD
。
在此之前,都是Transformation
的过程,最终调用foreachPartition(train)
启动Action
,触发Spark Job
的提交和任务的运行。
tfos.part18.18_18.pdf.jpg
当模型训练或推理完成之后,分别在Input/Control
队列中投掷Stop
(以传递None
实现)消息,当Manager
收到Stop
消息后,停止队列的运行。
最终,Spark
应用程序退出,Executor
进程退出,整个工作流执行结束。
tfos.part19.19_19.pdf.jpg
tfos.part20.20_20.pdf.jpg
推荐资料,强烈推荐直接地源代码阅读;最后欢迎大家关注我的简书。
tfos.part22.22_22.pdf.jpg
开源技术书
https://github.com/horance-liu/tensorflow-internals