Reading HBase Data with Spark SQL via a Custom External Data Source (vs. a Hive External Table over HBase)

2024-06-22 08:08

This article shows how to read HBase data from Spark SQL through a custom external data source, and is intended as a practical reference for developers facing the same problem.

Keywords: reading HBase from Spark SQL, Spark SQL custom external data source

A previous article described operating on HBase tables from Spark SQL through Hive.

Spark SQL has supported custom external data sources (External DataSource) since version 1.2, so you can implement your own data source against this API. Based on Spark 1.4.0, this article walks through a simple custom Spark SQL external data source that reads an HBase table.
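For orientation, a read-only data source in the Spark 1.x API boils down to three pieces. The sketch below is a simplified paraphrase of the interfaces in org.apache.spark.sql.sources, not their full definitions:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.StructType

// Simplified paraphrase of the Spark 1.x external data source API
// (the real definitions live in org.apache.spark.sql.sources).
trait RelationProvider {
  // Spark calls this with the options map when the data source is loaded.
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType          // the schema exposed to Spark SQL
}

trait TableScan {
  def buildScan(): RDD[Row]       // the rows returned by a full table scan
}

DefaultSource.scala and HBaseRelation.scala below implement exactly these pieces.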

The table in HBase is created and populated as follows:

 
create 'lxw1234',{NAME => 'f1',VERSIONS => 1},{NAME => 'f2',VERSIONS => 1},{NAME => 'f3',VERSIONS => 1}

put 'lxw1234','lxw1234.com','f1:c1','name1'
put 'lxw1234','lxw1234.com','f1:c2','name2'
put 'lxw1234','lxw1234.com','f2:c1','age1'
put 'lxw1234','lxw1234.com','f2:c2','age2'
put 'lxw1234','lxw1234.com','f3:c1','job1'
put 'lxw1234','lxw1234.com','f3:c2','job2'
put 'lxw1234','lxw1234.com','f3:c3','job3'

hbase(main):025:0* scan 'lxw1234'
ROW            COLUMN+CELL
 lxw1234.com   column=f1:c1, timestamp=1435624625198, value=name1
 lxw1234.com   column=f1:c2, timestamp=1435624591717, value=name2
 lxw1234.com   column=f2:c1, timestamp=1435624608759, value=age1
 lxw1234.com   column=f2:c2, timestamp=1435624635261, value=age2
 lxw1234.com   column=f3:c1, timestamp=1435624662282, value=job1
 lxw1234.com   column=f3:c2, timestamp=1435624697028, value=job2
 lxw1234.com   column=f3:c3, timestamp=1435624697065, value=job3

Launch spark-shell:

 
sh /usr/local/spark-1.4.0-bin-hadoop2.3/bin/spark-shell --jars /tmp/sparksql-hbase.jar --total-executor-cores 30 --executor-memory 4G --master spark://lxw1234.com:7077

Run the following code:

 
import sqlContext._

var hbasetable = sqlContext.read.format("com.lxw1234.sparksql.hbase").options(Map(
  "sparksql_table_schema" -> "(row_key string, c1 string, c2 string, c3 string)",
  "hbase_table_name" -> "lxw1234",
  "hbase_table_schema" -> "(:key , f1:c2 , f2:c2 , f3:c3 )"
)).load()

// sparksql_table_schema: the table definition on the Spark SQL side
// hbase_table_name: the name of the table in HBase
// hbase_table_schema: the HBase column families/columns mapped into the Spark SQL
// table; the mapped fields must line up with sparksql_table_schema, including the order.

scala> hbasetable.printSchema()
root
 |-- row_key: string (nullable = false)
 |-- c1: string (nullable = false)
 |-- c2: string (nullable = false)
 |-- c3: string (nullable = false)

hbasetable.registerTempTable("lxw1234")

sqlContext.sql("SELECT * from lxw1234").collect
res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2,age2,job3])

sqlContext.sql("SELECT row_key,concat(c1,'|',c2,'|',c3) from lxw1234").collect
res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2|age2|job3])
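The relation also understands an optional row_range option (see HBaseRelation.scala below), which limits the scan to a start/stop row key. A minimal sketch against the same table and provider, with hypothetical placeholder keys:

// Optional: restrict the HBase scan to a row-key range via the row_range option.
// The format is "startRow->stopRow"; the default "->" means an unbounded scan.
val ranged = sqlContext.read.format("com.lxw1234.sparksql.hbase").options(Map(
  "sparksql_table_schema" -> "(row_key string, c1 string, c2 string, c3 string)",
  "hbase_table_name" -> "lxw1234",
  "hbase_table_schema" -> "(:key , f1:c2 , f2:c2 , f3:c3 )",
  "row_range" -> "lxw1234.com->lxw1234.comz"   // hypothetical start/stop keys
)).load()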

Source code

HBaseRelation.scala

 
package com.lxw1234.sparksql.hbase

import java.io.Serializable
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.sources.TableScan
import scala.collection.immutable.{HashMap, Map}
import org.apache.hadoop.hbase.client.{Result, Scan, HTable, HBaseAdmin}
import org.apache.spark.sql._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.MapType
import org.apache.spark.sql.sources.BaseRelation


object Resolver extends Serializable {

  def resolve(hbaseField: HBaseSchemaField, result: Result): Any = {
    val cfColArray = hbaseField.fieldName.split(":", -1)
    val cfName = cfColArray(0)
    val colName = cfColArray(1)
    var fieldRs: Any = null
    // resolve row key, otherwise resolve column
    if (cfName == "" && colName == "key") {
      fieldRs = resolveRowKey(result, hbaseField.fieldType)
    } else {
      fieldRs = resolveColumn(result, cfName, colName, hbaseField.fieldType)
    }
    fieldRs
  }

  def resolveRowKey(result: Result, resultType: String): Any = {
    val rowkey = resultType match {
      case "string" =>
        result.getRow.map(_.toChar).mkString
      case "int" =>
        result.getRow.map(_.toChar).mkString.toInt
      case "long" =>
        result.getRow.map(_.toChar).mkString.toLong
    }
    rowkey
  }

  def resolveColumn(result: Result, columnFamily: String, columnName: String, resultType: String): Any = {
    val column = resultType match {
      case "string" =>
        result.getValue(columnFamily.getBytes, columnName.getBytes).map(_.toChar).mkString
      case "int" =>
        result.getValue(columnFamily.getBytes, columnName.getBytes).map(_.toChar).mkString.toInt
      case "long" =>
        result.getValue(columnFamily.getBytes, columnName.getBytes).map(_.toChar).mkString.toLong
    }
    column
  }
}

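// Note (not part of the original source): the conversions above decode HBase
// byte arrays with map(_.toChar), which only round-trips single-byte (ASCII)
// data, and Result.getValue returns null when the requested cell is missing,
// which would make .map throw a NullPointerException. A more defensive variant
// could use org.apache.hadoop.hbase.util.Bytes.toString plus a null check;
// see the sketch after this listing.
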
/**
 val hbaseDDL = s"""
   |CREATE TEMPORARY TABLE hbase_people
   |USING com.shengli.spark.hbase
   |OPTIONS (
   |  sparksql_table_schema '(row_key string, name string, age int, job string)',
   |  hbase_table_name 'people',
   |  hbase_table_schema '(:key , profile:name , profile:age , career:job )'
   |)""".stripMargin
 */
case class HBaseRelation(@transient val hbaseProps: Map[String,String])(@transient val sqlContext: SQLContext) extends BaseRelation with Serializable with TableScan {

  val hbaseTableName = hbaseProps.getOrElse("hbase_table_name", sys.error("not valid schema"))
  val hbaseTableSchema = hbaseProps.getOrElse("hbase_table_schema", sys.error("not valid schema"))
  val registerTableSchema = hbaseProps.getOrElse("sparksql_table_schema", sys.error("not valid schema"))
  val rowRange = hbaseProps.getOrElse("row_range", "->")
  // get start row and end row
  val range = rowRange.split("->", -1)
  val startRowKey = range(0).trim
  val endRowKey = range(1).trim

  val tempHBaseFields = extractHBaseSchema(hbaseTableSchema)   // do not use this, a temp field
  val registerTableFields = extractRegisterSchema(registerTableSchema)
  val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields, registerTableFields)

  val hbaseTableFields = feedTypes(tempFieldRelation)
  val fieldsRelations = tableSchemaFieldMapping(hbaseTableFields, registerTableFields)
  val queryColumns = getQueryTargetCloumns(hbaseTableFields)

  def feedTypes(mapping: Map[HBaseSchemaField, RegisteredSchemaField]): Array[HBaseSchemaField] = {
    val hbaseFields = mapping.map {
      case (k, v) =>
        val field = k.copy(fieldType = v.fieldType)
        field
    }
    hbaseFields.toArray
  }

  def isRowKey(field: HBaseSchemaField): Boolean = {
    val cfColArray = field.fieldName.split(":", -1)
    val cfName = cfColArray(0)
    val colName = cfColArray(1)
    if (cfName == "" && colName == "key") true else false
  }

  // eg: f1:col1 f1:col2 f1:col3 f2:col1
  def getQueryTargetCloumns(hbaseTableFields: Array[HBaseSchemaField]): String = {
    var str = ArrayBuffer[String]()
    hbaseTableFields.foreach { field =>
      if (!isRowKey(field)) {
        str += field.fieldName
      }
    }
    str.mkString(" ")
  }

  lazy val schema = {
    val fields = hbaseTableFields.map { field =>
      val name = fieldsRelations.getOrElse(field, sys.error("table schema is not match the definition.")).fieldName
      val relatedType = field.fieldType match {
        case "string" =>
          SchemaType(StringType, nullable = false)
        case "int" =>
          SchemaType(IntegerType, nullable = false)
        case "long" =>
          SchemaType(LongType, nullable = false)
      }
      StructField(name, relatedType.dataType, relatedType.nullable)
    }
    StructType(fields)
  }

  def tableSchemaFieldMapping(externalHBaseTable: Array[HBaseSchemaField], registerTable: Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {
    if (externalHBaseTable.length != registerTable.length) sys.error("columns size not match in definition!")
    val rs = externalHBaseTable.zip(registerTable)
    rs.toMap
  }

  /**
   * The Spark SQL schema that will be registered, e.g.
   * registerTableSchema '(rowkey string, value string, column_a string)'
   */
  def extractRegisterSchema(registerTableSchema: String): Array[RegisteredSchemaField] = {
    val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)
    val fieldsArray = fieldsStr.split(",").map(_.trim)
    fieldsArray.map { fildString =>
      val splitedField = fildString.split("\\s+", -1)
      RegisteredSchemaField(splitedField(0), splitedField(1))
    }
  }

  // externalTableSchema '(:key , f1:col1 )'
  def extractHBaseSchema(externalTableSchema: String): Array[HBaseSchemaField] = {
    val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)
    val fieldsArray = fieldsStr.split(",").map(_.trim)
    fieldsArray.map(fildString => HBaseSchemaField(fildString, ""))
  }

  // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
  lazy val buildScan = {

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns)
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey)
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey)

    val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
      hbaseConf,
      classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    val rs = hbaseRdd.map(tuple => tuple._2).map(result => {
      var values = new ArrayBuffer[Any]()
      hbaseTableFields.foreach { field =>
        values += Resolver.resolve(field, result)
      }
      Row.fromSeq(values.toSeq)
    })
    rs
  }

  private case class SchemaType(dataType: DataType, nullable: Boolean)
  //
  //  private def toSqlType(hbaseSchema: Schema): SchemaType = {
  //    SchemaType(StringType,true)
  //  }
}
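One caveat in Resolver above: Result.getValue returns null when a row has no cell for the requested column, so resolveColumn throws a NullPointerException on sparse rows, and map(_.toChar) only decodes single-byte data correctly. A minimal null-safe sketch for string columns, using HBase's Bytes helper (not part of the original source):

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical defensive variant of Resolver.resolveColumn for string fields:
// returns null instead of throwing when the cell is absent, and decodes bytes
// with Bytes.toString rather than map(_.toChar).
def resolveStringColumn(result: Result, columnFamily: String, columnName: String): String = {
  val raw = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName))
  if (raw == null) null else Bytes.toString(raw)
}

Since the schema in HBaseRelation marks every field nullable = false, the relation as published effectively assumes every mapped column is present in every row.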

DefaultSource.scala

 
package com.lxw1234.sparksql.hbase

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.RelationProvider


class DefaultSource extends RelationProvider {
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
    HBaseRelation(parameters)(sqlContext)
  }
}
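The class is named DefaultSource because that is what Spark's data source resolution looks for when format() (or the USING clause) is given a package name rather than a fully qualified class name. Both spellings below should resolve to the same provider (a usage note, not from the original article; options omitted for brevity):

// Shortened form: Spark appends ".DefaultSource" to the package name.
val byPackage = sqlContext.read.format("com.lxw1234.sparksql.hbase")
// The fully qualified provider class works as well.
val byClass   = sqlContext.read.format("com.lxw1234.sparksql.hbase.DefaultSource")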

package.scala

 
package com.lxw1234.sparksql

import org.apache.spark.sql.SQLContext
import scala.collection.immutable.HashMap


package object hbase {

  abstract class SchemaField extends Serializable

  case class RegisteredSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable

  case class HBaseSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable

  case class Parameter(name: String)


  protected val SPARK_SQL_TABLE_SCHEMA = Parameter("sparksql_table_schema")
  protected val HBASE_TABLE_NAME = Parameter("hbase_table_name")
  protected val HBASE_TABLE_SCHEMA = Parameter("hbase_table_schema")
  protected val ROW_RANGE = Parameter("row_range")

  /**
   * Adds a method, `hbaseTable`, to SQLContext that allows reading data stored in an HBase table.
   */
  implicit class HBaseContext(sqlContext: SQLContext) {
    def hbaseTable(sparksqlTableSchema: String, hbaseTableName: String, hbaseTableSchema: String, rowRange: String = "->") = {
      var params = new HashMap[String, String]
      params += (SPARK_SQL_TABLE_SCHEMA.name -> sparksqlTableSchema)
      params += (HBASE_TABLE_NAME.name -> hbaseTableName)
      params += (HBASE_TABLE_SCHEMA.name -> hbaseTableSchema)
      // get start row and end row
      params += (ROW_RANGE.name -> rowRange)
      sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext))
      // sqlContext.baseRelationToSchemaRDD(HBaseRelation(params)(sqlContext))
    }
  }

  // implicit class HBaseSchemaRDD(schemaRDD: SchemaRDD) {
  //   def saveIntoTable(tableName: String): Unit = ???
  // }
}
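With this package object on the classpath, the relation can also be created through the hbaseTable convenience method it adds to SQLContext, instead of read.format(...). A short usage sketch against the table from this article (the temp-table name is arbitrary):

import com.lxw1234.sparksql.hbase._   // brings the HBaseContext implicit into scope

val df = sqlContext.hbaseTable(
  "(row_key string, c1 string, c2 string, c3 string)",   // sparksql_table_schema
  "lxw1234",                                              // hbase_table_name
  "(:key , f1:c2 , f2:c2 , f3:c3 )"                       // hbase_table_schema
)
df.registerTempTable("lxw1234_2")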

 

Related configuration and notes

  • Normally, the DDL for creating a table over an external data source in Spark SQL would be:

CREATE TEMPORARY TABLE hbasetable
USING com.lxw1234.sparksql.hbase
OPTIONS (
  sparksql_table_schema '(row_key string, c1 string, c2 string, c3 string)',
  hbase_table_name 'lxw1234',
  hbase_table_schema '(:key , f1:c2 , f2:c2 , f3:c3)'
)

In my Spark 1.4 environment this fails: the DDL statement is handed to Hive's syntax parser, and since Hive 0.13 does not have this syntax, it throws an error.

Could this be because my Spark 1.4 package was compiled with Hive support?

  • Compiling the source above depends on the following HBase jars:

hbase-client-0.96.1.1-cdh5.0.0.jar

hbase-common-0.96.1.1-cdh5.0.0.jar

hbase-protocol-0.96.1.1-cdh5.0.0.jar

hbase-server-0.96.1.1-cdh5.0.0.jar

as well as the HBase cluster settings:

hbase.zookeeper.quorum

hbase.client.scanner.caching

I had already added these jars and settings to the Spark cluster's CLASSPATH when configuring the cluster; see http://lxw1234.com/archives/2015/07/330.htm. If they are not on the classpath, they can also be set programmatically, as sketched below.
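A minimal sketch of setting these two properties directly on the HBaseConfiguration created in buildScan, assuming hypothetical ZooKeeper hosts (the original code relies on hbase-site.xml / the cluster CLASSPATH instead):

import org.apache.hadoop.hbase.HBaseConfiguration

// Assumption: zk1,zk2,zk3 are placeholder ZooKeeper hosts; replace with your own quorum.
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
hbaseConf.set("hbase.client.scanner.caching", "1000")   // rows fetched per scanner RPC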

  • This program was originally developed by OopsOutOfMemory against Spark 1.2; I only made very small modifications.

https://github.com/OopsOutOfMemory/spark-sql-hbase

  • This program is for learning and testing only; its performance has not been evaluated.



