分布式空间分析引擎-Simba架构分析与源码阅读之索引部分源码

本文主要是介绍分布式空间分析引擎-Simba架构分析与源码阅读之索引部分源码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Simba实现了五种类型的数据索引,并结合Spark的RDD结构构建了two-level index的机制:在数据分区内部构建local index,以及为各个数据分区构建global index。本章会首先介绍simba的two-level机制,然后具体介绍几种类型的two-level index的具体实现,最后会结合上一章的分区器对几种索引进行总结与比较。

two-level index机制

                                    

构建索引有助提高查询效率,Simba构建了two-level index来减少spatial join过程中参与计算的数据量:simba首先通过global index定位到实际参与运算的partitionID,再对于满足条件分区根据内部的localindex定位到参与运算的数据,过滤掉大部分不满足过滤条件的无关数据。其实多级索引的概念并不是simba的首创,phoenix里面就应用了多级索引来加速查询,TSDB-Gaia中geomesa层也应用了多级索引。

Simba首先对转化为RDD[Row]格式的数据表划分partition,然后对于每个partition内的数据建立local index(local index支持Hash index,RTree等),每个partition的数据生成一个由local index和实际的Row类型的数据Array组成的IPartition[Row]:
IPartition[Row](Data: Array[Row], I: Index)

然后各个数据分区的IPartition组成这个table的IndexRDD:

IndexRDD[Row] = RDD[IPartition[Row]]

然后simba对这些IPartition建立global index,global index可以是各个IPartition boundary的数组,也可以是支持多维数据点的KDTree。

HashMap索引

HashMap索引的逻辑比较简单:

val partitionedRDD = HashPartition(dataRDD, numShufflePartitions)val indexed = partitionedRDD.mapPartitions(iter => {val data = iter.toArrayval index = HashMapIndex(data)Array(IPartition(data.map(_._2), index)).iterator}).persist(StorageLevel.MEMORY_AND_DISK_SER)   

首先调用哈希分区器对数据进行分区,然后对每个分区的数据调用HashMapIndex建立local index,将所有分区的data和index放入数组中作为global index。

object HashMapIndex {def apply[T](data: Array[(T, InternalRow)]): HashMapIndex[T] = {val res = new HashMapIndex[T]for (i <- data.indices) {res.index.put(data(i)._1, i)}res}
}

建立local index的逻辑就是对分区内的每一条数据,将其key作为map-key,在数组中index作为map-value放到HashMap中。

QuadTree索引(四叉树索引)

QuadTree索引建立流程如下:

val (partitionedRDD, _, global_qtree) = QuadTreePartitioner(dataRDD, dimension,numShufflePartitions, sampleRate, tranferThreshold)
val indexed = partitionedRDD.mapPartitions { iter =>
val data = iter.toArray
val index: QuadTree =if (data.length > 0) QuadTree(data.map(_._1).zipWithIndex)else nullArray(IPartition(data.map(_._2), index)).iterator}.persist(StorageLevel.MEMORY_AND_DISK_SER)

首先调用QuadTree分区器对数据进行分区,然后对每个分区的数据调用QuadTree方法建立local index,然后在分区器划分出的MBR上构建RTree作为global index。

四叉树索引QuadTree的构建方法与KDTree类似,主要区别在于QuadTree在每次节点分裂的时候会分裂出四个子节点,而KDTree二分出两个子节点。QuadTree构建逻辑如下:

1)统计数据取值范围,建立根节点:

def apply(entries: Array[(Double, Double, Int)],boundary: (Double, Double, Double, Double) = null): QuadTree = {// collect the border of the total entriesval (x_min, y_min, x_max, y_max) =if (boundary == null) entries.aggregate((Double.MaxValue, Double.MaxValue, Double.MinValue, Double.MinValue))((a: (Double, Double, Double, Double), b: (Double, Double, Int)) =>(math.min(a._1, b._1), math.min(a._2, b._2), math.max(a._3, b._1), math.max(a._4, b._2)),(a: (Double, Double, Double, Double), b: (Double, Double, Double, Double)) =>(math.min(a._1, b._1), math.min(a._2, b._2), math.max(a._3, b._3), math.max(a._4, b._4)))else boundaryval root = new QuadTreeNode(x_min, y_min, x_max, y_max, null, entries)val quadTree = new QuadTree(root)quadTree.bulkload()quadTree}

2)递归地从根节点开始四分裂,如果分裂出的某个子节点包含的数据条数大于等于给定的参数MAX_NODES,则继续分裂这个子节点:

case class QuadTreeNode(x_low: Double, y_low: Double, x_high: Double, y_high: Double,var children: Array[QuadTreeNode], var objects: Array[(Double, Double, Int)]){private val center_x = (x_low + x_high) / 2private val center_y = (y_low + y_high) / 2def whichChild(obj: (Double, Double)): Int = {(if (obj._1 > center_x) 1 else 0) + (if (obj._2 > center_y) 2 else 0)}def makeChildren(): Unit = {children = Array(QuadTreeNode(x_low, y_low, center_x, center_y, null, null),QuadTreeNode(center_x, y_low, x_high, center_y, null, null),QuadTreeNode(x_low, center_y, center_x, y_high, null, null),QuadTreeNode(center_x, center_y, x_high, y_high, null, null))}def makeChildren(grouped: Map[Int, Array[(Double, Double, Int)]]): Unit = {children = Array(QuadTreeNode(x_low, y_low, center_x, center_y, null, grouped.getOrElse(0, Array())),QuadTreeNode(center_x, y_low, x_high, center_y, null, grouped.getOrElse(1, Array())),QuadTreeNode(x_low, center_y, center_x, y_high, null, grouped.getOrElse(2, Array())),QuadTreeNode(center_x, center_y, x_high, y_high, null, grouped.getOrElse(3, Array())))}
}
case class QuadTree(root: QuadTreeNode) extends Index with Serializable{val MAX_NODES = 3def bulkload(): QuadTreeNode = this.bulkload(root)private def bulkload(root: QuadTreeNode): QuadTreeNode = {val grouped = root.objects.groupBy(obj => root.whichChild(obj._1, obj._2))root.makeChildren(grouped)for (child <- root.children) {if (child.objects.length >= MAX_NODES) bulkload(child)}root.objects = nullroot}

Treap索引

Treap=Tree+Heap。Treap是一棵二叉排序树,能够通过二分的方式提高数据查询性能,同时又具有堆的性质:两个子节点的值小于根节点的值,从而避免了二叉排序树在极端情况退化为链式的情况。为了同时达到这两种性质,Treap为每个树节点生成一个随机的权重值,从而通过树的旋转基于真实value实现二叉排序树的性质,基于权重值实现堆的性质。Treap索引的实现如下:

val (partitionedRDD, tmp_bounds) = RangePartition.rowPartition(dataRDD, numShufflePartitions)range_bounds = tmp_boundsval indexed = partitionedRDD.mapPartitions(iter => {val data = iter.toArrayval index = Treap(data)Array(IPartition(data.map(_._2), index)).iterator}).persist(StorageLevel.MEMORY_AND_DISK_SER)

首先调用Range分区器对数据进行分区,然后对每个分区的数据建立Treap索引作为local index,然后把分区器划分出的分割数组rangeBounds作为global index,支持通过二分查找快速定位到partition。

Treap索引的构建过程主要是递归地插入和旋转构建过程如下:

private def leftRotate(p: TreapNode[K]): TreapNode[K] = {val t = p.leftp.left = t.rightt.right = pp.update()t.update()t}private def rightRotate(p: TreapNode[K]): TreapNode[K] = {val t = p.rightp.right = t.leftt.left = pp.update()t.update()t}private def insert(p: TreapNode[K], key: K, data: Int): TreapNode[K] = {if (p == null) {new TreapNode(key, Array(data), null, null, Random.nextLong(), 1, 1)} else if (ordering.equiv(key, p.key)) {p.count += 1p.size += 1p.data = p.data :+ datap} else if (ordering.lt(key, p.key)) {p.left = insert(p.left, key, data)if (p.left.rand < p.rand) leftRotate(p)else {p.update()p}} else {p.right = insert(p.right, key, data)if (p.right.rand < p.rand) rightRotate(p)else {p.update()p}}}def insert(key: K, data: Int): Unit = {root = insert(root, key, data)}

TreeMap索引

TreeMap是一种有序的Map结构,基于红黑树实现,simba中TreeMap索引的构建过程如下:

val (partitionedRDD, tmp_bounds) = RangePartition.rowPartition(dataRDD, numShufflePartitions)range_bounds = tmp_boundsval indexed = partitionedRDD.mapPartitions(iter => {val data = iter.toArrayval index = TreeMapIndex(data)Array(IPartition(data.map(_._2), index)).iterator}).persist(StorageLevel.MEMORY_AND_DISK_SER)

首先调用Range分区器对数据进行分区,然后对每个分区的数据建立TreeMap索引作为local index,然后把分区器划分出的分割数组rangeBounds作为global index,支持通过二分查找快速定位到partition。

TreeMap的构建主要是通过jdk的原生API来实现,直接初始化了util包的TreeMapIndex:

object TreeMapIndex {def apply[T](data: Array[(T, InternalRow)]): TreeMapIndex[T] = {val res = new TreeMapIndex[T]for (i <- data.indices)res.index.put(data(i)._1, i)res}
}

RTree索引

RTree是一种经典的空间索引,在Simba中也应用的十分广泛,构建流程如下:

val (partitionedRDD, mbr_bounds) =STRPartition(dataRDD, dimension, numShufflePartitions, sampleRate, transferThreshold, max_entries_per_node)
val indexed = partitionedRDD.mapPartitions { iter =>val data = iter.toArrayvar index: RTree = nullif (data.length > 0) index = RTree(data.map(_._1).zipWithIndex, max_entries_per_node)Array(IPartition(data.map(_._2), index)).iterator}.persist(StorageLevel.MEMORY_AND_DISK_SER)val partitionSize = indexed.mapPartitions(iter => iter.map(_.data.length)).collect()global_rtree = RTree(mbr_bounds.zip(partitionSize).map(x => (x._1._1, x._1._2, x._2)), max_entries_per_node)

首先调用STR分区器对数据进行分区,然后对每个分区的数据建立RTree索引作为local index,在分区器划分出的MBR上构建RTree作为global index。RTree的构建逻辑如下:

1.对原始数据建立RTree:

1.1、对数据递归分组,分组规则参照STR算法,并对最终生成的每个数据分组建立MBR:

def recursiveGroupPoint(entries: Array[(Point, Int)],cur_dim: Int, until_dim: Int): Array[Array[(Point, Int)]] = {val len = entries.length.toDoubleval grouped = entries.sortWith(_._1.coord(cur_dim) < _._1.coord(cur_dim)).grouped(Math.ceil(len / dim(cur_dim)).toInt).toArrayif (cur_dim < until_dim) {grouped.flatMap(now => recursiveGroupPoint(now, cur_dim + 1, until_dim))} else grouped}val grouped = recursiveGroupPoint(entries, 0, dimension - 1)val rtree_nodes = mutable.ArrayBuffer[(MBR, RTreeNode)]()grouped.foreach(list => {val min = new Array[Double](dimension).map(x => Double.MaxValue)val max = new Array[Double](dimension).map(x => Double.MinValue)list.foreach(now => {for (i <- 0 until dimension) min(i) = Math.min(min(i), now._1.coord(i))for (i <- 0 until dimension) max(i) = Math.max(max(i), now._1.coord(i))})val mbr = MBR(new Point(min), new Point(max))rtree_nodes += ((mbr, new RTreeNode(mbr, list)))})

1.2、把1.1生成的节点分组作为叶子层,自底向上递归地对每一层进行分组生成多叉树结构,直至生成根节点只包含一个RTreeNode。

def recursiveGroupRTreeNode(entries: Array[(MBR, RTreeNode)], cur_dim: Int, until_dim: Int): Array[Array[(MBR, RTreeNode)]] = {val len = entries.length.toDoubleval grouped = entries.sortWith(comp(cur_dim)).grouped(Math.ceil(len / dim(cur_dim)).toInt).toArrayif (cur_dim < until_dim) {grouped.flatMap(now => recursiveGroupRTreeNode(now, cur_dim + 1, until_dim))} else grouped}while (!over(dim)) {val grouped = recursiveGroupRTreeNode(cur_rtree_nodes, 0, dimension - 1)var tmp_nodes = mutable.ArrayBuffer[(MBR, RTreeNode)]()grouped.foreach(list => {val min = new Array[Double](dimension).map(x => Double.MaxValue)val max = new Array[Double](dimension).map(x => Double.MinValue)list.foreach(now => {for (i <- 0 until dimension) min(i) = Math.min(min(i), now._1.low.coord(i))for (i <- 0 until dimension) max(i) = Math.max(max(i), now._1.high.coord(i))})val mbr = MBR(new Point(min), new Point(max))tmp_nodes += ((mbr, new RTreeNode(mbr, list)))})cur_rtree_nodes = tmp_nodes.toArraycur_len = cur_rtree_nodes.length.toDoubleremaining = cur_len / max_entries_per_nodefor (i <- 0 until dimension) {dim(i) = Math.ceil(Math.pow(remaining, 1.0 / (dimension - i))).toIntremaining /= dim(i)}}

2.对已划分的MBR建立RTree:

2.1、对输入的MBR进行分组,递归地生成叶子层RTreeNode:

def recursiveGroupMBR(entries: Array[(MBR, Int, Int)], cur_dim: Int, until_dim: Int): Array[Array[(MBR, Int, Int)]] = {val len = entries.length.toDoubleval grouped = entries.sortWith(compMBR(cur_dim)).grouped(Math.ceil(len / dim(cur_dim)).toInt).toArrayif (cur_dim < until_dim) {grouped.flatMap(now => recursiveGroupMBR(now, cur_dim + 1, until_dim))} else grouped}val grouped = recursiveGroupMBR(entries, 0, dimension - 1)val rtree_nodes = mutable.ArrayBuffer[(MBR, RTreeNode)]()grouped.foreach(list => {val min = new Array[Double](dimension).map(x => Double.MaxValue)val max = new Array[Double](dimension).map(x => Double.MinValue)list.foreach(now => {for (i <- 0 until dimension) min(i) = Math.min(min(i), now._1.low.coord(i))for (i <- 0 until dimension) max(i) = Math.max(max(i), now._1.high.coord(i))})val mbr = MBR(new Point(min), new Point(max))rtree_nodes += ((mbr, new RTreeNode(mbr, list)))})

2.2、自底向上递归地对每一层进行分组生成多叉树结构,这部分的逻辑与1.2完全一致。

索引的比较

索引类型

对应分区器

local index

global index

适用数据类型

优势

劣势

HashMap

Range

HashMapIndex

rangeBounds数组

不限

构建效率高

空间占用相对其他索引较大

QuadTree

QuadTree

QuadTree

RTree

二维数据

相对map类索引节省空间,对spatial类算子有优化

普通查询由于需要递归遍历树效率较低。

Treap

Range

TreeMap

rangeBounds数组

不限

堆的性质限制了树高度不会特别失衡

二叉树树深度可能会比较深,影响查询效率

RTree

STR

RTree

RTree

高维数据点

大多数空间算子基于RTree设计,做了特别优化

TreeMap

Range

TreeMap

rangeBounds数组

不限

区间查询效率高

空间占用相对其他索引较大

索引部分源码就介绍到这里,这部分的代码还是值得一看的,可以帮助理解数据库,特别是OLAP数据库的索引到底是怎么样实现的。

这篇关于分布式空间分析引擎-Simba架构分析与源码阅读之索引部分源码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/248349

相关文章

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

java如何分布式锁实现和选型

《java如何分布式锁实现和选型》文章介绍了分布式锁的重要性以及在分布式系统中常见的问题和需求,它详细阐述了如何使用分布式锁来确保数据的一致性和系统的高可用性,文章还提供了基于数据库、Redis和Zo... 目录引言:分布式锁的重要性与分布式系统中的常见问题和需求分布式锁的重要性分布式系统中常见的问题和需求

oracle数据库索引失效的问题及解决

《oracle数据库索引失效的问题及解决》本文总结了在Oracle数据库中索引失效的一些常见场景,包括使用isnull、isnotnull、!=、、、函数处理、like前置%查询以及范围索引和等值索引... 目录oracle数据库索引失效问题场景环境索引失效情况及验证结论一结论二结论三结论四结论五总结ora

Golang使用etcd构建分布式锁的示例分享

《Golang使用etcd构建分布式锁的示例分享》在本教程中,我们将学习如何使用Go和etcd构建分布式锁系统,分布式锁系统对于管理对分布式系统中共享资源的并发访问至关重要,它有助于维护一致性,防止竞... 目录引言环境准备新建Go项目实现加锁和解锁功能测试分布式锁重构实现失败重试总结引言我们将使用Go作

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实

Redis分布式锁使用及说明

《Redis分布式锁使用及说明》本文总结了Redis和Zookeeper在高可用性和高一致性场景下的应用,并详细介绍了Redis的分布式锁实现方式,包括使用Lua脚本和续期机制,最后,提到了RedLo... 目录Redis分布式锁加锁方式怎么会解错锁?举个小案例吧解锁方式续期总结Redis分布式锁如果追求