首先我们看看官方的定义和用法介绍
RDD Persistence(持久化)
Spark 中一个很重要的能力是将数据persisting持久化(或称为caching缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点的其它分区都可以使用 RDD 在内存中进行计算,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。
RDD 可以使用persist()方法或cache()方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。
另外,每个持久化的 RDD 可以使用不同的storage level存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(可以节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别通过传递一个StorageLevel对象 (Scala,Java,Python) 给persist()方法进行设置。cache()方法是使用默认存储级别的快捷设置方法,默认的存储级别是StorageLevel.MEMORY_ONLY(将反序列化的对象存储到内存中)。详细的存储级别介绍如下:
Storage Level(存储级别)Meaning(含义)
MEMORY_ONLY 将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中. 如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算. 这是默认的级别.
MEMORY_AND_DISK 将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取.
MEMORY_ONLY_SER
(Java and Scala)
将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用fast serializer时会节省更多的空间,但是在读取时会增加 CPU 的计算负担.
MEMORY_AND_DISK_SER
(Java and Scala)
类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算.
DISK_ONLY 只在磁盘上缓存 RDD.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本.
OFF_HEAP (experimental 实验性) 类似于 MEMORY_ONLY_SER, 但是将数据存储在off-heap memory中. 这需要启用 off-heap 内存.
Note:在 Python 中, stored objects will 总是使用Picklelibrary 来序列化对象, 所以无论你选择序列化级别都没关系. 在 Python 中可用的存储级别有MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY, 和DISK_ONLY_2.
在 shuffle 操作中(例如reduceByKey),即便是用户没有调用persist方法,Spark 也会自动缓存部分中间数据.这么做的目的是,在 shuffle 的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist 方法.
如何选择存储级别 ?
Spark 的存储级别的选择,核心问题是在 memory 内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择:
如果您的 RDD 适合于默认存储级别 (MEMORY_ONLY), leave them that way.这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行.
如果不是, 试着使用MEMORY_ONLY_SER和selecting a fast serialization library以使对象更加节省空间,但仍然能够快速访问。 (Java和Scala)
不要溢出到磁盘,除非计算您的数据集的函数是昂贵的, 或者它们过滤大量的数据. 否则, 重新计算分区可能与从磁盘读取分区一样快.
如果需要快速故障恢复,请使用复制的存储级别 (e.g. 如果使用Spark来服务来自网络应用程序的请求).All存储级别通过重新计算丢失的数据来提供完整的容错能力,但复制的数据可让您继续在 RDD 上运行任务,而无需等待重新计算一个丢失的分区.
删除数据
Spark 会自动监视每个节点上的缓存使用情况,并使用 least-recently-used(LRU)的方式来丢弃旧数据分区。 如果您想手动删除 RDD 而不是等待它掉出缓存,使用RDD.unpersist()方法。
然后我们摘下要点:
缓存操作是lazy的,只有在active操作时才能触发
“`
final def iterator(split:Partition,context:TaskContext):Iterator[T]={
if(storageLevel!=StorageLevel.NONE){
getOrCompute(split,context)
}else{
computeOrReadCheckpoint(split,context)}
}
“`
该方法返回一个迭代器,迭代可遍历的所有数据。如果没有被缓存,那么就调用第二个方法。如果有缓存,缓存水平就不为空。
如果有缓存进入
“`
private[spark] def getOrCompute(partition:Partition,context:TaskContext):Iterator[T]=
{
val blockId=RDDBlockId(id,partition.index)
varreadCachedBlock=true
sc.env.SparkEnv.get.blockManager.getOrElseUpdate(blockId,storageLevel,elementClassTag,()=>{
readCachedBlock=falsecomputeOrReadCheckpoint(partition,context)
})
match{…}
}
“`
该方法通过RDDid和partition得到一个blockId,并通过blockManager得到该block。
下面进入blockManager的方法中
“`
def getOrElseUpdate[T](blockId:BlockId,
level:StorageLevel,
classTag:ClassTag[T],
makeIterator:()=>Iterator[T]):Either[BlockResult,Iterator[T]]={
get[T](blockId)(classTag)match{
case Some(block)=>returnLeft(block)
case_=>// Need to compute the block.}
block.doPutIterator(blockId,makeIterator,level,classTag,keepReadLock=true)
match{…}
}
如果没有缓存,就会去checkpoint中查找,如果也没有被check过,那么之能够重新的计算了。
这段代码说明了cache()和persist()的区别。
“`
def cache():this.type=persist()
def persist():this.type=persist(StorageLevel.MEMORY_ONLY)
defpersist(newLevel:StorageLevel):this.type={
if(isLocallyCheckpointed){//该RDD之前被checkpoint过,说明RDD已经被缓存过。//我们只需要直接覆盖原来的存储级别即可persist(LocalRDDCheckpointData.transformStorageLevel(newLevel),allowOverride=true)}
else{persist(newLevel,allowOverride=false)}}
privatedefpersist(newLevel:StorageLevel,allowOverride:Boolean):this.type={// 原来的存储级别不为NONE;新存储级别!=原来的存储界别;不允许覆盖if(storageLevel!=StorageLevel.NONE&&newLevel!=storageLevel&&!allowOverride){thrownewUnsupportedOperationException(“Cannot change storage level of an RDD after it was already assigned a level”)}
if(storageLevel==StorageLevel.NONE){// 第一次调用persistsc.cleaner.foreach(_.registerRDDForCleanup(this))// 通过sc来清理注册sc.persistRDD(this)//缓存RDD}storageLevel=newLevel//跟新存储级别
this}
“`
其实cache()内部调用了persist(),而且默认其为Memory_Noly等级。
Spark缓存清理机制:
MetadataCleaner对象中有一个定时器,用于清理下列的元数据信息:
MAP_OUTPUT_TRACKER:Maptask的输出元信息
SPARK_CONTEXT:persistentRdds中的rdd
HTTP_BROADCAST, http广播的元数据
BLOCK_MANAGER:blockmanager中存储的数据
SHUFFLE_BLOCK_MANAGER:shuffle的输出数据
BROADCAST_VARS:Torrent方式广播broadcast的元数据
contextcleaner清理真实数据:
ContextCleaner为RDD、shuffle、broadcast、accumulator、Checkpoint维持了一个弱引用,当相关对象不可达时,就会将对象插入referenceQueue中。有一个单独的线程来处理这个队列中的对象。
RDD:最终从各节点的blockmanager的memoryStore、diskStore中删除RDD数据
shuffle:删除driver中的mapstatuses关于该shuffleId的信息;删除所有节点中关于该shuffleId的所有分区的数据文件和索引文件
broadcast:最终从各节点的blockmanager的memoryStore、diskStore中删除broadcast数据
Checkpoint:清理checkpointDir目录下关于该rddId的文件
举个RDD的例子,说明一下这样做有什么好处?
默认情况下,RDD是不缓存的,即计算完之后,下一次用需要重新计算。如果要避免重新计算的开销,就要将RDD缓存起来,这个道理谁都明白。但是,缓存的RDD什么时候去释放呢?这就用到了上面提到的弱引用。当我们调用persist缓存一个RDD时,会调用registerRDDForCleanup(this),这就是将本身的RDD注册到一个弱引用中。当这个RDD变为不可达时,会自动将该RDD对象插入到referenceQueue中,等到下次GC时就会走doCleanupRDD分支。RDD可能保存在内存或者磁盘中,这样就能保证,不可达的RDD在GC到来时可以释放blockmanager中的RDD真实数据。
再考虑一下,什么时候RDD不可达了呢?为了让出内存供其他地方使用,除了手动unpersist之外,需要有机制定时清理缓存的RDD数据,这就是MetadataCleaner的SPARK_CONTEXT干的事情。它就是定期的清理persistentRdds中过期的数据,其实与unpersist产生的作用是一样的。一旦清理了,那这个缓存的RDD就没有强引用了。
清理机制原文:https://blog.csdn.net/u014033218/article/details/77853323