本文主要是介绍Flink写出数据到Hbase的Sink,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 一、MyHbaseSink
- 1、继承RichSinkFunction<输入的数据类型>类
- 2、实现open方法,创建连接对象
- 3、实现invoke方法,批次写入数据到Hbase
- 4、实现close方法,关闭连接
- 二、HBaseUtil工具类
一、MyHbaseSink
1、继承RichSinkFunction<输入的数据类型>类
public class MyHbaseSink extends RichSinkFunction<Tuple2<String, Double>> {private transient Integer maxSize = 1000;private transient Long delayTime = 5000L;public MyHbaseSink() {}public MyHbaseSink(Integer maxSize, Long delayTime) {this.maxSize = maxSize;this.delayTime = delayTime;}private transient Connection connection;private transient Long lastInvokeTime;private transient List<Put> puts = new ArrayList<>(maxSize);
2、实现open方法,创建连接对象
// 创建连接@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 获取全局配置文件,并转为ParameterToolParameterTool params =(ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();//创建一个Hbase的连接connection = HBaseUtil.getConnection(params.getRequired("hbase.zookeeper.quorum"),params.getInt("hbase.zookeeper.property.clientPort", 2181));// 获取系统当前时间lastInvokeTime = System.currentTimeMillis();}
3、实现invoke方法,批次写入数据到Hbase
@Overridepublic void invoke(Tuple2<String, Double> value, Context context) throws Exception {String rk = value.f0;//创建put对象,并赋rk值Put put = new Put(rk.getBytes());// 添加值:f1->列族, order->属性名 如age, 第三个->属性值 如25put.addColumn("f1".getBytes(), "order".getBytes(), value.f1.toString().getBytes());puts.add(put);// 添加put对象到list集合//使用ProcessingTimelong currentTime = System.currentTimeMillis();//开始批次提交数据if (puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) {//获取一个Hbase表Table table = connection.getTable(TableName.valueOf("database:table"));table.put(puts);//批次提交puts.clear();lastInvokeTime = currentTime;table.close();}}
4、实现close方法,关闭连接
@Overridepublic void close() throws Exception {connection.close();}
二、HBaseUtil工具类
- Hbase的工具类,用来创建Hbase的Connection
public class HBaseUtil {/*** @param zkQuorum zookeeper地址,多个要用逗号分隔* @param port zookeeper端口号* @return connection*/public static Connection getConnection(String zkQuorum, int port) throws Exception {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", zkQuorum);conf.set("hbase.zookeeper.property.clientPort", port + "");Connection connection = ConnectionFactory.createConnection(conf);return connection;}
}
这篇关于Flink写出数据到Hbase的Sink的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!