深入理解Spark BlockManager:定义、原理与实践

2024-02-08 13:04

本文主要是介绍深入理解Spark BlockManager:定义、原理与实践,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

深入理解Spark BlockManager:定义、原理与实践

1.定义

Spark是一个开源的大数据处理框架,其主要特点是高性能、易用性以及可扩展性。在Spark中,BlockManager是其核心组件之一,它负责管理内存和磁盘上的数据块,并确保这些数据块在集群中的各个节点上可以高效地共享和访问,其中包括存储、复制、序列化和反序列化数据块,并且负责将这些数据块分发到集群中的各个节点上,以便进行计算。BlockManager还处理数据块的缓存和回收,以及故障恢复和数据迁移等任务。

因为Spark是分布式的计算引擎,因此BlockManager也是一个分布式组件,各个节点(Executor)上都有一个BlockManger实例,管理着当前Executor的数据及元数据进行处理及维护,比如我们常说的block块的增删改的操作,都会在BlockManager上做相应的元素局的变更。而Executor上的BlockManager实例是由Driver端上的BlockManagerMaster统一管理,其关系类似于我们常说的NameNode和DataNode之间的关系。我们知道Spark本身有很多的模块,比如Scheduler调度模块,Standalone资源管理模块等,而BlockManager就是其中非常重要的模块,其源码量也是非常的巨大。总而言之,spark BlockManager是负责Spark上所有的数据的存储与管理的一个极其重要的组件。

2.原理分析

2.1 数据块的管理

在Spark中,每个数据块都有唯一的标识符,称为BlockId。BlockManager通过维护数据块的元数据来管理这些数据块,包括数据块的类型、大小、版本号、所在节点等信息。当一个节点需要访问一个数据块时,它会向BlockManager发送请求,BlockManager根据数据块的标识符和元数据来定位数据块所在的节点,并返回数据块的引用。

sealed abstract class BlockId {// 全局唯一的block的名字def name: String// convenience methodsdef asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None// 一下判断不同类型的Block,可能是RDD、Shuffle、Broadcast之一def isRDD: Boolean = isInstanceOf[RDDBlockId]def isShuffle: Boolean = {(isInstanceOf[ShuffleBlockId] || isInstanceOf[ShuffleBlockBatchId] ||isInstanceOf[ShuffleDataBlockId] || isInstanceOf[ShuffleIndexBlockId])}def isShuffleChunk: Boolean = isInstanceOf[ShuffleBlockChunkId]def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]override def toString: String = name
}

2.2 数据块的存储

我们知道,Spark中的数据块可以存在于内存或磁盘中,对于小数据块,BlockManager会优先将其存储在内存中,以提高访问速度;对于大数据块,则会将其存储在磁盘上。BlockManager还支持将数据块存储在外部存储系统中,如HDFS、S3等。

class StorageLevel private(// 磁盘
private var _useDisk: Boolean,
// 内存
private var _useMemory: Boolean,
// 堆外内存
private var _useOffHeap: Boolean,
// 是否序列化
private var _deserialized: Boolean,
// block默认副本
private var _replication: Int = 1)
extends Externalizable

2.3 数据块的复制

为了保证数据块的可靠性和高可用性,BlockManager会自动将一些数据块复制到其他节点上,以免数据丢失或节点故障导致数据无法访问。复制策略可以根据具体需求进行配置,例如可以设置副本数、复制间隔、复制位置等。

2.4 数据块的序列化和反序列化

在Spark中,数据块经常需要在不同的节点之间传输和共享,因此需要进行序列化和反序列化。BlockManager提供了常用的序列化和反序列化方式,包括Java序列化、Kryo序列化等。

图片

2.5 数据块的缓存和回收

为了提高计算效率,BlockManager还支持将一些常用的数据块缓存在内存中,以避免频繁地从磁盘或外部存储系统中读取数据块。同时,BlockManager还会定期清除一些不再使用的数据块,以释放资源。

2.6 故障恢复和数据迁移

当一个节点出现故障或者网络出现问题时,BlockManager会自动进行故障恢复,将丢失的数据块重新复制到其他节点上。此外,在集群扩容或缩容时,BlockManager还支持数据迁移,以保证数据块的平衡分布。 

2.7 运行原理图

图片

3.代码解读

Spark的BlockManager主要由以下两个类实现:

BlockManagerMaster:负责管理集群中所有节点的BlockManager,并协调各个节点之间的数据块复制和迁移等操作。

BlockManager:负责管理本地节点的数据块,包括数据块的存储、缓存、序列化和反序列化等操作。

接下来,我们重点分析BlockManager,BlockManager的代码主要位于Spark的存储模块中。以下是BlockManager的主要代码结构:

  • BlockManagerMaster:这是BlockManager的主节点,它负责管理所有的数据块。BlockManagerMaster会与每个工作节点上的BlockManager进行通信,了解每个数据块的位置和状态。

  • BlockManagerWorker:这是BlockManager的工作节点,它负责管理本地的数据块。BlockManagerWorker会与BlockManagerMaster进行通信,报告本地数据块的状态。

  • BlockInfo:这是表示一个数据块的信息,包括数据块的大小、位置、副本数等。

  • BlockManager:这是实际执行数据块管理操作的类,它提供了读取、写入、删除数据块的方法。

下面是BlockManager的关键代码解析:


class BlockManager(executorId: String,rpcEnv: RpcEnv,val master: BlockManagerMaster,val defaultSerializer: Serializer,val conf: SparkConf,memoryManager: MemoryManager,mapOutputTracker: MapOutputTracker,shuffleManager: ShuffleManager,blockTransferService: BlockTransferService,securityManager: SecurityManager,numUsableCores: Int)
extends BlockDataManager with Logging {// 存储所有的Blockprivate val blocks = new ConcurrentHashMap[BlockId, BlockInfo]// 存储所有正在读取中的Blockprivate val activeReads = new ConcurrentHashMap[BlockId, BlockFetchingState]// 存储所有正在写入中的Blockprivate val activeWrites = new ConcurrentHashMap[BlockId, BlockOutputStream]// 存储所有已经删除的Blockprivate val deadBlocks = new ConcurrentHashMap[BlockId, Long]// 存储所有已经接收到的Blockprivate val receivedBlockTracker = new ReceivedBlockTracker// 存储所有已经丢失的Blockprivate val blockReplicationPolicy = BlockManager.getReplicationPolicy(conf, master)private val blockTracker = new BlockTracker(blockReplicationPolicy)private val lostBlocks = new ConcurrentHashMap[BlockId, ArrayBuffer[BlockManagerId]]// 存储所有已经被缓存的Blockprivate val cachedBlocks = new ConcurrentHashMap[BlockId, CachedBlock]// BlockManager的内存管理器private val memoryStore =new MemoryStore(conf, memoryManager, this, blockInfoManager)// BlockManager的磁盘管理器private val diskStore = new DiskStore(conf, this, diskBlockManager)// BlockManager的块传输服务private val blockTransferService =new NettyBlockTransferService(conf, securityManager, numUsableCores)// BlockManager的块上传服务private val blockUploadHandler = new BlockUploadHandler(this)// BlockManager的块下载服务private val blockDownloader = new BlockDownloader(blockTransferService, this)// BlockManager的安全管理器private val blockTransferServiceServer =blockTransferService.initServer(rpcEnv, blockUploadHandler, blockDownloader)// BlockManager的Shuffle管理器private val shuffleBlockResolver = new ShuffleBlockResolver(conf)// BlockManager的Shuffle上传服务private val shuffleUploadHandler = new ShuffleUploadHandler(this, shuffleBlockResolver)// BlockManager的Shuffle下载服务private val shuffleDownloader = new ShuffleDownloader(blockTransferService, this)// BlockManager的Shuffle管理器private val shuffleServerId = SparkEnv.get.blockManager.blockManagerId.shuffleServerId// BlockManager的Shuffle服务private val shuffleService =new NettyShuffleService(shuffleServerId, conf, securityManager, shuffleUploadHandler,shuffleDownloader)// BlockManager的Metricsprivate val metricsSystem = SparkEnv.get.metricsSystemprivate val numBlocksRegistered = metricsSystem.counter("blocks.registered")private val numBlocksRemoved = metricsSystem.counter("blocks.removed")// 启动BlockManager的各个服务blockTransferService.init(clientMode = false)blockTransferServiceServer.start()shuffleService.start()// BlockManager的IDval blockManagerId = BlockManagerId(executorId, blockTransferService.hostName, rpcEnv.address.port)

代码中,BlockManager主要包括以下几个部分:

  • 存储结构:使用ConcurrentHashMap存储所有的Block、正在读取中的Block、正在写入中的Block、已经删除的Block、已经接收到的Block、已经缓存的Block以及已经丢失的Block等信息。

  • 内存管理器和磁盘管理器:内存管理器负责将小的数据块存储在内存中,而磁盘管理器则负责将大的数据块存储在磁盘上。

  • 块传输服务:负责处理节点之间的数据块传输,例如上传、下载和复制等操作。

  • Shuffle管理器:负责处理Spark的Shuffle操作,包括Shuffle数据的存储和传输等。

Metrics:用于收集BlockManager的各种指标,如已注册的Block数、已删除的Block数等。

4.案例分析

下面以WordCount为例,演示BlockManager在Spark中的使用过程:

val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)val lines = sc.textFile("data.txt")
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)wordCounts.collect().foreach(println)

在这个例子中,首先调用textFile方法读取文本文件,并将其划分为多个Block。然后,使用flatMap和map方法对每个Block中的文本进行处理,最后使用reduceByKey方法将相同的单词进行合并。在这个过程中,BlockManager扮演着重要的角色,它负责管理所有的Block,并确保它们可以高效地共享和访问。

这篇关于深入理解Spark BlockManager:定义、原理与实践的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/691121

相关文章

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

认识、理解、分类——acm之搜索

普通搜索方法有两种:1、广度优先搜索;2、深度优先搜索; 更多搜索方法: 3、双向广度优先搜索; 4、启发式搜索(包括A*算法等); 搜索通常会用到的知识点:状态压缩(位压缩,利用hash思想压缩)。

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

hdu4407(容斥原理)

题意:给一串数字1,2,......n,两个操作:1、修改第k个数字,2、查询区间[l,r]中与n互质的数之和。 解题思路:咱一看,像线段树,但是如果用线段树做,那么每个区间一定要记录所有的素因子,这样会超内存。然后我就做不来了。后来看了题解,原来是用容斥原理来做的。还记得这道题目吗?求区间[1,r]中与p互质的数的个数,如果不会的话就先去做那题吧。现在这题是求区间[l,r]中与n互质的数的和

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

【C++高阶】C++类型转换全攻略:深入理解并高效应用

📝个人主页🌹:Eternity._ ⏩收录专栏⏪:C++ “ 登神长阶 ” 🤡往期回顾🤡:C++ 智能指针 🌹🌹期待您的关注 🌹🌹 ❀C++的类型转换 📒1. C语言中的类型转换📚2. C++强制类型转换⛰️static_cast🌞reinterpret_cast⭐const_cast🍁dynamic_cast 📜3. C++强制类型转换的原因📝

深入手撕链表

链表 分类概念单链表增尾插头插插入 删尾删头删删除 查完整实现带头不带头 双向链表初始化增尾插头插插入 删查完整代码 数组 分类 #mermaid-svg-qKD178fTiiaYeKjl {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-

hdu4407容斥原理

题意: 有一个元素为 1~n 的数列{An},有2种操作(1000次): 1、求某段区间 [a,b] 中与 p 互质的数的和。 2、将数列中某个位置元素的值改变。 import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.Inpu