Flink写出数据到Hbase的Sink

2024-05-10 13:32
文章标签 数据 flink sink hbase 写出

本文主要是介绍Flink写出数据到Hbase的Sink,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 一、MyHbaseSink
        • 1、继承RichSinkFunction<输入的数据类型>类
        • 2、实现open方法,创建连接对象
        • 3、实现invoke方法,批次写入数据到Hbase
        • 4、实现close方法,关闭连接
    • 二、HBaseUtil工具类

一、MyHbaseSink

1、继承RichSinkFunction<输入的数据类型>类
public class MyHbaseSink extends RichSinkFunction<Tuple2<String, Double>> {private transient Integer maxSize = 1000;private transient Long delayTime = 5000L;public MyHbaseSink() {}public MyHbaseSink(Integer maxSize, Long delayTime) {this.maxSize = maxSize;this.delayTime = delayTime;}private transient Connection connection;private transient Long lastInvokeTime;private transient List<Put> puts = new ArrayList<>(maxSize);
2、实现open方法,创建连接对象
    // 创建连接@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 获取全局配置文件,并转为ParameterToolParameterTool params =(ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();//创建一个Hbase的连接connection = HBaseUtil.getConnection(params.getRequired("hbase.zookeeper.quorum"),params.getInt("hbase.zookeeper.property.clientPort", 2181));// 获取系统当前时间lastInvokeTime = System.currentTimeMillis();}
3、实现invoke方法,批次写入数据到Hbase
    @Overridepublic void invoke(Tuple2<String, Double> value, Context context) throws Exception {String rk = value.f0;//创建put对象,并赋rk值Put put = new Put(rk.getBytes());// 添加值:f1->列族, order->属性名 如age, 第三个->属性值 如25put.addColumn("f1".getBytes(), "order".getBytes(), value.f1.toString().getBytes());puts.add(put);// 添加put对象到list集合//使用ProcessingTimelong currentTime = System.currentTimeMillis();//开始批次提交数据if (puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) {//获取一个Hbase表Table table = connection.getTable(TableName.valueOf("database:table"));table.put(puts);//批次提交puts.clear();lastInvokeTime = currentTime;table.close();}}
4、实现close方法,关闭连接
    @Overridepublic void close() throws Exception {connection.close();}

二、HBaseUtil工具类

  • Hbase的工具类,用来创建Hbase的Connection
public class HBaseUtil {/*** @param zkQuorum zookeeper地址,多个要用逗号分隔* @param port     zookeeper端口号* @return connection*/public static Connection getConnection(String zkQuorum, int port) throws Exception {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", zkQuorum);conf.set("hbase.zookeeper.property.clientPort", port + "");Connection connection = ConnectionFactory.createConnection(conf);return connection;}
}

这篇关于Flink写出数据到Hbase的Sink的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/976574

相关文章

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动

Oracle数据库使用 listagg去重删除重复数据的方法汇总

《Oracle数据库使用listagg去重删除重复数据的方法汇总》文章介绍了在Oracle数据库中使用LISTAGG和XMLAGG函数进行字符串聚合并去重的方法,包括去重聚合、使用XML解析和CLO... 目录案例表第一种:使用wm_concat() + distinct去重聚合第二种:使用listagg,

Python实现将实体类列表数据导出到Excel文件

《Python实现将实体类列表数据导出到Excel文件》在数据处理和报告生成中,将实体类的列表数据导出到Excel文件是一项常见任务,Python提供了多种库来实现这一目标,下面就来跟随小编一起学习一... 目录一、环境准备二、定义实体类三、创建实体类列表四、将实体类列表转换为DataFrame五、导出Da

Python实现数据清洗的18种方法

《Python实现数据清洗的18种方法》本文主要介绍了Python实现数据清洗的18种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录1. 去除字符串两边空格2. 转换数据类型3. 大小写转换4. 移除列表中的重复元素5. 快速统

Python数据处理之导入导出Excel数据方式

《Python数据处理之导入导出Excel数据方式》Python是Excel数据处理的绝佳工具,通过Pandas和Openpyxl等库可以实现数据的导入、导出和自动化处理,从基础的数据读取和清洗到复杂... 目录python导入导出Excel数据开启数据之旅:为什么Python是Excel数据处理的最佳拍档

在Pandas中进行数据重命名的方法示例

《在Pandas中进行数据重命名的方法示例》Pandas作为Python中最流行的数据处理库,提供了强大的数据操作功能,其中数据重命名是常见且基础的操作之一,本文将通过简洁明了的讲解和丰富的代码示例,... 目录一、引言二、Pandas rename方法简介三、列名重命名3.1 使用字典进行列名重命名3.编

Python使用Pandas库将Excel数据叠加生成新DataFrame的操作指南

《Python使用Pandas库将Excel数据叠加生成新DataFrame的操作指南》在日常数据处理工作中,我们经常需要将不同Excel文档中的数据整合到一个新的DataFrame中,以便进行进一步... 目录一、准备工作二、读取Excel文件三、数据叠加四、处理重复数据(可选)五、保存新DataFram

使用Java解析JSON数据并提取特定字段的实现步骤(以提取mailNo为例)

《使用Java解析JSON数据并提取特定字段的实现步骤(以提取mailNo为例)》在现代软件开发中,处理JSON数据是一项非常常见的任务,无论是从API接口获取数据,还是将数据存储为JSON格式,解析... 目录1. 背景介绍1.1 jsON简介1.2 实际案例2. 准备工作2.1 环境搭建2.1.1 添加