RDD 持久化的工作原理

Spark 中一个很重要的能力是将数据持久化(或称为缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点的其它分区都可以使用 RDD在内存中进行计算,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10倍)。缓存是迭代算法和快速的交互式使用的重要工具。

RDD 可以使用persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。

另外,每个持久化的 RDD 可以使用不同的存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(可以节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别通过传递一个 StorageLevel 对象(ScalaJavaPython)给 persist() 方法进行设置。cache()方法是使用默认存储级别的快捷设置方法,默认的存储级别是 StorageLevel.MEMORY_ONLY(将反序列化的对象存储到内存中)。详细的存储级别介绍如下 :

  • MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
  • MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
  • MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会没有序列化对象的方式节省很多空间,尤其是在使用 fast serializer时会节省更多的空间,但是在读取时会增加 CPU 的计算负担。
  • MEMORY_AND_DISK_SER : 类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。
  • DISK_ONLY : 只在磁盘上缓存 RDD
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
  • OFF_HEAP(实验中): 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory,这需要启动 off-heap 内存。

注意,在 Python 中,缓存的对象总是使用 Pickle 进行序列化,所以在 Python 中不关心你选择的是哪一种序列化级别。python 中的存储级别包括 MEMORY_ONLYMEMORY_ONLY_2MEMORY_AND_DISKMEMORY_AND_DISK_2DISK_ONLYDISK_ONLY_2

shuffle 操作中(例如 reduceByKey),即便是用户没有调用 persist 方法,Spark 也会自动缓存部分中间数据。这么做的目的是,在 shuffle 的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist方法。

如何选择存储级别

Spark 的存储级别的选择,核心问题是在内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择 :

  • 如果使用默认的存储级别(MEMORY_ONLY),存储在内存中的 RDD 没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高 CPU 的效率,可以使在 RDD 上的操作以最快的速度运行。
  • 如果内存不能全部存储 RDD,那么使用 MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。
  • 除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。
  • 如果想快速还原故障,建议使用多副本存储级别(例如,使用 Spark 作为 web 应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。

删除数据

Spark 自动监控各个节点上的缓存使用率,并以最近最少使用的方式(LRU)将旧数据块移除内存。如果想手动移除一个 RDD,而不是等待该 RDDSpark自动移除,可以使用 RDD.unpersist()方法。

不使用RDD的持久化

《RDD 持久化的工作原理》 不使用RDD持久化原理.png

    1. 默认情况下,对于大量数据的action操作都是非常耗时的。可能一个操作就耗时1个小时;
    1. 在执行action操作的时候,才会触发之前的操作的执行,因此在执行第一次count操作时,就会从hdfs中读取一亿数据,形成lines RDD;
    1. 第一次count操作之后,我们的确获取到了hdfs文件的行数。但是lines RDD其实会被丢弃掉,数据也会被新的数据丢失;
      所以,如果不用RDD的持久化机制,可能对于相同的RDD的计算需要重复从HDFS源头获取数据进行计算,这样会浪费很多时间成本;

RDD持久化的原理

《RDD 持久化的工作原理》 RDD持久化原理.png

  • 1.Spark非常重要的一个功能特性就是可以将RDD持久化在内存中。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition。这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。
  • 2.巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍。对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。
  • 3.要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition
  • 4.cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中清楚缓存,那么可以使用unpersist()方法。
  • 5.Spark自己也会在shuffle操作时,进行数据的持久化,比如写入磁盘,主要是为了在节点失败时,避免需要重新计算整个过程。
package com.spark.sunny.core

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
  * <Description> <br>
  *
  * @author Sunny<br>
  * @version 1.0<br>
  * @CreateDate 2018-03-05 12:04 <br>
  * @see com.spark.sunny <br>
  */
object TestStorageLevel {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\ProgramFiles\\hadoop-3.0.0")
    val conf = new SparkConf().setMaster("local").setAppName("TestStorageLevel")
    val sc = new SparkContext(conf)

    // 当RDD会被复用的时候通常我们就要使用持久化策略
    // 1. 持久化策略默的是MEMORY_ONLY
    // 2. 如果内存有些吃紧,可以选择MEMORY_ONLY_SER
    // 3. 当我们的数据需要做一些容错,可以选择_2
    // 4. 如果我们的中间结果RDD计算代价比较大, 我们可以选择使用MEMORY_AND_DISK
    var text = sc.textFile("cvbs.log")
    text = text.persist(StorageLevel.MEMORY_AND_DISK)
    val beginTime = System.currentTimeMillis()

    val count = text.count()
    println("cvbs.log count is " + count)
    val endTime = System.currentTimeMillis()
    println("count cost is " + (endTime - beginTime))

    val beginTime2 = System.currentTimeMillis()
    val count2 = text.count()
    println("cvbs.log count2 is " + count2)
    val endTime2 = System.currentTimeMillis()
    println("count2 cost is " + (endTime2 - beginTime2))

    val beginTime3 = System.currentTimeMillis()
    val count3 = text.count()
    println("cvbs.log count3 is " + count3)
    val endTime3 = System.currentTimeMillis()
    println("count3 cost is " + (endTime3 - beginTime3))
  }
}

输出如下:

18/05/28 10:51:56 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
18/05/28 10:51:56 INFO DAGScheduler: Job 0 finished: count at TestStorageLevel.scala:29, took 5.162783 s
cvbs.log count is 1422994
count cost is 5389
18/05/28 10:51:57 INFO SparkContext: Starting job: count at TestStorageLevel.scala:35
18/05/28 10:51:57 INFO DAGScheduler: Got job 1 (count at TestStorageLevel.scala:35) with 7 output partitions
18/05/28 10:51:57 INFO DAGScheduler: Final stage: ResultStage 1 (count at TestStorageLevel.scala:35)
18/05/28 10:51:57 INFO DAGScheduler: Parents of final stage: List()
18/05/28 10:51:57 INFO DAGScheduler: Job 1 finished: count at TestStorageLevel.scala:35, took 0.264674 s
cvbs.log count2 is 1422994
count2 cost is 281
18/05/28 10:51:57 INFO SparkContext: Starting job: count at TestStorageLevel.scala:41
18/05/28 10:51:57 INFO DAGScheduler: Job 2 finished: count at TestStorageLevel.scala:41, took 0.133485 s
cvbs.log count3 is 1422994
count3 cost is 142
18/05/28 10:51:57 INFO SparkContext: Invoking stop() from shutdown hook
    原文作者:SunnyMore
    原文地址: https://www.jianshu.com/p/282522842d54
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞