Spark SQL 源代码分析之 In-Memory Columnar Storage 之 in-memory query

2023-10-11 10:10

本文主要是介绍Spark SQL 源代码分析之 In-Memory Columnar Storage 之 in-memory query,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

    /** Spark SQL源代码分析系列文章*/

    前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的。

    那么基于以上存储结构,我们查询cache在jvm内的数据又是怎样查询的,本文将揭示查询In-Memory Data的方式。

一、引子

本例使用hive console里查询cache后的src表。
select value from src

当我们将src表cache到了内存后,再次查询src,能够通过analyzed运行计划来观察内部调用。

即parse后,会形成InMemoryRelation结点,最后运行物理计划时,会调用InMemoryColumnarTableScan这个结点的方法。

例如以下:

scala> val exe = executePlan(sql("select value from src").queryExecution.analyzed)
14/09/26 10:30:26 INFO parse.ParseDriver: Parsing command: select value from src
14/09/26 10:30:26 INFO parse.ParseDriver: Parse Completed
exe: org.apache.spark.sql.hive.test.TestHive.QueryExecution = 
== Parsed Logical Plan ==
Project [value#5]InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)== Analyzed Logical Plan ==
Project [value#5]InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)== Optimized Logical Plan ==
Project [value#5]InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)== Physical Plan ==
InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)) //查询内存中表的入口Code Generation: false
== RDD ==

二、InMemoryColumnarTableScan

InMemoryColumnarTableScan是Catalyst里的一个叶子结点,包括了要查询的attributes,和InMemoryRelation(封装了我们缓存的In-Columnar Storage数据结构)。
运行叶子节点,出发execute方法对内存数据进行查询。
1、查询时,调用InMemoryRelation,对其封装的内存数据结构的每一个分区进行操作。
2、获取要请求的attributes,如上,查询请求的是src表的value属性。
3、依据目的查询表达式,来获取在相应存储结构中,请求列的index索引。
4、通过ColumnAccessor来对每一个buffer进行訪问,获取相应查询数据,并封装为Row对象返回。

private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute],relation: InMemoryRelation)extends LeafNode {override def output: Seq[Attribute] = attributesoverride def execute() = {relation.cachedColumnBuffers.mapPartitions { iterator =>// Find the ordinals of the requested columns.  If none are requested, use the first.val requestedColumns = if (attributes.isEmpty) {Seq(0)} else {attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) //依据表达式exprId找出相应列的ByteBuffer的索引}iterator.map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))//依据索引取得相应请求列的ByteBuffer,并封装为ColumnAccessor。.flatMap { columnAccessors =>val nextRow = new GenericMutableRow(columnAccessors.length) //Row的长度new Iterator[Row] {override def next() = {var i = 0while (i < nextRow.length) {columnAccessors(i).extractTo(nextRow, i) //依据相应index和长度,从byterbuffer里取得值,封装到row里i += 1}nextRow}override def hasNext = columnAccessors.head.hasNext}}}}
}

查询请求的列,例如以下:

scala> exe.optimizedPlan
res93: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [value#5]InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)scala> val relation =  exe.optimizedPlan(1)
relation: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)scala> val request_relation = exe.executedPlan
request_relation: org.apache.spark.sql.execution.SparkPlan = 
InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None))scala> request_relation.output //请求的列,我们请求的仅仅有value列
res95: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)scala> relation.output //默认保存在relation中的全部列
res96: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#4, value#5)scala> val attributes = request_relation.output 
attributes: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)


整个流程非常简洁,关键步骤是第三步。依据ExprId来查找到,请求列的索引
attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))

//依据exprId找出相应ID
scala> val attr_index = attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
attr_index: Seq[Int] = ArrayBuffer(1) //找到请求的列value的索引是1, 我们查询就从Index为1的bytebuffer中,请求数据scala> relation.output.foreach(e=>println(e.exprId))
ExprId(4)    //相应<span style="font-family: Arial, Helvetica, sans-serif;">[key#4,value#5]</span>
ExprId(5)scala> request_relation.output.foreach(e=>println(e.exprId))
ExprId(5)

三、ColumnAccessor

ColumnAccessor相应每一种类型,类图例如以下:


最后返回一个新的迭代器:

          new Iterator[Row] {override def next() = {var i = 0while (i < nextRow.length) { //请求列的长度columnAccessors(i).extractTo(nextRow, i)//调用columnType.setField(row, ordinal, extractSingle(buffer))解析bufferi += 1}nextRow//返回解析后的row}override def hasNext = columnAccessors.head.hasNext}

四、总结

    Spark SQL In-Memory Columnar Storage的查询相对来说还是比較简单的,其查询思想主要和存储的数据结构有关。

    即存储时,按每列放到一个bytebuffer,形成一个bytebuffer数组。

    查询时,依据请求列的exprId查找到上述数组的索引,然后使用ColumnAccessor对buffer中字段进行解析,最后封装为Row对象,返回。

——EOF——

原创文章,转载请注明出自:http://blog.csdn.net/oopsoom/article/details/39577419

转载于:https://www.cnblogs.com/bhlsheji/p/4266025.html

这篇关于Spark SQL 源代码分析之 In-Memory Columnar Storage 之 in-memory query的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/187308

相关文章

MySQL 筛选条件放 ON后 vs 放 WHERE 后的区别解析

《MySQL筛选条件放ON后vs放WHERE后的区别解析》文章解释了在MySQL中,将筛选条件放在ON和WHERE中的区别,文章通过几个场景说明了ON和WHERE的区别,并总结了ON用于关... 今天我们来讲讲数据库筛选条件放 ON 后和放 WHERE 后的区别。ON 决定如何 "连接" 表,WHERE

mysql_mcp_server部署及应用实践案例

《mysql_mcp_server部署及应用实践案例》文章介绍了在CentOS7.5环境下部署MySQL_mcp_server的步骤,包括服务安装、配置和启动,还提供了一个基于Dify工作流的应用案例... 目录mysql_mcp_server部署及应用案例1. 服务安装1.1. 下载源码1.2. 创建独立

Mysql中RelayLog中继日志的使用

《Mysql中RelayLog中继日志的使用》MySQLRelayLog中继日志是主从复制架构中的核心组件,负责将从主库获取的Binlog事件暂存并应用到从库,本文就来详细的介绍一下RelayLog中... 目录一、什么是 Relay Log(中继日志)二、Relay Log 的工作流程三、Relay Lo

MySQL日志UndoLog的作用

《MySQL日志UndoLog的作用》UndoLog是InnoDB用于事务回滚和MVCC的重要机制,本文主要介绍了MySQL日志UndoLog的作用,文中介绍的非常详细,对大家的学习或者工作具有一定的... 目录一、Undo Log 的作用二、Undo Log 的分类三、Undo Log 的存储四、Undo

Springboot请求和响应相关注解及使用场景分析

《Springboot请求和响应相关注解及使用场景分析》本文介绍了SpringBoot中用于处理HTTP请求和构建HTTP响应的常用注解,包括@RequestMapping、@RequestParam... 目录1. 请求处理注解@RequestMapping@GetMapping, @PostMappin

MySQL游标和触发器的操作流程

《MySQL游标和触发器的操作流程》本文介绍了MySQL中的游标和触发器的使用方法,游标可以对查询结果集进行逐行处理,而触发器则可以在数据表发生更改时自动执行预定义的操作,感兴趣的朋友跟随小编一起看看... 目录游标游标的操作流程1. 定义游标2.打开游标3.利用游标检索数据4.关闭游标例题触发器触发器的基

MySQL查看表的历史SQL的几种实现方法

《MySQL查看表的历史SQL的几种实现方法》:本文主要介绍多种查看MySQL表历史SQL的方法,包括通用查询日志、慢查询日志、performance_schema、binlog、第三方工具等,并... 目录mysql 查看某张表的历史SQL1.查看MySQL通用查询日志(需提前开启)2.查看慢查询日志3.

MySQL底层文件的查看和修改方法

《MySQL底层文件的查看和修改方法》MySQL底层文件分为文本类(可安全查看/修改)和二进制类(禁止手动操作),以下按「查看方法、修改方法、风险管控三部分详细说明,所有操作均以Linux环境为例,需... 目录引言一、mysql 底层文件的查看方法1. 先定位核心文件路径(基础前提)2. 文本类文件(可直

MySQL数据目录迁移的完整过程

《MySQL数据目录迁移的完整过程》文章详细介绍了将MySQL数据目录迁移到新硬盘的整个过程,包括新硬盘挂载、创建新的数据目录、迁移数据(推荐使用两遍rsync方案)、修改MySQL配置文件和重启验证... 目录1,新硬盘挂载(如果有的话)2,创建新的 mysql 数据目录3,迁移 MySQL 数据(推荐两

MySQL字符串转数值的方法全解析

《MySQL字符串转数值的方法全解析》在MySQL开发中,字符串与数值的转换是高频操作,本文从隐式转换原理、显式转换方法、典型场景案例、风险防控四个维度系统梳理,助您精准掌握这一核心技能,需要的朋友可... 目录一、隐式转换:自动但需警惕的&ld编程quo;双刃剑”二、显式转换:三大核心方法详解三、典型场景