spark将数据写入hbase以及从hbase读取数据

2024-08-27 10:32

本文主要是介绍spark将数据写入hbase以及从hbase读取数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文:http://blog.csdn.net/u013468917/article/details/52822074

本文将介绍

1、spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset将RDD写入hbase

2、spark从hbase中读取数据并转化为RDD

操作方式为在eclipse本地运行spark连接到远程的hbase。

java版本:1.7.0

scala版本:2.10.4

zookeeper版本:3.4.5(禁用了hbase自带zookeeper,选择自己部署的)

hadoop版本:2.4.1

spark版本:1.6.1

hbase版本:1.2.3

集群:centos6.5_x64

将RDD写入hbase

注意点:

依赖:

将lib目录下的hadoop开头jar包、hbase开头jar包添加至classpath

此外还有lib目录下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少会提示hbase RpcRetryingCaller: Call exception不断尝试重连hbase,不报错)、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar

$SPARK_HOME/lib目录下的 spark-assembly-1.6.1-hadoop2.4.0.jar

不同的package中可能会有相同名称的类,不要导错

连接集群:

spark应用需要连接到zookeeper集群,然后借助zookeeper访问hbase。一般可以通过两种方式连接到zookeeper:

第一种是将hbase-site.xml文件加入classpath

第二种是在HBaseConfiguration实例中设置

如果不设置,默认连接的是localhost:2181会报错:connection refused 

本文使用的是第二种方式。

hbase创建表:

虽然可以在spark应用中创建hbase表,但是不建议这样做,最好在hbase shell中创建表,spark写或读数据

使用saveAsHadoopDataset写入数据

[plain] view plain copy
  1. package com.test  
  2.   
  3. import org.apache.hadoop.hbase.HBaseConfiguration  
  4. import org.apache.hadoop.hbase.client.Put  
  5. import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
  6. import org.apache.hadoop.hbase.mapred.TableOutputFormat  
  7. import org.apache.hadoop.hbase.util.Bytes  
  8. import org.apache.hadoop.mapred.JobConf  
  9. import org.apache.spark.SparkConf  
  10. import org.apache.spark.SparkContext  
  11. import org.apache.spark.rdd.RDD.rddToPairRDDFunctions  
  12.   
  13. object TestHBase {  
  14.   
  15.   def main(args: Array[String]): Unit = {  
  16.     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
  17.     val sc = new SparkContext(sparkConf)  
  18.   
  19.     val conf = HBaseConfiguration.create()  
  20.     //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置  
  21.     conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
  22.     //设置zookeeper连接端口,默认2181  
  23.     conf.set("hbase.zookeeper.property.clientPort", "2181")  
  24.   
  25.     val tablename = "account"  
  26.       
  27.     //初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!  
  28.     val jobConf = new JobConf(conf)  
  29.     jobConf.setOutputFormat(classOf[TableOutputFormat])  
  30.     jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)  
  31.       
  32.     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))  
  33.   
  34.   
  35.     val rdd = indataRDD.map(_.split(',')).map{arr=>{  
  36.       /*一个Put对象就是一行记录,在构造方法中指定主键  
  37.        * 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换  
  38.        * Put.add方法接收三个参数:列族,列名,数据  
  39.        */  
  40.       val put = new Put(Bytes.toBytes(arr(0).toInt))  
  41.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  
  42.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))  
  43.       //转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset  
  44.       (new ImmutableBytesWritable, put)   
  45.     }}  
  46.       
  47.     rdd.saveAsHadoopDataset(jobConf)  
  48.       
  49.     sc.stop()  
  50.   }  
  51.   
  52. }  

使用saveAsNewAPIHadoopDataset写入数据


[plain] view plain copy
  1. package com.test  
  2.   
  3. import org.apache.hadoop.hbase.HBaseConfiguration  
  4. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
  5. import org.apache.spark._  
  6. import org.apache.hadoop.mapreduce.Job  
  7. import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
  8. import org.apache.hadoop.hbase.client.Result  
  9. import org.apache.hadoop.hbase.client.Put  
  10. import org.apache.hadoop.hbase.util.Bytes  
  11.   
  12. object TestHBase3 {  
  13.   
  14.   def main(args: Array[String]): Unit = {  
  15.     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
  16.     val sc = new SparkContext(sparkConf)  
  17.       
  18.     val tablename = "account"  
  19.       
  20.     sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
  21.     sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")  
  22.     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)  
  23.       
  24.     val job = new Job(sc.hadoopConfiguration)  
  25.     job.setOutputKeyClass(classOf[ImmutableBytesWritable])  
  26.     job.setOutputValueClass(classOf[Result])    
  27.     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])    
  28.   
  29.     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))  
  30.     val rdd = indataRDD.map(_.split(',')).map{arr=>{  
  31.       val put = new Put(Bytes.toBytes(arr(0)))  
  32.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  
  33.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))  
  34.       (new ImmutableBytesWritable, put)   
  35.     }}  
  36.       
  37.     rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())  
  38.   }  
  39.   
  40. }  


从hbase读取数据转化成RDD

本例基于官方提供的例子

[plain] view plain copy
  1. package com.test  
  2.   
  3. import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}  
  4. import org.apache.hadoop.hbase.client.HBaseAdmin  
  5. import org.apache.hadoop.hbase.mapreduce.TableInputFormat  
  6. import org.apache.spark._  
  7. import org.apache.hadoop.hbase.client.HTable  
  8. import org.apache.hadoop.hbase.client.Put  
  9. import org.apache.hadoop.hbase.util.Bytes  
  10. import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
  11. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
  12. import org.apache.hadoop.mapred.JobConf  
  13. import org.apache.hadoop.io._  
  14.   
  15. object TestHBase2 {  
  16.   
  17.   def main(args: Array[String]): Unit = {  
  18.     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
  19.     val sc = new SparkContext(sparkConf)  
  20.       
  21.     val tablename = "account"  
  22.     val conf = HBaseConfiguration.create()  
  23.     //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置  
  24.     conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
  25.     //设置zookeeper连接端口,默认2181  
  26.     conf.set("hbase.zookeeper.property.clientPort", "2181")  
  27.     conf.set(TableInputFormat.INPUT_TABLE, tablename)  
  28.   
  29.     // 如果表不存在则创建表  
  30.     val admin = new HBaseAdmin(conf)  
  31.     if (!admin.isTableAvailable(tablename)) {  
  32.       val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))  
  33.       admin.createTable(tableDesc)  
  34.     }  
  35.   
  36.     //读取数据并转化成rdd  
  37.     val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],  
  38.       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],  
  39.       classOf[org.apache.hadoop.hbase.client.Result])  
  40.   
  41.     val count = hBaseRDD.count()  
  42.     println(count)  
  43.     hBaseRDD.foreach{case (_,result) =>{  
  44.       //获取行键  
  45.       val key = Bytes.toString(result.getRow)  
  46.       //通过列族和列名获取列  
  47.       val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))  
  48.       val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))  
  49.       println("Row key:"+key+" Name:"+name+" Age:"+age)  
  50.     }}  
  51.   
  52.     sc.stop()  
  53.     admin.close()  
  54.   }  
  55. }  


这篇关于spark将数据写入hbase以及从hbase读取数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1111416

相关文章

Java中注解与元数据示例详解

《Java中注解与元数据示例详解》Java注解和元数据是编程中重要的概念,用于描述程序元素的属性和用途,:本文主要介绍Java中注解与元数据的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参... 目录一、引言二、元数据的概念2.1 定义2.2 作用三、Java 注解的基础3.1 注解的定义3.2 内

将sqlserver数据迁移到mysql的详细步骤记录

《将sqlserver数据迁移到mysql的详细步骤记录》:本文主要介绍将SQLServer数据迁移到MySQL的步骤,包括导出数据、转换数据格式和导入数据,通过示例和工具说明,帮助大家顺利完成... 目录前言一、导出SQL Server 数据二、转换数据格式为mysql兼容格式三、导入数据到MySQL数据

C++中使用vector存储并遍历数据的基本步骤

《C++中使用vector存储并遍历数据的基本步骤》C++标准模板库(STL)提供了多种容器类型,包括顺序容器、关联容器、无序关联容器和容器适配器,每种容器都有其特定的用途和特性,:本文主要介绍C... 目录(1)容器及简要描述‌php顺序容器‌‌关联容器‌‌无序关联容器‌(基于哈希表):‌容器适配器‌:(

C#提取PDF表单数据的实现流程

《C#提取PDF表单数据的实现流程》PDF表单是一种常见的数据收集工具,广泛应用于调查问卷、业务合同等场景,凭借出色的跨平台兼容性和标准化特点,PDF表单在各行各业中得到了广泛应用,本文将探讨如何使用... 目录引言使用工具C# 提取多个PDF表单域的数据C# 提取特定PDF表单域的数据引言PDF表单是一

一文详解Python中数据清洗与处理的常用方法

《一文详解Python中数据清洗与处理的常用方法》在数据处理与分析过程中,缺失值、重复值、异常值等问题是常见的挑战,本文总结了多种数据清洗与处理方法,文中的示例代码简洁易懂,有需要的小伙伴可以参考下... 目录缺失值处理重复值处理异常值处理数据类型转换文本清洗数据分组统计数据分箱数据标准化在数据处理与分析过

大数据小内存排序问题如何巧妙解决

《大数据小内存排序问题如何巧妙解决》文章介绍了大数据小内存排序的三种方法:数据库排序、分治法和位图法,数据库排序简单但速度慢,对设备要求高;分治法高效但实现复杂;位图法可读性差,但存储空间受限... 目录三种方法:方法概要数据库排序(http://www.chinasem.cn对数据库设备要求较高)分治法(常

Python将大量遥感数据的值缩放指定倍数的方法(推荐)

《Python将大量遥感数据的值缩放指定倍数的方法(推荐)》本文介绍基于Python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处理,并将所得处理后数据保存为新的遥感影像... 本文介绍基于python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动