SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表)

2024-06-22 08:08

本文主要是介绍SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

关键字:SparkSQL读取HBase、SparkSQL自定义外部数据源

前面文章介绍了SparSQL通过Hive操作HBase表。

SparkSQL从1.2开始支持自定义外部数据源(External DataSource),这样就可以通过API接口来实现自己的外部数据源。这里基于Spark1.4.0,简单介绍SparkSQL自定义外部数据源,访问HBase表。

在HBase中表如下:

 
  1. create 'lxw1234',{NAME => 'f1',VERSIONS => 1},{NAME => 'f2',VERSIONS => 1},{NAME => 'f3',VERSIONS => 1}
  2.  
  3. put 'lxw1234','lxw1234.com','f1:c1','name1'
  4. put 'lxw1234','lxw1234.com','f1:c2','name2'
  5. put 'lxw1234','lxw1234.com','f2:c1','age1'
  6. put 'lxw1234','lxw1234.com','f2:c2','age2'
  7. put 'lxw1234','lxw1234.com','f3:c1','job1'
  8. put 'lxw1234','lxw1234.com','f3:c2','job2'
  9. put 'lxw1234','lxw1234.com','f3:c3','job3'
  10.  
  11. hbase(main):025:0* scan 'lxw1234'
  12. ROW COLUMN+CELL
  13. lxw1234.com column=f1:c1, timestamp=1435624625198, value=name1
  14. lxw1234.com column=f1:c2, timestamp=1435624591717, value=name2
  15. lxw1234.com column=f2:c1, timestamp=1435624608759, value=age1
  16. lxw1234.com column=f2:c2, timestamp=1435624635261, value=age2
  17. lxw1234.com column=f3:c1, timestamp=1435624662282, value=job1
  18. lxw1234.com column=f3:c2, timestamp=1435624697028, value=job2
  19. lxw1234.com column=f3:c3, timestamp=1435624697065, value=job3
  20.  
  21.  

进入spark-shell

 
  1. sh /usr/local/spark-1.4.0-bin-hadoop2.3/bin/spark-shell --jars /tmp/sparksql-hbase.jar --total-executor-cores 30 --executor-memory 4G --master spark://lxw1234.com:7077

运行以下代码:

 
  1. import sqlContext._
  2.  
  3.  
  4. var hbasetable = sqlContext.read.format("com.lxw1234.sparksql.hbase").options(Map(
  5. "sparksql_table_schema" -> "(row_key string, c1 string, c2 string, c3 string)",
  6. "hbase_table_name" -> "lxw1234",
  7. "hbase_table_schema" -> "(:key , f1:c2 , f2:c2 , f3:c3 )"
  8. )).load()
  9.  
  10. //sparksql_table_schema参数为sparksql中表的定义
  11. //hbase_table_name参数为HBase中表名
  12. //hbase_table_schema参数为HBase表中需要映射到SparkSQL表中的列族和列,这里映射过//去的字段要和sparksql_table_schema中定义的一致,包括顺序。
  13.  
  14.  
  15. scala> hbasetable.printSchema()
  16. root
  17. |-- row_key: string (nullable = false)
  18. |-- c1: string (nullable = false)
  19. |-- c2: string (nullable = false)
  20. |-- c3: string (nullable = false)
  21.  
  22. hbasetable.registerTempTable("lxw1234")
  23.  
  24.  
  25. sqlContext.sql("SELECT * from lxw1234").collect
  26. res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2,age2,job3])
  27.  
  28. sqlContext.sql("SELECT row_key,concat(c1,'|',c2,'|',c3) from lxw1234").collect
  29. res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2|age2|job3])
  30.  

源码

HBaseRelation.scala

 
  1. package com.lxw1234.sparksql.hbase
  2.  
  3. import java.io.Serializable
  4. import org.apache.hadoop.fs.Path
  5. import org.apache.spark.sql._
  6. import org.apache.spark.sql.sources.TableScan
  7. import scala.collection.immutable.{HashMap, Map}
  8. import org.apache.hadoop.hbase.client.{Result, Scan, HTable, HBaseAdmin}
  9. import org.apache.spark.sql._
  10. import org.apache.spark.rdd.NewHadoopRDD
  11. import org.apache.hadoop.hbase.HBaseConfiguration
  12. import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  13. import scala.collection.JavaConversions._
  14. import scala.collection.JavaConverters._
  15. import scala.collection.mutable.ArrayBuffer
  16. import org.apache.spark.sql.types.StructType
  17. import org.apache.spark.sql.types.DataType
  18. import org.apache.spark.sql.types.StructField
  19. import org.apache.spark.sql.types.LongType
  20. import org.apache.spark.sql.types.IntegerType
  21. import org.apache.spark.sql.types.StringType
  22. import org.apache.spark.sql.types.MapType
  23. import org.apache.spark.sql.sources.BaseRelation
  24.  
  25.  
  26. object Resolver extends Serializable {
  27.  
  28. def resolve (hbaseField: HBaseSchemaField, result: Result ): Any = {
  29. val cfColArray = hbaseField.fieldName.split(":",-1)
  30. val cfName = cfColArray(0)
  31. val colName = cfColArray(1)
  32. var fieldRs: Any = null
  33. //resolve row key otherwise resolve column
  34. if(cfName=="" && colName=="key") {
  35. fieldRs = resolveRowKey(result, hbaseField.fieldType)
  36. } else {
  37. fieldRs = resolveColumn(result, cfName, colName,hbaseField.fieldType)
  38. }
  39. fieldRs
  40. }
  41.  
  42. def resolveRowKey (result: Result, resultType: String): Any = {
  43. val rowkey = resultType match {
  44. case "string" =>
  45. result.getRow.map(_.toChar).mkString
  46. case "int" =>
  47. result .getRow.map(_.toChar).mkString.toInt
  48. case "long" =>
  49. result.getRow.map(_.toChar).mkString.toLong
  50. }
  51. rowkey
  52. }
  53.  
  54. def resolveColumn (result: Result, columnFamily: String, columnName: String, resultType: String): Any = {
  55. val column = resultType match {
  56. case "string" =>
  57. result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString
  58. case "int" =>
  59. result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString.toInt
  60. case "long" =>
  61. result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString.toLong
  62. }
  63. column
  64. }
  65. }
  66.  
  67. /**
  68. val hbaseDDL = s"""
  69. |CREATE TEMPORARY TABLE hbase_people
  70. |USING com.shengli.spark.hbase
  71. |OPTIONS (
  72. | sparksql_table_schema '(row_key string, name string, age int, job string)',
  73. | hbase_table_name 'people',
  74. | hbase_table_schema '(:key , profile:name , profile:age , career:job )'
  75. |)""".stripMargin
  76. */
  77. case class HBaseRelation(@transient val hbaseProps: Map[String,String])(@transient val sqlContext: SQLContext) extends BaseRelation with Serializable with TableScan{
  78.  
  79. val hbaseTableName = hbaseProps.getOrElse("hbase_table_name", sys.error("not valid schema"))
  80. val hbaseTableSchema = hbaseProps.getOrElse("hbase_table_schema", sys.error("not valid schema"))
  81. val registerTableSchema = hbaseProps.getOrElse("sparksql_table_schema", sys.error("not valid schema"))
  82. val rowRange = hbaseProps.getOrElse("row_range", "->")
  83. //get star row and end row
  84. val range = rowRange.split("->",-1)
  85. val startRowKey = range(0).trim
  86. val endRowKey = range(1).trim
  87.  
  88. val tempHBaseFields = extractHBaseSchema(hbaseTableSchema) //do not use this, a temp field
  89. val registerTableFields = extractRegisterSchema(registerTableSchema)
  90. val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields,registerTableFields)
  91.  
  92. val hbaseTableFields = feedTypes(tempFieldRelation)
  93. val fieldsRelations = tableSchemaFieldMapping(hbaseTableFields,registerTableFields)
  94. val queryColumns = getQueryTargetCloumns(hbaseTableFields)
  95.  
  96. def feedTypes( mapping: Map[HBaseSchemaField, RegisteredSchemaField]) : Array[HBaseSchemaField] = {
  97. val hbaseFields = mapping.map{
  98. case (k,v) =>
  99. val field = k.copy(fieldType=v.fieldType)
  100. field
  101. }
  102. hbaseFields.toArray
  103. }
  104.  
  105. def isRowKey(field: HBaseSchemaField) : Boolean = {
  106. val cfColArray = field.fieldName.split(":",-1)
  107. val cfName = cfColArray(0)
  108. val colName = cfColArray(1)
  109. if(cfName=="" && colName=="key") true else false
  110. }
  111.  
  112. //eg: f1:col1 f1:col2 f1:col3 f2:col1
  113. def getQueryTargetCloumns(hbaseTableFields: Array[HBaseSchemaField]): String = {
  114. var str = ArrayBuffer[String]()
  115. hbaseTableFields.foreach{ field=>
  116. if(!isRowKey(field)) {
  117. str += field.fieldName
  118. }
  119. }
  120. str.mkString(" ")
  121. }
  122. lazy val schema = {
  123. val fields = hbaseTableFields.map{ field=>
  124. val name = fieldsRelations.getOrElse(field, sys.error("table schema is not match the definition.")).fieldName
  125. val relatedType = field.fieldType match {
  126. case "string" =>
  127. SchemaType(StringType,nullable = false)
  128. case "int" =>
  129. SchemaType(IntegerType,nullable = false)
  130. case "long" =>
  131. SchemaType(LongType,nullable = false)
  132. }
  133. StructField(name,relatedType.dataType,relatedType.nullable)
  134. }
  135. StructType(fields)
  136. }
  137.  
  138. def tableSchemaFieldMapping( externalHBaseTable: Array[HBaseSchemaField], registerTable : Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {
  139. if(externalHBaseTable.length != registerTable.length) sys.error("columns size not match in definition!")
  140. val rs = externalHBaseTable.zip(registerTable)
  141. rs.toMap
  142. }
  143.  
  144. /**
  145. * spark sql schema will be register
  146. * registerTableSchema '(rowkey string, value string, column_a string)'
  147. */
  148. def extractRegisterSchema(registerTableSchema: String) : Array[RegisteredSchemaField] = {
  149. val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)
  150. val fieldsArray = fieldsStr.split(",").map(_.trim)
  151. fieldsArray.map{ fildString =>
  152. val splitedField = fildString.split("\\s+", -1)
  153. RegisteredSchemaField(splitedField(0), splitedField(1))
  154. }
  155. }
  156.  
  157. //externalTableSchema '(:key , f1:col1 )'
  158. def extractHBaseSchema(externalTableSchema: String) : Array[HBaseSchemaField] = {
  159. val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)
  160. val fieldsArray = fieldsStr.split(",").map(_.trim)
  161. fieldsArray.map(fildString => HBaseSchemaField(fildString,""))
  162. }
  163.  
  164.  
  165.  
  166. // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
  167. lazy val buildScan = {
  168.  
  169. val hbaseConf = HBaseConfiguration.create()
  170. hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
  171. hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns);
  172. hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey);
  173. hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey);
  174.  
  175. val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
  176. hbaseConf,
  177. classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
  178. classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  179. classOf[org.apache.hadoop.hbase.client.Result]
  180. )
  181.  
  182.  
  183. val rs = hbaseRdd.map(tuple => tuple._2).map(result => {
  184. var values = new ArrayBuffer[Any]()
  185. hbaseTableFields.foreach{field=>
  186. values += Resolver.resolve(field,result)
  187. }
  188. Row.fromSeq(values.toSeq)
  189. })
  190. rs
  191. }
  192.  
  193. private case class SchemaType(dataType: DataType, nullable: Boolean)
  194. //
  195. // private def toSqlType(hbaseSchema: Schema): SchemaType = {
  196. // SchemaType(StringType,true)
  197. // }
  198. }

DefaultSource.scala

 
  1. package com.lxw1234.sparksql.hbase
  2.  
  3. import org.apache.spark.sql.SQLContext
  4. import org.apache.spark.sql.sources.RelationProvider
  5.  
  6.  
  7. class DefaultSource extends RelationProvider {
  8. def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
  9. HBaseRelation(parameters)(sqlContext)
  10. }
  11. }

package.scala

 
  1. package com.lxw1234.sparksql
  2.  
  3. import org.apache.spark.sql.SQLContext
  4. import scala.collection.immutable.HashMap
  5.  
  6.  
  7.  
  8. package object hbase {
  9.  
  10. abstract class SchemaField extends Serializable
  11.  
  12. case class RegisteredSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable
  13.  
  14. case class HBaseSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable
  15.  
  16. case class Parameter(name: String)
  17.  
  18.  
  19. protected val SPARK_SQL_TABLE_SCHEMA = Parameter("sparksql_table_schema")
  20. protected val HBASE_TABLE_NAME = Parameter("hbase_table_name")
  21. protected val HBASE_TABLE_SCHEMA = Parameter("hbase_table_schema")
  22. protected val ROW_RANGE = Parameter("row_range")
  23. /**
  24. * Adds a method, `hbaseTable`, to SQLContext that allows reading data stored in hbase table.
  25. */
  26. implicit class HBaseContext(sqlContext: SQLContext) {
  27. def hbaseTable(sparksqlTableSchema: String, hbaseTableName: String, hbaseTableSchema: String, rowRange: String = "->") = {
  28. var params = new HashMap[String, String]
  29. params += ( SPARK_SQL_TABLE_SCHEMA.name -> sparksqlTableSchema)
  30. params += ( HBASE_TABLE_NAME.name -> hbaseTableName)
  31. params += ( HBASE_TABLE_SCHEMA.name -> hbaseTableSchema)
  32. //get star row and end row
  33. params += ( ROW_RANGE.name -> rowRange)
  34. sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext));
  35. //sqlContext.baseRelationToSchemaRDD(HBaseRelation(params)(sqlContext))
  36. }
  37. }
  38.  
  39. // implicit class HBaseSchemaRDD(schemaRDD: SchemaRDD) {
  40. // def saveIntoTable(tableName: String): Unit = ???
  41. // }
  42. }
  43.  

 

相关配置和说明

  • 本来在SparkSQL中通过外部数据源建表的语法是:

CREATE TEMPORARY TABLE hbasetable

USING com.lxw1234.sparksql.hbase

OPTIONS (

sparksql_table_schema   ‘(row_key string, c1 string, c2 string, c3 string)’,

hbase_table_name   ‘lxw1234′,

hbase_table_schema ‘(:key , f1:c2 , f2:c2 , f3:c3)’

)

在我的Spark1.4中报错,会使用Hive的语法解析器解析这个DDL语句,因为Hive0.13中没有这种语法,因此报错。

是否是因为Spark1.4包的编译了Hive的原因?

  • 上面源码的编译依赖HBase的相关jar包:

hbase-client-0.96.1.1-cdh5.0.0.jar

hbase-common-0.96.1.1-cdh5.0.0.jar

hbase-protocol-0.96.1.1-cdh5.0.0.jar

hbase-server-0.96.1.1-cdh5.0.0.jar

还有HBase的集群信息:

hbase.zookeeper.quorum

hbase.client.scanner.caching

我之前在配置时候已经将这几个jar包和参数加到Spark集群的CLASSPATH中了,可参考 http://lxw1234.com/archives/2015/07/330.htm

  • 此程序是OopsOutOfMemory基于Spark1.2开发的,我只做了很小的修改。

https://github.com/OopsOutOfMemory/spark-sql-hbase

  • 此程序只做学习和测试使用,并未测试性能

这篇关于SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1083726

相关文章

Python将大量遥感数据的值缩放指定倍数的方法(推荐)

《Python将大量遥感数据的值缩放指定倍数的方法(推荐)》本文介绍基于Python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处理,并将所得处理后数据保存为新的遥感影像... 本文介绍基于python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动

Oracle数据库使用 listagg去重删除重复数据的方法汇总

《Oracle数据库使用listagg去重删除重复数据的方法汇总》文章介绍了在Oracle数据库中使用LISTAGG和XMLAGG函数进行字符串聚合并去重的方法,包括去重聚合、使用XML解析和CLO... 目录案例表第一种:使用wm_concat() + distinct去重聚合第二种:使用listagg,

Python实现将实体类列表数据导出到Excel文件

《Python实现将实体类列表数据导出到Excel文件》在数据处理和报告生成中,将实体类的列表数据导出到Excel文件是一项常见任务,Python提供了多种库来实现这一目标,下面就来跟随小编一起学习一... 目录一、环境准备二、定义实体类三、创建实体类列表四、将实体类列表转换为DataFrame五、导出Da

Python实现数据清洗的18种方法

《Python实现数据清洗的18种方法》本文主要介绍了Python实现数据清洗的18种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录1. 去除字符串两边空格2. 转换数据类型3. 大小写转换4. 移除列表中的重复元素5. 快速统

Python数据处理之导入导出Excel数据方式

《Python数据处理之导入导出Excel数据方式》Python是Excel数据处理的绝佳工具,通过Pandas和Openpyxl等库可以实现数据的导入、导出和自动化处理,从基础的数据读取和清洗到复杂... 目录python导入导出Excel数据开启数据之旅:为什么Python是Excel数据处理的最佳拍档

在Pandas中进行数据重命名的方法示例

《在Pandas中进行数据重命名的方法示例》Pandas作为Python中最流行的数据处理库,提供了强大的数据操作功能,其中数据重命名是常见且基础的操作之一,本文将通过简洁明了的讲解和丰富的代码示例,... 目录一、引言二、Pandas rename方法简介三、列名重命名3.1 使用字典进行列名重命名3.编

Python使用Pandas库将Excel数据叠加生成新DataFrame的操作指南

《Python使用Pandas库将Excel数据叠加生成新DataFrame的操作指南》在日常数据处理工作中,我们经常需要将不同Excel文档中的数据整合到一个新的DataFrame中,以便进行进一步... 目录一、准备工作二、读取Excel文件三、数据叠加四、处理重复数据(可选)五、保存新DataFram