spark rdd缓存及缓存清理

首先我们看看官方的定义和用法介绍

RDD Persistence(持久化)

Spark 中一个很重要的能力是将数据persisting持久化(或称为caching缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点的其它分区都可以使用 RDD 在内存中进行计算,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。

RDD 可以使用persist()方法或cache()方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。

另外,每个持久化的 RDD 可以使用不同的storage level存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(可以节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别通过传递一个StorageLevel对象 (Scala,Java,Python) 给persist()方法进行设置。cache()方法是使用默认存储级别的快捷设置方法,默认的存储级别是StorageLevel.MEMORY_ONLY(将反序列化的对象存储到内存中)。详细的存储级别介绍如下:

Storage Level(存储级别)Meaning(含义)

MEMORY_ONLY  将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中. 如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算. 这是默认的级别.

MEMORY_AND_DISK  将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取.

MEMORY_ONLY_SER

(Java and Scala)

将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用fast serializer时会节省更多的空间,但是在读取时会增加 CPU 的计算负担.

MEMORY_AND_DISK_SER

(Java and Scala)

类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算.

DISK_ONLY  只在磁盘上缓存 RDD.

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.  与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本.

OFF_HEAP (experimental 实验性) 类似于 MEMORY_ONLY_SER, 但是将数据存储在off-heap memory中. 这需要启用 off-heap 内存.

Note:在 Python 中, stored objects will 总是使用Picklelibrary 来序列化对象, 所以无论你选择序列化级别都没关系. 在 Python 中可用的存储级别有MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY, 和DISK_ONLY_2.

在 shuffle 操作中(例如reduceByKey),即便是用户没有调用persist方法,Spark 也会自动缓存部分中间数据.这么做的目的是,在 shuffle 的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist 方法.

如何选择存储级别 ?

Spark 的存储级别的选择,核心问题是在 memory 内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择:

如果您的 RDD 适合于默认存储级别 (MEMORY_ONLY), leave them that way.这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行.

如果不是, 试着使用MEMORY_ONLY_SER和selecting a fast serialization library以使对象更加节省空间,但仍然能够快速访问。 (Java和Scala)

不要溢出到磁盘,除非计算您的数据集的函数是昂贵的, 或者它们过滤大量的数据. 否则, 重新计算分区可能与从磁盘读取分区一样快.

如果需要快速故障恢复,请使用复制的存储级别 (e.g. 如果使用Spark来服务来自网络应用程序的请求).All存储级别通过重新计算丢失的数据来提供完整的容错能力,但复制的数据可让您继续在 RDD 上运行任务,而无需等待重新计算一个丢失的分区.

删除数据

Spark 会自动监视每个节点上的缓存使用情况,并使用 least-recently-used(LRU)的方式来丢弃旧数据分区。 如果您想手动删除 RDD 而不是等待它掉出缓存,使用RDD.unpersist()方法。

然后我们摘下要点:

缓存操作是lazy的,只有在active操作时才能触发

“`

final def iterator(split:Partition,context:TaskContext):Iterator[T]={

if(storageLevel!=StorageLevel.NONE){

getOrCompute(split,context)

}else{

computeOrReadCheckpoint(split,context)}

}

“`

该方法返回一个迭代器,迭代可遍历的所有数据。如果没有被缓存,那么就调用第二个方法。如果有缓存,缓存水平就不为空。

如果有缓存进入

“`

private[spark] def getOrCompute(partition:Partition,context:TaskContext):Iterator[T]=

{

val blockId=RDDBlockId(id,partition.index)

varreadCachedBlock=true

sc.env.SparkEnv.get.blockManager.getOrElseUpdate(blockId,storageLevel,elementClassTag,()=>{

readCachedBlock=falsecomputeOrReadCheckpoint(partition,context)

})

match{…}

}

“`

该方法通过RDDid和partition得到一个blockId,并通过blockManager得到该block。

下面进入blockManager的方法中

“`

def getOrElseUpdate[T](blockId:BlockId,

level:StorageLevel,

classTag:ClassTag[T],

makeIterator:()=>Iterator[T]):Either[BlockResult,Iterator[T]]={

get[T](blockId)(classTag)match{

case Some(block)=>returnLeft(block)

case_=>// Need to compute the block.}

block.doPutIterator(blockId,makeIterator,level,classTag,keepReadLock=true)

match{…}

}

如果没有缓存,就会去checkpoint中查找,如果也没有被check过,那么之能够重新的计算了。

这段代码说明了cache()和persist()的区别。

“`

def cache():this.type=persist()

def persist():this.type=persist(StorageLevel.MEMORY_ONLY)

defpersist(newLevel:StorageLevel):this.type={

if(isLocallyCheckpointed){//该RDD之前被checkpoint过,说明RDD已经被缓存过。//我们只需要直接覆盖原来的存储级别即可persist(LocalRDDCheckpointData.transformStorageLevel(newLevel),allowOverride=true)}

else{persist(newLevel,allowOverride=false)}}

privatedefpersist(newLevel:StorageLevel,allowOverride:Boolean):this.type={// 原来的存储级别不为NONE;新存储级别!=原来的存储界别;不允许覆盖if(storageLevel!=StorageLevel.NONE&&newLevel!=storageLevel&&!allowOverride){thrownewUnsupportedOperationException(“Cannot change storage level of an RDD after it was already assigned a level”)}

if(storageLevel==StorageLevel.NONE){// 第一次调用persistsc.cleaner.foreach(_.registerRDDForCleanup(this))// 通过sc来清理注册sc.persistRDD(this)//缓存RDD}storageLevel=newLevel//跟新存储级别

this}

“`

其实cache()内部调用了persist(),而且默认其为Memory_Noly等级。

Spark缓存清理机制:

MetadataCleaner对象中有一个定时器,用于清理下列的元数据信息:

MAP_OUTPUT_TRACKER:Maptask的输出元信息

SPARK_CONTEXT:persistentRdds中的rdd

HTTP_BROADCAST, http广播的元数据

BLOCK_MANAGER:blockmanager中存储的数据

SHUFFLE_BLOCK_MANAGER:shuffle的输出数据

BROADCAST_VARS:Torrent方式广播broadcast的元数据

contextcleaner清理真实数据:

ContextCleaner为RDD、shuffle、broadcast、accumulator、Checkpoint维持了一个弱引用,当相关对象不可达时,就会将对象插入referenceQueue中。有一个单独的线程来处理这个队列中的对象。

RDD:最终从各节点的blockmanager的memoryStore、diskStore中删除RDD数据

shuffle:删除driver中的mapstatuses关于该shuffleId的信息;删除所有节点中关于该shuffleId的所有分区的数据文件和索引文件

broadcast:最终从各节点的blockmanager的memoryStore、diskStore中删除broadcast数据

Checkpoint:清理checkpointDir目录下关于该rddId的文件

举个RDD的例子,说明一下这样做有什么好处?

默认情况下,RDD是不缓存的,即计算完之后,下一次用需要重新计算。如果要避免重新计算的开销,就要将RDD缓存起来,这个道理谁都明白。但是,缓存的RDD什么时候去释放呢?这就用到了上面提到的弱引用。当我们调用persist缓存一个RDD时,会调用registerRDDForCleanup(this),这就是将本身的RDD注册到一个弱引用中。当这个RDD变为不可达时,会自动将该RDD对象插入到referenceQueue中,等到下次GC时就会走doCleanupRDD分支。RDD可能保存在内存或者磁盘中,这样就能保证,不可达的RDD在GC到来时可以释放blockmanager中的RDD真实数据。

再考虑一下,什么时候RDD不可达了呢?为了让出内存供其他地方使用,除了手动unpersist之外,需要有机制定时清理缓存的RDD数据,这就是MetadataCleaner的SPARK_CONTEXT干的事情。它就是定期的清理persistentRdds中过期的数据,其实与unpersist产生的作用是一样的。一旦清理了,那这个缓存的RDD就没有强引用了。

清理机制原文:https://blog.csdn.net/u014033218/article/details/77853323

    原文作者:不能再白
    原文地址: https://www.jianshu.com/p/761fa2ee868e
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞