本文主要是介绍40 CacheManager,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
第四十课CacheManager
本文的主要内容:
CacheManager详解
Spark是一体化、多元化的框架。可以使得一个团队和技术堆栈来进行项目的开发。对于迭代式的算法特别有用。(例如多步奏的迭代算法在图计算和机器学习中广泛运用)
CacheManager用来管理缓存(缓存不一定在内存中,有可能在磁盘之中)
一、CacheManager分析:
1. CacheManager管理的是缓存,而缓存可以是基于内存的缓存,也可以是基于磁盘的缓存;
2. CacheManager需要通过BlockManager来操作数据;
当Task运行的时候会调用RDD的compute方法进行计算,而compute方法会调用iterator方法
/*** Internal method to this RDD; will read from cache if applicable, or otherwise compute it.* This should ''not'' be called by users directly, but is available for implementors of custom* subclasses of RDD.*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {if (storageLevel != StorageLevel.NONE) {SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)} else {computeOrReadCheckpoint(split, context)}
}
二、CacheManager源码详解:
1、Cache在工作时候会最大化的保留数据,但是数据不一定绝对完整,因为当前的计算如果需要内存空间的话,那么cache在内存中数据必须让出空间,此时如果在RDD持久化的时候同时指定了可以把数据放在Disk上,那么部分cache的数据就可以从内存转化到磁盘上,否则的话,数据就会丢失!!!
2、具体的CacheManager在获得缓存数据的时候会通过BlockManager来抓到数据。
/*** Get a block from the block manager (either local or remote).*/
def get(blockId: BlockId): Option[BlockResult] = {val local = getLocal(blockId)if (local.isDefined) {logInfo(s"Found block $blockId locally")return local}val remote = getRemote(blockId)if (remote.isDefined) {logInfo(s"Found block $blockId remotely")return remote}None
}
3、如果CacheManager没有通过BlockManager获得缓存内容的话,此时会通过RDD的如下方法来获得数据
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
上述方法首先会查看当前的RDD是否进行了CheckPoint,若进行了的话,就直接读取CheckPoint的数据,否则的话,就必须进行计算。(CheckPoint会使迭代的性能大幅度提高,所以机器学习和图计算特别喜欢进行CheckPoint)计算之后通过PutInBlockManager会吧数据按照StorgeLevel重新缓存起来;
4、计算之后通过PutInBlockManager会把数据按照StorgLevel重新缓存的时候的需要放在内存之中。
二、Cache具体流程图
这篇关于40 CacheManager的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!