性能优化
要想使你的Spark流处理应用能够获得更好地性能,你需要大量的优化工作。在这一节中,我们提供了许多配置和参数来对你的程序进行改进。首先你需要从两个方面出发来考虑优化工作。
- 通过有效的的使用集群资源来降低每个批次的数据处理时间。
- 设置一个合适的批次大小以便程序能够尽量快的处理这些数据。
降低每个批次的处理时间
在Spark中有许多方式可以降低每个批次的数据处理时间,你可以参考Tuning Guide,这部分提到了许多优化要点。
优化数据接收并行度
通过网络方式接收数据(Flame,Kafka等),要求把数据反序列化后存储在Spark中。如果数据接收是一个瓶颈,那么我们就考虑采用多个receiver并行的方式。注意,每一个input DStream都可以创建一个单独的receiver(创建在worker节点上)来接收独立地接收流数据。因此,我们可以创建多个input DStream来同时接收多个数据源上的流数据。比如使用一个Kafka input DStream接收两个topic的数据,你完全可以创建两个input DStream来分别接受两个topic的数据,每一个receiver只负责从一个topic接收数据。这样就是一个并行的方式,来增加数据吞吐量。并且这些DStream可以组合成意义DStream,你定义在这个DStream上面的transformactions可以作用在每一个单独的input DStream上面。
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
另一个需要考虑的参数是receiver的blocking interval,通过spark.streaming.blockInterval来定义。对于大多数的receiver来说,会把接受到的数据合并成一个数据块然后存储到Spark的内存中。每一个批次中数据块的数量决定了要使用多少个任务来处理这些数据。每一个receiver的每一个batch上的task数量约等于(batch interval / block interval)。比如,block interval为200ms,那么每2s的批次数据需要使用20个task来处理。如果task的数量太低(比如小于机器的内核数量),那么就不能够使用全部的内核来参与计算,导致效率降低。为了增加每一个batch interval的task数量,你可以缩小block interval。但是,推荐使用最小的block interval是50ms,低于这个值的话接下来启动任务的开销将是一个问题。
对于从多个input streams/receiver接受数据的选择是重分配数据并行度(usinginputStream.repartition(<number of partitions>))。这可以在对数据进行处理之前,指定每一个批次接收到的数据使用task的数量。
优化数据处理并行度
如果任何阶段计算任务的并行度都不够高,那么会直接导致集群资源利用率低下(比如我们集群的计算资源为20个核心,但是每个阶段的并行度都只有不到10个,那么就会导致分配的集群资源利用率低下j)。对于一些分布式reduce操作,比如reduceByKey和reduceByKeyAndWindow,默认的并行度通过spark.default.parallelism来控制。你可以通过改变这个参数值来改变计算的并行度。
数据序列化
数据序列化的开销可以通过设置数据的序列化方式来优化。在Spark Streaming中有两种类型的数据需要序列化。
- Input data – 默认情况下接收器接收到的数据会使用StorageLevel.MEMORY_AND_DISK_SER_2来存储在executors的内存中。数据被序列化为bytes以便节约GC的开销,和生成副本来容忍执行器错误。当然数据首先会保存在内存中,知道内存不足以装下需要参与流计算的数据。显示,序列化是需要开销的,接收器必须反序列化接收到的数据然后再次通过Spaark定义的序列化格式对数据进行序列化。
- Persisted RDDs generated by Streaming Operations – 通过计算得到的RDD会被持久化到内存中。譬如,窗口操作会把数据持久化到内存中,因为这些数据需要计算多次。然后,与Spark Core默认的存储级别StorageLevel.MEMORY_ONLY不同,Streaming使用的默认存储级别为StorageLevel.MEMORY_ONLY_SER,以便减小GC的开销。
在所以情况下,使用Kryo序列化方式可以有效地减小CPU和内存的开销。
在特定的情况下,可能流处理需要保存的数据总量不会太大。直接保存反序列化后的对象并不对产生过大的GC开销。如果你使用秒级的batch interval并且没有任何窗口操作的话,那么你可以通过设置storage level可以禁用以序列化方式持久化数据。这可以降低CPU对于序列化的开销,同时又不会带来太大的GC开销。
任务启动开销
如果每秒运行的任务数量非常高(比如50+每秒),把任务发送到slaves时的开销会非常大,这样很难实现次秒级别的延时。你可以通过以下方式来优化。
- Execution Mode – 以Standalone模式或者coarse-grained Mesos模式可以获得更好地任务运行次数,比起fine-grained Mesos模式。
这种方式可以降低批处理时间100s of milliseconds,从而实现次秒级别的批处理。
合理的批次间隔
为了能够保证Spark Streaming程序能够稳定的运行在集群上,系统应该尽可能快的处理接收到数据。话句话来说,每个批次的数据都应该在它生成后尽可能快的被处理。这可以在Streaming Web UI中进行监控,每个批次的处理时间必须要小于每个批次的间隔时间。
基于流处理的特性来说,运行在固定集群资源上的应用,对于批次间隔的选取会严重影响数据处理效率。举个例子,让我们重新考虑一下前面的例子WordCountNetwork。对于特定的数据速率,系统可能只能够保证每两秒生成一次词频报告,而不是每500毫秒。所以批次间隔就必须设置成这样,以便可以持续的运行。
一个好让你找到适合你程序的批次大小的方式是,使用一个保守的批次间隔时间(5-10s)和一个较低的数据速率来测试你的程序。为了测试系统是否可以保证这个数据速率,你可以检查每一个批次处理的端到端的时间延迟(既可以在Spark driver的log4j日志中找到“Total delay”,也可以使用StreamingListener接口),如果延迟可以维持在一个与批次间隔差不多的水平,那么系统就是稳定的。否则,如果延迟在不断地增加,那么也就意味着系统并不能稳定运行了。一旦你有了一个稳定的配置,那么你可以尝试加快数据速率和降低批次间隔。注意,一个瞬间的延迟增大,可能只是短暂的数据率的增长,随着数据率的下降,延迟会回到一个合理的水平。
内存优化
内存调优和GC策略会在Tuning Guide一节详细讨论,在这里我们只着重说明对于StreamingContext相关的调优参数。
Spark Streaming应用对于内存的需求量很大程度上取决于你使用了什么样的transformaction。如果你对过去10分钟的数据使用了一个窗口操作,那么你的集群就需要足够多的内存来保存着10分钟的数据。或者,比如你使用了updateStateByKey方法来出来大量的key的数据集,那么你使用的内存量肯定就会大。相反如果你只是用了一个简单的map-filter-store操作,那么需要的内存量就会小。
在通常情况下,因为从接收器接收到的数据使用的存储级别是StorageLevel.MEMORY_AND_DISK_SER_2,数据在内存中装不下就是放到硬盘中。这就会降低了流处理应用的性能,因此建议提供足够的内存来提高性能。
内存优化的另一个方面是垃圾回收机制。因为流处理程序需要低延迟,所以不希望JVM的垃圾回收影响程序的执行。
下面展示 一些基于内存和GC策略调优的参数。
- Persistence Level of DStreams – 之前数据序列化这一章中已经提到,接收器接收到的数据会被默认序列化,这可以降低内存使用和GC开销。使用Kyro方式进行序列化可以进一步地降低序列化后数据大大小和内存消耗量。进一步的降低内存使用量可以使用压缩(spark.rdd.compress),但同时会增加CPU的开销。
- Clearing old data – 默认情况下,所有的输入数据和通过DStream transformaction产生的持久化的RDD都会被自动清理。Spark Streaming会根据使用的transformaction来决定何时清理数据。举个例子,如果你使用一个10分钟的窗口,那么程序会保留10分钟的数据,然后自动的清理老数据。当然你可以通过设置streamingContext.remember参数来让数据保留更长的时间。
要点
- 一个DStream会关联一个单独的接收器,为了能够并行运行多个接收器,就要创建多个DStream。一个接收器需要一个executor,占用一个内核。为了确保在接收器占用了内核后,还有足够的内核来进行处理工作,你必须要再内核分配的时候同时考虑到这两部分。接收器以轮询的方式来分配执行器。
- 从流数据源接收到数据后,接收器会创建一个数据块。每个block interval都会生成一个数据块。N个数据块会被生成在一个batch interval内(N=batchInterval/blockInterval)。这些块被块管理器分发到其他执行器的块管理器上。在这之后,运行在driver上的网络接受追踪器会被通知这些块所在的位置,以便进行下一个的数据处理。
- 在batch interval期间,driver会对这些块创建一个RDD。在这个batch interval内生成的快都是这个RDD的分区。每一个分区在Spark上都是一个任务。如果blockInterval== batchinterval,则意味着只有一个分区,并且可能就直接在本地处理了。
- 这些块上的map任务都运行在执行器单元上(一个在接收数据块的位置,另一个在数据块被备份到的位置)。这可以让block并不必关心block interval,除非是非本地调度。较大的block interval会带了更大的block,参数spark.locality.wait,能够让块更可能在本地处理。你需要在这两个参数间找到一种平衡,来能够保证bigger block能够在本地执行。
-除了可以使用batch interval和block interval之外,你还可以通过inputDstream.repartition(n)来定义分区数。这会对RDD中的数据进行随机重组,生成n个数据分区。为了更合理的分区数,你必须付出一个数据重组的代价。RDD的处理,都是作为一个job通过driver的jobscheduler来进行调度的。在一个给定的时间点只有一个job是活动的。所以,如果一个job是执行中的,那么其他job就是排队中。 - 如果你有两个DStream,那么就会形成两个RDD,也就会生成两个Job,然后被一个接一个的调度。为了避免这种情况,你可以对峙两个DStream执行union操作。这保证了两个DStream RDD会产生一个unionRDD,这个unionRDD会当做一个单独的job。然而这对RDDs中的分区并没有任何影响。
- 如果批次处理时间远大于批次间隔,那么接收器的内存会被塞满,并且最终会抛出异常(最可能是BlockNotFoundException)。目前,我们还没有方式来停止这个接收器,只能通过Spark配置spark.streaming.receiver.maxRate来限制接收器的数据接受率。