/** Spark SQL Source Code Analysis Series */
The previous article showed that Spark SQL's In-Memory Columnar Storage keeps cached data in a column-oriented layout.
Given that layout, how is data cached inside the JVM actually queried? This article walks through how In-Memory Data is scanned.
1. Introduction
After the src table has been cached in memory, querying src again lets us observe the internal calls by building a QueryExecution from the analyzed plan.
That is, once the query is parsed and analyzed, the plan contains an InMemoryRelation node, and when the physical plan is executed, the methods of the InMemoryColumnarTableScan node are invoked.
For example:
scala> val exe = executePlan(sql("select value from src").queryExecution.analyzed)
14/09/26 10:30:26 INFO parse.ParseDriver: Parsing command: select value from src
14/09/26 10:30:26 INFO parse.ParseDriver: Parse Completed
exe: org.apache.spark.sql.hive.test.TestHive.QueryExecution =
== Parsed Logical Plan ==
Project [value#5]
 InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

== Analyzed Logical Plan ==
Project [value#5]
 InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

== Optimized Logical Plan ==
Project [value#5]
 InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

== Physical Plan ==
InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)) // entry point for scanning the in-memory table
Code Generation: false
== RDD ==
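For context, the plan above only contains an InMemoryRelation because src was cached beforehand. A minimal sketch of how that caching step might look in the Spark 1.x TestHive shell (the exact command is not part of the original transcript; it assumes the standard cacheTable API of that era):

import org.apache.spark.sql.hive.test.TestHive._

// Mark the Hive table "src" for caching in the in-memory columnar format.
// Subsequent plans that reference src then resolve to an InMemoryRelation.
cacheTable("src")

// Caching is lazy, so run a query once to force the column buffers to be built.
sql("SELECT count(*) FROM src").collect()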
2. InMemoryColumnarTableScan
private[sql] case class InMemoryColumnarTableScan(
    attributes: Seq[Attribute],
    relation: InMemoryRelation)
  extends LeafNode {

  override def output: Seq[Attribute] = attributes

  override def execute() = {
    relation.cachedColumnBuffers.mapPartitions { iterator =>
      // Find the ordinals of the requested columns. If none are requested, use the first.
      val requestedColumns = if (attributes.isEmpty) {
        Seq(0)
      } else {
        // Locate the index of each requested column's ByteBuffer by matching exprId
        attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
      }

      iterator
        // Fetch the ByteBuffer of each requested column by index and wrap it in a ColumnAccessor
        .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))
        .flatMap { columnAccessors =>
          val nextRow = new GenericMutableRow(columnAccessors.length) // width of the output row
          new Iterator[Row] {
            override def next() = {
              var i = 0
              while (i < nextRow.length) {
                // Decode the next value for column i from its ByteBuffer and write it into the row
                columnAccessors(i).extractTo(nextRow, i)
                i += 1
              }
              nextRow
            }
            override def hasNext = columnAccessors.head.hasNext
          }
        }
    }
  }
}
The requested columns can be inspected as follows:
scala> exe.optimizedPlan
res93: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#5]
 InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

scala> val relation = exe.optimizedPlan(1)
relation: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

scala> val request_relation = exe.executedPlan
request_relation: org.apache.spark.sql.execution.SparkPlan =
InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None))

scala> request_relation.output // the requested columns; here only the value column is requested
res95: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)

scala> relation.output // all columns stored in the relation
res96: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#4, value#5)

scala> val attributes = request_relation.output
attributes: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)

// Find the corresponding ordinal by exprId
scala> val attr_index = attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
attr_index: Seq[Int] = ArrayBuffer(1) // the requested column value has index 1, so the query reads its data from the ByteBuffer at index 1

scala> relation.output.foreach(e => println(e.exprId))
ExprId(4) // corresponds to [key#4,value#5]
ExprId(5)

scala> request_relation.output.foreach(e => println(e.exprId))
ExprId(5)
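With the ordinal resolved, the physical scan can also be run directly from the shell as a quick sanity check. This is a minimal sketch, not part of the original transcript; it assumes the SparkPlan.execute() / RDD API of Spark 1.x and copies each row before collecting, because the scan reuses a single GenericMutableRow per partition:

scala> val rows = request_relation.execute().map(_.copy()).take(3)
scala> rows.foreach(println)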
3. ColumnAccessor
Each data type has its own ColumnAccessor implementation; the class diagram is as follows:
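Since the diagram does not reproduce well in text, here is a simplified sketch of the accessor contract implied by the calls we have already seen (hasNext, extractTo) and by the columnType.setField comment below. It is an illustration, not the exact Spark source:

// Simplified illustration of the accessor contract (not the exact Spark trait).
trait ColumnAccessor {
  // Whether there are more values left in the underlying ByteBuffer.
  def hasNext: Boolean
  // Decode the next value from the buffer and write it into row(ordinal).
  def extractTo(row: MutableRow, ordinal: Int): Unit
}

// A typed accessor roughly delegates decoding to its ColumnType, e.g.
//   columnType.setField(row, ordinal, extractSingle(buffer))
// where extractSingle reads one value of that type from the ByteBuffer.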
Finally, a new iterator is returned:
new Iterator[Row] {
  override def next() = {
    var i = 0
    while (i < nextRow.length) { // number of requested columns
      // Calls columnType.setField(row, ordinal, extractSingle(buffer)) to decode the buffer
      columnAccessors(i).extractTo(nextRow, i)
      i += 1
    }
    nextRow // return the decoded row
  }
  override def hasNext = columnAccessors.head.hasNext
}
4. Summary
Querying Spark SQL's In-Memory Columnar Storage is relatively straightforward; the query path follows directly from the storage layout.
When a table is cached, each column is written into its own ByteBuffer, forming an array of ByteBuffers per batch.
At query time, the exprIds of the requested columns are used to look up their indices in that array, a ColumnAccessor decodes the values from each buffer, and the values are assembled into Row objects that are returned to the caller.
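To make the summary concrete, here is a self-contained toy sketch of the same idea in plain Scala with java.nio.ByteBuffer (no Spark involved): write each column into its own buffer, then rebuild rows from only the requested columns. All names here (ColumnarToyExample, buildColumn, etc.) are illustrative and are not Spark APIs:

import java.nio.ByteBuffer

object ColumnarToyExample {
  // A "table" of two Int columns: key and value.
  val keys   = Array(1, 2, 3)
  val values = Array(10, 20, 30)

  // Columnar storage: one ByteBuffer per column (fixed-size Ints only, for simplicity).
  def buildColumn(data: Array[Int]): ByteBuffer = {
    val buf = ByteBuffer.allocate(4 * data.length)
    data.foreach(buf.putInt)
    buf.rewind()
    buf
  }

  def main(args: Array[String]): Unit = {
    // The per-batch array of column buffers: index 0 = key, index 1 = value.
    val batch: Array[ByteBuffer] = Array(buildColumn(keys), buildColumn(values))

    // "Column pruning": the query only asks for the value column, i.e. ordinal 1.
    val requestedColumns = Seq(1)
    val accessors = requestedColumns.map(batch(_))

    // Rebuild rows from the requested column buffers, one value per column per row.
    while (accessors.head.hasRemaining) {
      val row = accessors.map(_.getInt)
      println(row.mkString("[", ", ", "]")) // prints [10], [20], [30]
    }
  }
}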
——EOF——
Original article. Please credit the source when reposting: http://blog.csdn.net/oopsoom/article/details/39577419