SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表)

2024-06-22 08:08

本文主要是介绍SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

关键字:SparkSQL读取HBase、SparkSQL自定义外部数据源

前面文章介绍了SparSQL通过Hive操作HBase表。

SparkSQL从1.2开始支持自定义外部数据源(External DataSource),这样就可以通过API接口来实现自己的外部数据源。这里基于Spark1.4.0,简单介绍SparkSQL自定义外部数据源,访问HBase表。

在HBase中表如下:

 
  1. create 'lxw1234',{NAME => 'f1',VERSIONS => 1},{NAME => 'f2',VERSIONS => 1},{NAME => 'f3',VERSIONS => 1}
  2.  
  3. put 'lxw1234','lxw1234.com','f1:c1','name1'
  4. put 'lxw1234','lxw1234.com','f1:c2','name2'
  5. put 'lxw1234','lxw1234.com','f2:c1','age1'
  6. put 'lxw1234','lxw1234.com','f2:c2','age2'
  7. put 'lxw1234','lxw1234.com','f3:c1','job1'
  8. put 'lxw1234','lxw1234.com','f3:c2','job2'
  9. put 'lxw1234','lxw1234.com','f3:c3','job3'
  10.  
  11. hbase(main):025:0* scan 'lxw1234'
  12. ROW COLUMN+CELL
  13. lxw1234.com column=f1:c1, timestamp=1435624625198, value=name1
  14. lxw1234.com column=f1:c2, timestamp=1435624591717, value=name2
  15. lxw1234.com column=f2:c1, timestamp=1435624608759, value=age1
  16. lxw1234.com column=f2:c2, timestamp=1435624635261, value=age2
  17. lxw1234.com column=f3:c1, timestamp=1435624662282, value=job1
  18. lxw1234.com column=f3:c2, timestamp=1435624697028, value=job2
  19. lxw1234.com column=f3:c3, timestamp=1435624697065, value=job3
  20.  
  21.  

进入spark-shell

 
  1. sh /usr/local/spark-1.4.0-bin-hadoop2.3/bin/spark-shell --jars /tmp/sparksql-hbase.jar --total-executor-cores 30 --executor-memory 4G --master spark://lxw1234.com:7077

运行以下代码:

 
  1. import sqlContext._
  2.  
  3.  
  4. var hbasetable = sqlContext.read.format("com.lxw1234.sparksql.hbase").options(Map(
  5. "sparksql_table_schema" -> "(row_key string, c1 string, c2 string, c3 string)",
  6. "hbase_table_name" -> "lxw1234",
  7. "hbase_table_schema" -> "(:key , f1:c2 , f2:c2 , f3:c3 )"
  8. )).load()
  9.  
  10. //sparksql_table_schema参数为sparksql中表的定义
  11. //hbase_table_name参数为HBase中表名
  12. //hbase_table_schema参数为HBase表中需要映射到SparkSQL表中的列族和列,这里映射过//去的字段要和sparksql_table_schema中定义的一致,包括顺序。
  13.  
  14.  
  15. scala> hbasetable.printSchema()
  16. root
  17. |-- row_key: string (nullable = false)
  18. |-- c1: string (nullable = false)
  19. |-- c2: string (nullable = false)
  20. |-- c3: string (nullable = false)
  21.  
  22. hbasetable.registerTempTable("lxw1234")
  23.  
  24.  
  25. sqlContext.sql("SELECT * from lxw1234").collect
  26. res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2,age2,job3])
  27.  
  28. sqlContext.sql("SELECT row_key,concat(c1,'|',c2,'|',c3) from lxw1234").collect
  29. res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2|age2|job3])
  30.  

源码

HBaseRelation.scala

 
  1. package com.lxw1234.sparksql.hbase
  2.  
  3. import java.io.Serializable
  4. import org.apache.hadoop.fs.Path
  5. import org.apache.spark.sql._
  6. import org.apache.spark.sql.sources.TableScan
  7. import scala.collection.immutable.{HashMap, Map}
  8. import org.apache.hadoop.hbase.client.{Result, Scan, HTable, HBaseAdmin}
  9. import org.apache.spark.sql._
  10. import org.apache.spark.rdd.NewHadoopRDD
  11. import org.apache.hadoop.hbase.HBaseConfiguration
  12. import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  13. import scala.collection.JavaConversions._
  14. import scala.collection.JavaConverters._
  15. import scala.collection.mutable.ArrayBuffer
  16. import org.apache.spark.sql.types.StructType
  17. import org.apache.spark.sql.types.DataType
  18. import org.apache.spark.sql.types.StructField
  19. import org.apache.spark.sql.types.LongType
  20. import org.apache.spark.sql.types.IntegerType
  21. import org.apache.spark.sql.types.StringType
  22. import org.apache.spark.sql.types.MapType
  23. import org.apache.spark.sql.sources.BaseRelation
  24.  
  25.  
  26. object Resolver extends Serializable {
  27.  
  28. def resolve (hbaseField: HBaseSchemaField, result: Result ): Any = {
  29. val cfColArray = hbaseField.fieldName.split(":",-1)
  30. val cfName = cfColArray(0)
  31. val colName = cfColArray(1)
  32. var fieldRs: Any = null
  33. //resolve row key otherwise resolve column
  34. if(cfName=="" && colName=="key") {
  35. fieldRs = resolveRowKey(result, hbaseField.fieldType)
  36. } else {
  37. fieldRs = resolveColumn(result, cfName, colName,hbaseField.fieldType)
  38. }
  39. fieldRs
  40. }
  41.  
  42. def resolveRowKey (result: Result, resultType: String): Any = {
  43. val rowkey = resultType match {
  44. case "string" =>
  45. result.getRow.map(_.toChar).mkString
  46. case "int" =>
  47. result .getRow.map(_.toChar).mkString.toInt
  48. case "long" =>
  49. result.getRow.map(_.toChar).mkString.toLong
  50. }
  51. rowkey
  52. }
  53.  
  54. def resolveColumn (result: Result, columnFamily: String, columnName: String, resultType: String): Any = {
  55. val column = resultType match {
  56. case "string" =>
  57. result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString
  58. case "int" =>
  59. result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString.toInt
  60. case "long" =>
  61. result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString.toLong
  62. }
  63. column
  64. }
  65. }
  66.  
  67. /**
  68. val hbaseDDL = s"""
  69. |CREATE TEMPORARY TABLE hbase_people
  70. |USING com.shengli.spark.hbase
  71. |OPTIONS (
  72. | sparksql_table_schema '(row_key string, name string, age int, job string)',
  73. | hbase_table_name 'people',
  74. | hbase_table_schema '(:key , profile:name , profile:age , career:job )'
  75. |)""".stripMargin
  76. */
  77. case class HBaseRelation(@transient val hbaseProps: Map[String,String])(@transient val sqlContext: SQLContext) extends BaseRelation with Serializable with TableScan{
  78.  
  79. val hbaseTableName = hbaseProps.getOrElse("hbase_table_name", sys.error("not valid schema"))
  80. val hbaseTableSchema = hbaseProps.getOrElse("hbase_table_schema", sys.error("not valid schema"))
  81. val registerTableSchema = hbaseProps.getOrElse("sparksql_table_schema", sys.error("not valid schema"))
  82. val rowRange = hbaseProps.getOrElse("row_range", "->")
  83. //get star row and end row
  84. val range = rowRange.split("->",-1)
  85. val startRowKey = range(0).trim
  86. val endRowKey = range(1).trim
  87.  
  88. val tempHBaseFields = extractHBaseSchema(hbaseTableSchema) //do not use this, a temp field
  89. val registerTableFields = extractRegisterSchema(registerTableSchema)
  90. val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields,registerTableFields)
  91.  
  92. val hbaseTableFields = feedTypes(tempFieldRelation)
  93. val fieldsRelations = tableSchemaFieldMapping(hbaseTableFields,registerTableFields)
  94. val queryColumns = getQueryTargetCloumns(hbaseTableFields)
  95.  
  96. def feedTypes( mapping: Map[HBaseSchemaField, RegisteredSchemaField]) : Array[HBaseSchemaField] = {
  97. val hbaseFields = mapping.map{
  98. case (k,v) =>
  99. val field = k.copy(fieldType=v.fieldType)
  100. field
  101. }
  102. hbaseFields.toArray
  103. }
  104.  
  105. def isRowKey(field: HBaseSchemaField) : Boolean = {
  106. val cfColArray = field.fieldName.split(":",-1)
  107. val cfName = cfColArray(0)
  108. val colName = cfColArray(1)
  109. if(cfName=="" && colName=="key") true else false
  110. }
  111.  
  112. //eg: f1:col1 f1:col2 f1:col3 f2:col1
  113. def getQueryTargetCloumns(hbaseTableFields: Array[HBaseSchemaField]): String = {
  114. var str = ArrayBuffer[String]()
  115. hbaseTableFields.foreach{ field=>
  116. if(!isRowKey(field)) {
  117. str += field.fieldName
  118. }
  119. }
  120. str.mkString(" ")
  121. }
  122. lazy val schema = {
  123. val fields = hbaseTableFields.map{ field=>
  124. val name = fieldsRelations.getOrElse(field, sys.error("table schema is not match the definition.")).fieldName
  125. val relatedType = field.fieldType match {
  126. case "string" =>
  127. SchemaType(StringType,nullable = false)
  128. case "int" =>
  129. SchemaType(IntegerType,nullable = false)
  130. case "long" =>
  131. SchemaType(LongType,nullable = false)
  132. }
  133. StructField(name,relatedType.dataType,relatedType.nullable)
  134. }
  135. StructType(fields)
  136. }
  137.  
  138. def tableSchemaFieldMapping( externalHBaseTable: Array[HBaseSchemaField], registerTable : Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {
  139. if(externalHBaseTable.length != registerTable.length) sys.error("columns size not match in definition!")
  140. val rs = externalHBaseTable.zip(registerTable)
  141. rs.toMap
  142. }
  143.  
  144. /**
  145. * spark sql schema will be register
  146. * registerTableSchema '(rowkey string, value string, column_a string)'
  147. */
  148. def extractRegisterSchema(registerTableSchema: String) : Array[RegisteredSchemaField] = {
  149. val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)
  150. val fieldsArray = fieldsStr.split(",").map(_.trim)
  151. fieldsArray.map{ fildString =>
  152. val splitedField = fildString.split("\\s+", -1)
  153. RegisteredSchemaField(splitedField(0), splitedField(1))
  154. }
  155. }
  156.  
  157. //externalTableSchema '(:key , f1:col1 )'
  158. def extractHBaseSchema(externalTableSchema: String) : Array[HBaseSchemaField] = {
  159. val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)
  160. val fieldsArray = fieldsStr.split(",").map(_.trim)
  161. fieldsArray.map(fildString => HBaseSchemaField(fildString,""))
  162. }
  163.  
  164.  
  165.  
  166. // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
  167. lazy val buildScan = {
  168.  
  169. val hbaseConf = HBaseConfiguration.create()
  170. hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
  171. hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns);
  172. hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey);
  173. hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey);
  174.  
  175. val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
  176. hbaseConf,
  177. classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
  178. classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  179. classOf[org.apache.hadoop.hbase.client.Result]
  180. )
  181.  
  182.  
  183. val rs = hbaseRdd.map(tuple => tuple._2).map(result => {
  184. var values = new ArrayBuffer[Any]()
  185. hbaseTableFields.foreach{field=>
  186. values += Resolver.resolve(field,result)
  187. }
  188. Row.fromSeq(values.toSeq)
  189. })
  190. rs
  191. }
  192.  
  193. private case class SchemaType(dataType: DataType, nullable: Boolean)
  194. //
  195. // private def toSqlType(hbaseSchema: Schema): SchemaType = {
  196. // SchemaType(StringType,true)
  197. // }
  198. }

DefaultSource.scala

 
  1. package com.lxw1234.sparksql.hbase
  2.  
  3. import org.apache.spark.sql.SQLContext
  4. import org.apache.spark.sql.sources.RelationProvider
  5.  
  6.  
  7. class DefaultSource extends RelationProvider {
  8. def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
  9. HBaseRelation(parameters)(sqlContext)
  10. }
  11. }

package.scala

 
  1. package com.lxw1234.sparksql
  2.  
  3. import org.apache.spark.sql.SQLContext
  4. import scala.collection.immutable.HashMap
  5.  
  6.  
  7.  
  8. package object hbase {
  9.  
  10. abstract class SchemaField extends Serializable
  11.  
  12. case class RegisteredSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable
  13.  
  14. case class HBaseSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable
  15.  
  16. case class Parameter(name: String)
  17.  
  18.  
  19. protected val SPARK_SQL_TABLE_SCHEMA = Parameter("sparksql_table_schema")
  20. protected val HBASE_TABLE_NAME = Parameter("hbase_table_name")
  21. protected val HBASE_TABLE_SCHEMA = Parameter("hbase_table_schema")
  22. protected val ROW_RANGE = Parameter("row_range")
  23. /**
  24. * Adds a method, `hbaseTable`, to SQLContext that allows reading data stored in hbase table.
  25. */
  26. implicit class HBaseContext(sqlContext: SQLContext) {
  27. def hbaseTable(sparksqlTableSchema: String, hbaseTableName: String, hbaseTableSchema: String, rowRange: String = "->") = {
  28. var params = new HashMap[String, String]
  29. params += ( SPARK_SQL_TABLE_SCHEMA.name -> sparksqlTableSchema)
  30. params += ( HBASE_TABLE_NAME.name -> hbaseTableName)
  31. params += ( HBASE_TABLE_SCHEMA.name -> hbaseTableSchema)
  32. //get star row and end row
  33. params += ( ROW_RANGE.name -> rowRange)
  34. sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext));
  35. //sqlContext.baseRelationToSchemaRDD(HBaseRelation(params)(sqlContext))
  36. }
  37. }
  38.  
  39. // implicit class HBaseSchemaRDD(schemaRDD: SchemaRDD) {
  40. // def saveIntoTable(tableName: String): Unit = ???
  41. // }
  42. }
  43.  

 

相关配置和说明

  • 本来在SparkSQL中通过外部数据源建表的语法是:

CREATE TEMPORARY TABLE hbasetable

USING com.lxw1234.sparksql.hbase

OPTIONS (

sparksql_table_schema   ‘(row_key string, c1 string, c2 string, c3 string)’,

hbase_table_name   ‘lxw1234′,

hbase_table_schema ‘(:key , f1:c2 , f2:c2 , f3:c3)’

)

在我的Spark1.4中报错,会使用Hive的语法解析器解析这个DDL语句,因为Hive0.13中没有这种语法,因此报错。

是否是因为Spark1.4包的编译了Hive的原因?

  • 上面源码的编译依赖HBase的相关jar包:

hbase-client-0.96.1.1-cdh5.0.0.jar

hbase-common-0.96.1.1-cdh5.0.0.jar

hbase-protocol-0.96.1.1-cdh5.0.0.jar

hbase-server-0.96.1.1-cdh5.0.0.jar

还有HBase的集群信息:

hbase.zookeeper.quorum

hbase.client.scanner.caching

我之前在配置时候已经将这几个jar包和参数加到Spark集群的CLASSPATH中了,可参考 http://lxw1234.com/archives/2015/07/330.htm

  • 此程序是OopsOutOfMemory基于Spark1.2开发的,我只做了很小的修改。

https://github.com/OopsOutOfMemory/spark-sql-hbase

  • 此程序只做学习和测试使用,并未测试性能

这篇关于SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1083726

相关文章

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的

解析C++11 static_assert及与Boost库的关联从入门到精通

《解析C++11static_assert及与Boost库的关联从入门到精通》static_assert是C++中强大的编译时验证工具,它能够在编译阶段拦截不符合预期的类型或值,增强代码的健壮性,通... 目录一、背景知识:传统断言方法的局限性1.1 assert宏1.2 #error指令1.3 第三方解决

Navicat数据表的数据添加,删除及使用sql完成数据的添加过程

《Navicat数据表的数据添加,删除及使用sql完成数据的添加过程》:本文主要介绍Navicat数据表的数据添加,删除及使用sql完成数据的添加过程,具有很好的参考价值,希望对大家有所帮助,如有... 目录Navicat数据表数据添加,删除及使用sql完成数据添加选中操作的表则出现如下界面,查看左下角从左

SpringBoot中4种数据水平分片策略

《SpringBoot中4种数据水平分片策略》数据水平分片作为一种水平扩展策略,通过将数据分散到多个物理节点上,有效解决了存储容量和性能瓶颈问题,下面小编就来和大家分享4种数据分片策略吧... 目录一、前言二、哈希分片2.1 原理2.2 SpringBoot实现2.3 优缺点分析2.4 适用场景三、范围分片

Java实现自定义table宽高的示例代码

《Java实现自定义table宽高的示例代码》在桌面应用、管理系统乃至报表工具中,表格(JTable)作为最常用的数据展示组件,不仅承载对数据的增删改查,还需要配合布局与视觉需求,而JavaSwing... 目录一、项目背景详细介绍二、项目需求详细介绍三、相关技术详细介绍四、实现思路详细介绍五、完整实现代码

一文详解Java Stream的sorted自定义排序

《一文详解JavaStream的sorted自定义排序》Javastream中的sorted方法是用于对流中的元素进行排序的方法,它可以接受一个comparator参数,用于指定排序规则,sorte... 目录一、sorted 操作的基础原理二、自定义排序的实现方式1. Comparator 接口的 Lam

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模