SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表)

2024-06-22 08:08

本文主要是介绍SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

关键字:SparkSQL读取HBase、SparkSQL自定义外部数据源

前面文章介绍了SparSQL通过Hive操作HBase表。

SparkSQL从1.2开始支持自定义外部数据源(External DataSource),这样就可以通过API接口来实现自己的外部数据源。这里基于Spark1.4.0,简单介绍SparkSQL自定义外部数据源,访问HBase表。

在HBase中表如下:

 
  1. create 'lxw1234',{NAME => 'f1',VERSIONS => 1},{NAME => 'f2',VERSIONS => 1},{NAME => 'f3',VERSIONS => 1}
  2.  
  3. put 'lxw1234','lxw1234.com','f1:c1','name1'
  4. put 'lxw1234','lxw1234.com','f1:c2','name2'
  5. put 'lxw1234','lxw1234.com','f2:c1','age1'
  6. put 'lxw1234','lxw1234.com','f2:c2','age2'
  7. put 'lxw1234','lxw1234.com','f3:c1','job1'
  8. put 'lxw1234','lxw1234.com','f3:c2','job2'
  9. put 'lxw1234','lxw1234.com','f3:c3','job3'
  10.  
  11. hbase(main):025:0* scan 'lxw1234'
  12. ROW COLUMN+CELL
  13. lxw1234.com column=f1:c1, timestamp=1435624625198, value=name1
  14. lxw1234.com column=f1:c2, timestamp=1435624591717, value=name2
  15. lxw1234.com column=f2:c1, timestamp=1435624608759, value=age1
  16. lxw1234.com column=f2:c2, timestamp=1435624635261, value=age2
  17. lxw1234.com column=f3:c1, timestamp=1435624662282, value=job1
  18. lxw1234.com column=f3:c2, timestamp=1435624697028, value=job2
  19. lxw1234.com column=f3:c3, timestamp=1435624697065, value=job3
  20.  
  21.  

进入spark-shell

 
  1. sh /usr/local/spark-1.4.0-bin-hadoop2.3/bin/spark-shell --jars /tmp/sparksql-hbase.jar --total-executor-cores 30 --executor-memory 4G --master spark://lxw1234.com:7077

运行以下代码:

 
  1. import sqlContext._
  2.  
  3.  
  4. var hbasetable = sqlContext.read.format("com.lxw1234.sparksql.hbase").options(Map(
  5. "sparksql_table_schema" -> "(row_key string, c1 string, c2 string, c3 string)",
  6. "hbase_table_name" -> "lxw1234",
  7. "hbase_table_schema" -> "(:key , f1:c2 , f2:c2 , f3:c3 )"
  8. )).load()
  9.  
  10. //sparksql_table_schema参数为sparksql中表的定义
  11. //hbase_table_name参数为HBase中表名
  12. //hbase_table_schema参数为HBase表中需要映射到SparkSQL表中的列族和列,这里映射过//去的字段要和sparksql_table_schema中定义的一致,包括顺序。
  13.  
  14.  
  15. scala> hbasetable.printSchema()
  16. root
  17. |-- row_key: string (nullable = false)
  18. |-- c1: string (nullable = false)
  19. |-- c2: string (nullable = false)
  20. |-- c3: string (nullable = false)
  21.  
  22. hbasetable.registerTempTable("lxw1234")
  23.  
  24.  
  25. sqlContext.sql("SELECT * from lxw1234").collect
  26. res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2,age2,job3])
  27.  
  28. sqlContext.sql("SELECT row_key,concat(c1,'|',c2,'|',c3) from lxw1234").collect
  29. res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2|age2|job3])
  30.  

源码

HBaseRelation.scala

 
  1. package com.lxw1234.sparksql.hbase
  2.  
  3. import java.io.Serializable
  4. import org.apache.hadoop.fs.Path
  5. import org.apache.spark.sql._
  6. import org.apache.spark.sql.sources.TableScan
  7. import scala.collection.immutable.{HashMap, Map}
  8. import org.apache.hadoop.hbase.client.{Result, Scan, HTable, HBaseAdmin}
  9. import org.apache.spark.sql._
  10. import org.apache.spark.rdd.NewHadoopRDD
  11. import org.apache.hadoop.hbase.HBaseConfiguration
  12. import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  13. import scala.collection.JavaConversions._
  14. import scala.collection.JavaConverters._
  15. import scala.collection.mutable.ArrayBuffer
  16. import org.apache.spark.sql.types.StructType
  17. import org.apache.spark.sql.types.DataType
  18. import org.apache.spark.sql.types.StructField
  19. import org.apache.spark.sql.types.LongType
  20. import org.apache.spark.sql.types.IntegerType
  21. import org.apache.spark.sql.types.StringType
  22. import org.apache.spark.sql.types.MapType
  23. import org.apache.spark.sql.sources.BaseRelation
  24.  
  25.  
  26. object Resolver extends Serializable {
  27.  
  28. def resolve (hbaseField: HBaseSchemaField, result: Result ): Any = {
  29. val cfColArray = hbaseField.fieldName.split(":",-1)
  30. val cfName = cfColArray(0)
  31. val colName = cfColArray(1)
  32. var fieldRs: Any = null
  33. //resolve row key otherwise resolve column
  34. if(cfName=="" && colName=="key") {
  35. fieldRs = resolveRowKey(result, hbaseField.fieldType)
  36. } else {
  37. fieldRs = resolveColumn(result, cfName, colName,hbaseField.fieldType)
  38. }
  39. fieldRs
  40. }
  41.  
  42. def resolveRowKey (result: Result, resultType: String): Any = {
  43. val rowkey = resultType match {
  44. case "string" =>
  45. result.getRow.map(_.toChar).mkString
  46. case "int" =>
  47. result .getRow.map(_.toChar).mkString.toInt
  48. case "long" =>
  49. result.getRow.map(_.toChar).mkString.toLong
  50. }
  51. rowkey
  52. }
  53.  
  54. def resolveColumn (result: Result, columnFamily: String, columnName: String, resultType: String): Any = {
  55. val column = resultType match {
  56. case "string" =>
  57. result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString
  58. case "int" =>
  59. result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString.toInt
  60. case "long" =>
  61. result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString.toLong
  62. }
  63. column
  64. }
  65. }
  66.  
  67. /**
  68. val hbaseDDL = s"""
  69. |CREATE TEMPORARY TABLE hbase_people
  70. |USING com.shengli.spark.hbase
  71. |OPTIONS (
  72. | sparksql_table_schema '(row_key string, name string, age int, job string)',
  73. | hbase_table_name 'people',
  74. | hbase_table_schema '(:key , profile:name , profile:age , career:job )'
  75. |)""".stripMargin
  76. */
  77. case class HBaseRelation(@transient val hbaseProps: Map[String,String])(@transient val sqlContext: SQLContext) extends BaseRelation with Serializable with TableScan{
  78.  
  79. val hbaseTableName = hbaseProps.getOrElse("hbase_table_name", sys.error("not valid schema"))
  80. val hbaseTableSchema = hbaseProps.getOrElse("hbase_table_schema", sys.error("not valid schema"))
  81. val registerTableSchema = hbaseProps.getOrElse("sparksql_table_schema", sys.error("not valid schema"))
  82. val rowRange = hbaseProps.getOrElse("row_range", "->")
  83. //get star row and end row
  84. val range = rowRange.split("->",-1)
  85. val startRowKey = range(0).trim
  86. val endRowKey = range(1).trim
  87.  
  88. val tempHBaseFields = extractHBaseSchema(hbaseTableSchema) //do not use this, a temp field
  89. val registerTableFields = extractRegisterSchema(registerTableSchema)
  90. val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields,registerTableFields)
  91.  
  92. val hbaseTableFields = feedTypes(tempFieldRelation)
  93. val fieldsRelations = tableSchemaFieldMapping(hbaseTableFields,registerTableFields)
  94. val queryColumns = getQueryTargetCloumns(hbaseTableFields)
  95.  
  96. def feedTypes( mapping: Map[HBaseSchemaField, RegisteredSchemaField]) : Array[HBaseSchemaField] = {
  97. val hbaseFields = mapping.map{
  98. case (k,v) =>
  99. val field = k.copy(fieldType=v.fieldType)
  100. field
  101. }
  102. hbaseFields.toArray
  103. }
  104.  
  105. def isRowKey(field: HBaseSchemaField) : Boolean = {
  106. val cfColArray = field.fieldName.split(":",-1)
  107. val cfName = cfColArray(0)
  108. val colName = cfColArray(1)
  109. if(cfName=="" && colName=="key") true else false
  110. }
  111.  
  112. //eg: f1:col1 f1:col2 f1:col3 f2:col1
  113. def getQueryTargetCloumns(hbaseTableFields: Array[HBaseSchemaField]): String = {
  114. var str = ArrayBuffer[String]()
  115. hbaseTableFields.foreach{ field=>
  116. if(!isRowKey(field)) {
  117. str += field.fieldName
  118. }
  119. }
  120. str.mkString(" ")
  121. }
  122. lazy val schema = {
  123. val fields = hbaseTableFields.map{ field=>
  124. val name = fieldsRelations.getOrElse(field, sys.error("table schema is not match the definition.")).fieldName
  125. val relatedType = field.fieldType match {
  126. case "string" =>
  127. SchemaType(StringType,nullable = false)
  128. case "int" =>
  129. SchemaType(IntegerType,nullable = false)
  130. case "long" =>
  131. SchemaType(LongType,nullable = false)
  132. }
  133. StructField(name,relatedType.dataType,relatedType.nullable)
  134. }
  135. StructType(fields)
  136. }
  137.  
  138. def tableSchemaFieldMapping( externalHBaseTable: Array[HBaseSchemaField], registerTable : Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {
  139. if(externalHBaseTable.length != registerTable.length) sys.error("columns size not match in definition!")
  140. val rs = externalHBaseTable.zip(registerTable)
  141. rs.toMap
  142. }
  143.  
  144. /**
  145. * spark sql schema will be register
  146. * registerTableSchema '(rowkey string, value string, column_a string)'
  147. */
  148. def extractRegisterSchema(registerTableSchema: String) : Array[RegisteredSchemaField] = {
  149. val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)
  150. val fieldsArray = fieldsStr.split(",").map(_.trim)
  151. fieldsArray.map{ fildString =>
  152. val splitedField = fildString.split("\\s+", -1)
  153. RegisteredSchemaField(splitedField(0), splitedField(1))
  154. }
  155. }
  156.  
  157. //externalTableSchema '(:key , f1:col1 )'
  158. def extractHBaseSchema(externalTableSchema: String) : Array[HBaseSchemaField] = {
  159. val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)
  160. val fieldsArray = fieldsStr.split(",").map(_.trim)
  161. fieldsArray.map(fildString => HBaseSchemaField(fildString,""))
  162. }
  163.  
  164.  
  165.  
  166. // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
  167. lazy val buildScan = {
  168.  
  169. val hbaseConf = HBaseConfiguration.create()
  170. hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
  171. hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns);
  172. hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey);
  173. hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey);
  174.  
  175. val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
  176. hbaseConf,
  177. classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
  178. classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  179. classOf[org.apache.hadoop.hbase.client.Result]
  180. )
  181.  
  182.  
  183. val rs = hbaseRdd.map(tuple => tuple._2).map(result => {
  184. var values = new ArrayBuffer[Any]()
  185. hbaseTableFields.foreach{field=>
  186. values += Resolver.resolve(field,result)
  187. }
  188. Row.fromSeq(values.toSeq)
  189. })
  190. rs
  191. }
  192.  
  193. private case class SchemaType(dataType: DataType, nullable: Boolean)
  194. //
  195. // private def toSqlType(hbaseSchema: Schema): SchemaType = {
  196. // SchemaType(StringType,true)
  197. // }
  198. }

DefaultSource.scala

 
  1. package com.lxw1234.sparksql.hbase
  2.  
  3. import org.apache.spark.sql.SQLContext
  4. import org.apache.spark.sql.sources.RelationProvider
  5.  
  6.  
  7. class DefaultSource extends RelationProvider {
  8. def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
  9. HBaseRelation(parameters)(sqlContext)
  10. }
  11. }

package.scala

 
  1. package com.lxw1234.sparksql
  2.  
  3. import org.apache.spark.sql.SQLContext
  4. import scala.collection.immutable.HashMap
  5.  
  6.  
  7.  
  8. package object hbase {
  9.  
  10. abstract class SchemaField extends Serializable
  11.  
  12. case class RegisteredSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable
  13.  
  14. case class HBaseSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable
  15.  
  16. case class Parameter(name: String)
  17.  
  18.  
  19. protected val SPARK_SQL_TABLE_SCHEMA = Parameter("sparksql_table_schema")
  20. protected val HBASE_TABLE_NAME = Parameter("hbase_table_name")
  21. protected val HBASE_TABLE_SCHEMA = Parameter("hbase_table_schema")
  22. protected val ROW_RANGE = Parameter("row_range")
  23. /**
  24. * Adds a method, `hbaseTable`, to SQLContext that allows reading data stored in hbase table.
  25. */
  26. implicit class HBaseContext(sqlContext: SQLContext) {
  27. def hbaseTable(sparksqlTableSchema: String, hbaseTableName: String, hbaseTableSchema: String, rowRange: String = "->") = {
  28. var params = new HashMap[String, String]
  29. params += ( SPARK_SQL_TABLE_SCHEMA.name -> sparksqlTableSchema)
  30. params += ( HBASE_TABLE_NAME.name -> hbaseTableName)
  31. params += ( HBASE_TABLE_SCHEMA.name -> hbaseTableSchema)
  32. //get star row and end row
  33. params += ( ROW_RANGE.name -> rowRange)
  34. sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext));
  35. //sqlContext.baseRelationToSchemaRDD(HBaseRelation(params)(sqlContext))
  36. }
  37. }
  38.  
  39. // implicit class HBaseSchemaRDD(schemaRDD: SchemaRDD) {
  40. // def saveIntoTable(tableName: String): Unit = ???
  41. // }
  42. }
  43.  

 

相关配置和说明

  • 本来在SparkSQL中通过外部数据源建表的语法是:

CREATE TEMPORARY TABLE hbasetable

USING com.lxw1234.sparksql.hbase

OPTIONS (

sparksql_table_schema   ‘(row_key string, c1 string, c2 string, c3 string)’,

hbase_table_name   ‘lxw1234′,

hbase_table_schema ‘(:key , f1:c2 , f2:c2 , f3:c3)’

)

在我的Spark1.4中报错,会使用Hive的语法解析器解析这个DDL语句,因为Hive0.13中没有这种语法,因此报错。

是否是因为Spark1.4包的编译了Hive的原因?

  • 上面源码的编译依赖HBase的相关jar包:

hbase-client-0.96.1.1-cdh5.0.0.jar

hbase-common-0.96.1.1-cdh5.0.0.jar

hbase-protocol-0.96.1.1-cdh5.0.0.jar

hbase-server-0.96.1.1-cdh5.0.0.jar

还有HBase的集群信息:

hbase.zookeeper.quorum

hbase.client.scanner.caching

我之前在配置时候已经将这几个jar包和参数加到Spark集群的CLASSPATH中了,可参考 http://lxw1234.com/archives/2015/07/330.htm

  • 此程序是OopsOutOfMemory基于Spark1.2开发的,我只做了很小的修改。

https://github.com/OopsOutOfMemory/spark-sql-hbase

  • 此程序只做学习和测试使用,并未测试性能

这篇关于SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1083726

相关文章

javaScript在表单提交时获取表单数据的示例代码

《javaScript在表单提交时获取表单数据的示例代码》本文介绍了五种在JavaScript中获取表单数据的方法:使用FormData对象、手动提取表单数据、使用querySelector获取单个字... 方法 1:使用 FormData 对象FormData 是一个方便的内置对象,用于获取表单中的键值

Rust中的BoxT之堆上的数据与递归类型详解

《Rust中的BoxT之堆上的数据与递归类型详解》本文介绍了Rust中的BoxT类型,包括其在堆与栈之间的内存分配,性能优势,以及如何利用BoxT来实现递归类型和处理大小未知类型,通过BoxT,Rus... 目录1. Box<T> 的基础知识1.1 堆与栈的分工1.2 性能优势2.1 递归类型的问题2.2

Python使用Pandas对比两列数据取最大值的五种方法

《Python使用Pandas对比两列数据取最大值的五种方法》本文主要介绍使用Pandas对比两列数据取最大值的五种方法,包括使用max方法、apply方法结合lambda函数、函数、clip方法、w... 目录引言一、使用max方法二、使用apply方法结合lambda函数三、使用np.maximum函数

spring-boot-starter-thymeleaf加载外部html文件方式

《spring-boot-starter-thymeleaf加载外部html文件方式》本文介绍了在SpringMVC中使用Thymeleaf模板引擎加载外部HTML文件的方法,以及在SpringBoo... 目录1.Thymeleaf介绍2.springboot使用thymeleaf2.1.引入spring

C#中读取XML文件的四种常用方法

《C#中读取XML文件的四种常用方法》Xml是Internet环境中跨平台的,依赖于内容的技术,是当前处理结构化文档信息的有力工具,下面我们就来看看C#中读取XML文件的方法都有哪些吧... 目录XML简介格式C#读取XML文件方法使用XmlDocument使用XmlTextReader/XmlTextWr

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

java如何通过Kerberos认证方式连接hive

《java如何通过Kerberos认证方式连接hive》该文主要介绍了如何在数据源管理功能中适配不同数据源(如MySQL、PostgreSQL和Hive),特别是如何在SpringBoot3框架下通过... 目录Java实现Kerberos认证主要方法依赖示例续期连接hive遇到的问题分析解决方式扩展思考总

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言