SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表)

2024-06-22 08:08

本文主要是介绍SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

关键字:SparkSQL读取HBase、SparkSQL自定义外部数据源

前面文章介绍了SparSQL通过Hive操作HBase表。

SparkSQL从1.2开始支持自定义外部数据源(External DataSource),这样就可以通过API接口来实现自己的外部数据源。这里基于Spark1.4.0,简单介绍SparkSQL自定义外部数据源,访问HBase表。

在HBase中表如下:

 
  1. create 'lxw1234',{NAME => 'f1',VERSIONS => 1},{NAME => 'f2',VERSIONS => 1},{NAME => 'f3',VERSIONS => 1}
  2.  
  3. put 'lxw1234','lxw1234.com','f1:c1','name1'
  4. put 'lxw1234','lxw1234.com','f1:c2','name2'
  5. put 'lxw1234','lxw1234.com','f2:c1','age1'
  6. put 'lxw1234','lxw1234.com','f2:c2','age2'
  7. put 'lxw1234','lxw1234.com','f3:c1','job1'
  8. put 'lxw1234','lxw1234.com','f3:c2','job2'
  9. put 'lxw1234','lxw1234.com','f3:c3','job3'
  10.  
  11. hbase(main):025:0* scan 'lxw1234'
  12. ROW COLUMN+CELL
  13. lxw1234.com column=f1:c1, timestamp=1435624625198, value=name1
  14. lxw1234.com column=f1:c2, timestamp=1435624591717, value=name2
  15. lxw1234.com column=f2:c1, timestamp=1435624608759, value=age1
  16. lxw1234.com column=f2:c2, timestamp=1435624635261, value=age2
  17. lxw1234.com column=f3:c1, timestamp=1435624662282, value=job1
  18. lxw1234.com column=f3:c2, timestamp=1435624697028, value=job2
  19. lxw1234.com column=f3:c3, timestamp=1435624697065, value=job3
  20.  
  21.  

进入spark-shell

 
  1. sh /usr/local/spark-1.4.0-bin-hadoop2.3/bin/spark-shell --jars /tmp/sparksql-hbase.jar --total-executor-cores 30 --executor-memory 4G --master spark://lxw1234.com:7077

运行以下代码:

 
  1. import sqlContext._
  2.  
  3.  
  4. var hbasetable = sqlContext.read.format("com.lxw1234.sparksql.hbase").options(Map(
  5. "sparksql_table_schema" -> "(row_key string, c1 string, c2 string, c3 string)",
  6. "hbase_table_name" -> "lxw1234",
  7. "hbase_table_schema" -> "(:key , f1:c2 , f2:c2 , f3:c3 )"
  8. )).load()
  9.  
  10. //sparksql_table_schema参数为sparksql中表的定义
  11. //hbase_table_name参数为HBase中表名
  12. //hbase_table_schema参数为HBase表中需要映射到SparkSQL表中的列族和列,这里映射过//去的字段要和sparksql_table_schema中定义的一致,包括顺序。
  13.  
  14.  
  15. scala> hbasetable.printSchema()
  16. root
  17. |-- row_key: string (nullable = false)
  18. |-- c1: string (nullable = false)
  19. |-- c2: string (nullable = false)
  20. |-- c3: string (nullable = false)
  21.  
  22. hbasetable.registerTempTable("lxw1234")
  23.  
  24.  
  25. sqlContext.sql("SELECT * from lxw1234").collect
  26. res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2,age2,job3])
  27.  
  28. sqlContext.sql("SELECT row_key,concat(c1,'|',c2,'|',c3) from lxw1234").collect
  29. res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2|age2|job3])
  30.  

源码

HBaseRelation.scala

 
  1. package com.lxw1234.sparksql.hbase
  2.  
  3. import java.io.Serializable
  4. import org.apache.hadoop.fs.Path
  5. import org.apache.spark.sql._
  6. import org.apache.spark.sql.sources.TableScan
  7. import scala.collection.immutable.{HashMap, Map}
  8. import org.apache.hadoop.hbase.client.{Result, Scan, HTable, HBaseAdmin}
  9. import org.apache.spark.sql._
  10. import org.apache.spark.rdd.NewHadoopRDD
  11. import org.apache.hadoop.hbase.HBaseConfiguration
  12. import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  13. import scala.collection.JavaConversions._
  14. import scala.collection.JavaConverters._
  15. import scala.collection.mutable.ArrayBuffer
  16. import org.apache.spark.sql.types.StructType
  17. import org.apache.spark.sql.types.DataType
  18. import org.apache.spark.sql.types.StructField
  19. import org.apache.spark.sql.types.LongType
  20. import org.apache.spark.sql.types.IntegerType
  21. import org.apache.spark.sql.types.StringType
  22. import org.apache.spark.sql.types.MapType
  23. import org.apache.spark.sql.sources.BaseRelation
  24.  
  25.  
  26. object Resolver extends Serializable {
  27.  
  28. def resolve (hbaseField: HBaseSchemaField, result: Result ): Any = {
  29. val cfColArray = hbaseField.fieldName.split(":",-1)
  30. val cfName = cfColArray(0)
  31. val colName = cfColArray(1)
  32. var fieldRs: Any = null
  33. //resolve row key otherwise resolve column
  34. if(cfName=="" && colName=="key") {
  35. fieldRs = resolveRowKey(result, hbaseField.fieldType)
  36. } else {
  37. fieldRs = resolveColumn(result, cfName, colName,hbaseField.fieldType)
  38. }
  39. fieldRs
  40. }
  41.  
  42. def resolveRowKey (result: Result, resultType: String): Any = {
  43. val rowkey = resultType match {
  44. case "string" =>
  45. result.getRow.map(_.toChar).mkString
  46. case "int" =>
  47. result .getRow.map(_.toChar).mkString.toInt
  48. case "long" =>
  49. result.getRow.map(_.toChar).mkString.toLong
  50. }
  51. rowkey
  52. }
  53.  
  54. def resolveColumn (result: Result, columnFamily: String, columnName: String, resultType: String): Any = {
  55. val column = resultType match {
  56. case "string" =>
  57. result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString
  58. case "int" =>
  59. result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString.toInt
  60. case "long" =>
  61. result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString.toLong
  62. }
  63. column
  64. }
  65. }
  66.  
  67. /**
  68. val hbaseDDL = s"""
  69. |CREATE TEMPORARY TABLE hbase_people
  70. |USING com.shengli.spark.hbase
  71. |OPTIONS (
  72. | sparksql_table_schema '(row_key string, name string, age int, job string)',
  73. | hbase_table_name 'people',
  74. | hbase_table_schema '(:key , profile:name , profile:age , career:job )'
  75. |)""".stripMargin
  76. */
  77. case class HBaseRelation(@transient val hbaseProps: Map[String,String])(@transient val sqlContext: SQLContext) extends BaseRelation with Serializable with TableScan{
  78.  
  79. val hbaseTableName = hbaseProps.getOrElse("hbase_table_name", sys.error("not valid schema"))
  80. val hbaseTableSchema = hbaseProps.getOrElse("hbase_table_schema", sys.error("not valid schema"))
  81. val registerTableSchema = hbaseProps.getOrElse("sparksql_table_schema", sys.error("not valid schema"))
  82. val rowRange = hbaseProps.getOrElse("row_range", "->")
  83. //get star row and end row
  84. val range = rowRange.split("->",-1)
  85. val startRowKey = range(0).trim
  86. val endRowKey = range(1).trim
  87.  
  88. val tempHBaseFields = extractHBaseSchema(hbaseTableSchema) //do not use this, a temp field
  89. val registerTableFields = extractRegisterSchema(registerTableSchema)
  90. val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields,registerTableFields)
  91.  
  92. val hbaseTableFields = feedTypes(tempFieldRelation)
  93. val fieldsRelations = tableSchemaFieldMapping(hbaseTableFields,registerTableFields)
  94. val queryColumns = getQueryTargetCloumns(hbaseTableFields)
  95.  
  96. def feedTypes( mapping: Map[HBaseSchemaField, RegisteredSchemaField]) : Array[HBaseSchemaField] = {
  97. val hbaseFields = mapping.map{
  98. case (k,v) =>
  99. val field = k.copy(fieldType=v.fieldType)
  100. field
  101. }
  102. hbaseFields.toArray
  103. }
  104.  
  105. def isRowKey(field: HBaseSchemaField) : Boolean = {
  106. val cfColArray = field.fieldName.split(":",-1)
  107. val cfName = cfColArray(0)
  108. val colName = cfColArray(1)
  109. if(cfName=="" && colName=="key") true else false
  110. }
  111.  
  112. //eg: f1:col1 f1:col2 f1:col3 f2:col1
  113. def getQueryTargetCloumns(hbaseTableFields: Array[HBaseSchemaField]): String = {
  114. var str = ArrayBuffer[String]()
  115. hbaseTableFields.foreach{ field=>
  116. if(!isRowKey(field)) {
  117. str += field.fieldName
  118. }
  119. }
  120. str.mkString(" ")
  121. }
  122. lazy val schema = {
  123. val fields = hbaseTableFields.map{ field=>
  124. val name = fieldsRelations.getOrElse(field, sys.error("table schema is not match the definition.")).fieldName
  125. val relatedType = field.fieldType match {
  126. case "string" =>
  127. SchemaType(StringType,nullable = false)
  128. case "int" =>
  129. SchemaType(IntegerType,nullable = false)
  130. case "long" =>
  131. SchemaType(LongType,nullable = false)
  132. }
  133. StructField(name,relatedType.dataType,relatedType.nullable)
  134. }
  135. StructType(fields)
  136. }
  137.  
  138. def tableSchemaFieldMapping( externalHBaseTable: Array[HBaseSchemaField], registerTable : Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {
  139. if(externalHBaseTable.length != registerTable.length) sys.error("columns size not match in definition!")
  140. val rs = externalHBaseTable.zip(registerTable)
  141. rs.toMap
  142. }
  143.  
  144. /**
  145. * spark sql schema will be register
  146. * registerTableSchema '(rowkey string, value string, column_a string)'
  147. */
  148. def extractRegisterSchema(registerTableSchema: String) : Array[RegisteredSchemaField] = {
  149. val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)
  150. val fieldsArray = fieldsStr.split(",").map(_.trim)
  151. fieldsArray.map{ fildString =>
  152. val splitedField = fildString.split("\\s+", -1)
  153. RegisteredSchemaField(splitedField(0), splitedField(1))
  154. }
  155. }
  156.  
  157. //externalTableSchema '(:key , f1:col1 )'
  158. def extractHBaseSchema(externalTableSchema: String) : Array[HBaseSchemaField] = {
  159. val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)
  160. val fieldsArray = fieldsStr.split(",").map(_.trim)
  161. fieldsArray.map(fildString => HBaseSchemaField(fildString,""))
  162. }
  163.  
  164.  
  165.  
  166. // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
  167. lazy val buildScan = {
  168.  
  169. val hbaseConf = HBaseConfiguration.create()
  170. hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
  171. hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns);
  172. hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey);
  173. hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey);
  174.  
  175. val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
  176. hbaseConf,
  177. classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
  178. classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  179. classOf[org.apache.hadoop.hbase.client.Result]
  180. )
  181.  
  182.  
  183. val rs = hbaseRdd.map(tuple => tuple._2).map(result => {
  184. var values = new ArrayBuffer[Any]()
  185. hbaseTableFields.foreach{field=>
  186. values += Resolver.resolve(field,result)
  187. }
  188. Row.fromSeq(values.toSeq)
  189. })
  190. rs
  191. }
  192.  
  193. private case class SchemaType(dataType: DataType, nullable: Boolean)
  194. //
  195. // private def toSqlType(hbaseSchema: Schema): SchemaType = {
  196. // SchemaType(StringType,true)
  197. // }
  198. }

DefaultSource.scala

 
  1. package com.lxw1234.sparksql.hbase
  2.  
  3. import org.apache.spark.sql.SQLContext
  4. import org.apache.spark.sql.sources.RelationProvider
  5.  
  6.  
  7. class DefaultSource extends RelationProvider {
  8. def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
  9. HBaseRelation(parameters)(sqlContext)
  10. }
  11. }

package.scala

 
  1. package com.lxw1234.sparksql
  2.  
  3. import org.apache.spark.sql.SQLContext
  4. import scala.collection.immutable.HashMap
  5.  
  6.  
  7.  
  8. package object hbase {
  9.  
  10. abstract class SchemaField extends Serializable
  11.  
  12. case class RegisteredSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable
  13.  
  14. case class HBaseSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable
  15.  
  16. case class Parameter(name: String)
  17.  
  18.  
  19. protected val SPARK_SQL_TABLE_SCHEMA = Parameter("sparksql_table_schema")
  20. protected val HBASE_TABLE_NAME = Parameter("hbase_table_name")
  21. protected val HBASE_TABLE_SCHEMA = Parameter("hbase_table_schema")
  22. protected val ROW_RANGE = Parameter("row_range")
  23. /**
  24. * Adds a method, `hbaseTable`, to SQLContext that allows reading data stored in hbase table.
  25. */
  26. implicit class HBaseContext(sqlContext: SQLContext) {
  27. def hbaseTable(sparksqlTableSchema: String, hbaseTableName: String, hbaseTableSchema: String, rowRange: String = "->") = {
  28. var params = new HashMap[String, String]
  29. params += ( SPARK_SQL_TABLE_SCHEMA.name -> sparksqlTableSchema)
  30. params += ( HBASE_TABLE_NAME.name -> hbaseTableName)
  31. params += ( HBASE_TABLE_SCHEMA.name -> hbaseTableSchema)
  32. //get star row and end row
  33. params += ( ROW_RANGE.name -> rowRange)
  34. sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext));
  35. //sqlContext.baseRelationToSchemaRDD(HBaseRelation(params)(sqlContext))
  36. }
  37. }
  38.  
  39. // implicit class HBaseSchemaRDD(schemaRDD: SchemaRDD) {
  40. // def saveIntoTable(tableName: String): Unit = ???
  41. // }
  42. }
  43.  

 

相关配置和说明

  • 本来在SparkSQL中通过外部数据源建表的语法是:

CREATE TEMPORARY TABLE hbasetable

USING com.lxw1234.sparksql.hbase

OPTIONS (

sparksql_table_schema   ‘(row_key string, c1 string, c2 string, c3 string)’,

hbase_table_name   ‘lxw1234′,

hbase_table_schema ‘(:key , f1:c2 , f2:c2 , f3:c3)’

)

在我的Spark1.4中报错,会使用Hive的语法解析器解析这个DDL语句,因为Hive0.13中没有这种语法,因此报错。

是否是因为Spark1.4包的编译了Hive的原因?

  • 上面源码的编译依赖HBase的相关jar包:

hbase-client-0.96.1.1-cdh5.0.0.jar

hbase-common-0.96.1.1-cdh5.0.0.jar

hbase-protocol-0.96.1.1-cdh5.0.0.jar

hbase-server-0.96.1.1-cdh5.0.0.jar

还有HBase的集群信息:

hbase.zookeeper.quorum

hbase.client.scanner.caching

我之前在配置时候已经将这几个jar包和参数加到Spark集群的CLASSPATH中了,可参考 http://lxw1234.com/archives/2015/07/330.htm

  • 此程序是OopsOutOfMemory基于Spark1.2开发的,我只做了很小的修改。

https://github.com/OopsOutOfMemory/spark-sql-hbase

  • 此程序只做学习和测试使用,并未测试性能

这篇关于SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1083726

相关文章

使用Sentinel自定义返回和实现区分来源方式

《使用Sentinel自定义返回和实现区分来源方式》:本文主要介绍使用Sentinel自定义返回和实现区分来源方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Sentinel自定义返回和实现区分来源1. 自定义错误返回2. 实现区分来源总结Sentinel自定

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

Redis 中的热点键和数据倾斜示例详解

《Redis中的热点键和数据倾斜示例详解》热点键是指在Redis中被频繁访问的特定键,这些键由于其高访问频率,可能导致Redis服务器的性能问题,尤其是在高并发场景下,本文给大家介绍Redis中的热... 目录Redis 中的热点键和数据倾斜热点键(Hot Key)定义特点应对策略示例数据倾斜(Data S

如何自定义Nginx JSON日志格式配置

《如何自定义NginxJSON日志格式配置》Nginx作为最流行的Web服务器之一,其灵活的日志配置能力允许我们根据需求定制日志格式,本文将详细介绍如何配置Nginx以JSON格式记录访问日志,这种... 目录前言为什么选择jsON格式日志?配置步骤详解1. 安装Nginx服务2. 自定义JSON日志格式各

Python实现将MySQL中所有表的数据都导出为CSV文件并压缩

《Python实现将MySQL中所有表的数据都导出为CSV文件并压缩》这篇文章主要为大家详细介绍了如何使用Python将MySQL数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到... python将mysql数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到另一个

Android自定义Scrollbar的两种实现方式

《Android自定义Scrollbar的两种实现方式》本文介绍两种实现自定义滚动条的方法,分别通过ItemDecoration方案和独立View方案实现滚动条定制化,文章通过代码示例讲解的非常详细,... 目录方案一:ItemDecoration实现(推荐用于RecyclerView)实现原理完整代码实现

SpringBoot整合jasypt实现重要数据加密

《SpringBoot整合jasypt实现重要数据加密》Jasypt是一个专注于简化Java加密操作的开源工具,:本文主要介绍详细介绍了如何使用jasypt实现重要数据加密,感兴趣的小伙伴可... 目录jasypt简介 jasypt的优点SpringBoot使用jasypt创建mapper接口配置文件加密