《从0到1学习Flink》—— 如何自定义 Data Sink ?

2024-05-02 08:32
文章标签 学习 自定义 data flink sink

本文主要是介绍《从0到1学习Flink》—— 如何自定义 Data Sink ?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

前篇文章 《从0到1学习Flink》—— Data Sink 介绍 介绍了 Flink Data Sink,也介绍了 Flink 自带的 Sink,那么如何自定义自己的 Sink 呢?这篇文章将写一个 demo 教大家将从 Kafka Source 的数据 Sink 到 MySQL 中去。

准备工作

我们先来看下 Flink 从 Kafka topic 中获取数据的 demo,首先你需要安装好了 FLink 和 Kafka 。

运行启动 Flink、Zookepeer、Kafka,

好了,都启动了!

数据库建表

DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (`id` int(11) unsigned NOT NULL AUTO_INCREMENT,`name` varchar(25) COLLATE utf8_bin DEFAULT NULL,`password` varchar(25) COLLATE utf8_bin DEFAULT NULL,`age` int(10) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

实体类

Student.java

package com.zhisheng.flink.model;/*** Desc:* weixin: zhisheng_tian* blog: http://www.54tianzhisheng.cn/*/
public class Student {public int id;public String name;public String password;public int age;public Student() {}public Student(int id, String name, String password, int age) {this.id = id;this.name = name;this.password = password;this.age = age;}@Overridepublic String toString() {return "Student{" +"id=" + id +", name='" + name + '\'' +", password='" + password + '\'' +", age=" + age +'}';}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}
}

工具类

工具类往 kafka topic student 发送数据

import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Metric;
import com.zhisheng.flink.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;/*** 往kafka中写数据* 可以使用这个main函数进行测试一下* weixin: zhisheng_tian* blog: http://www.54tianzhisheng.cn/*/
public class KafkaUtils2 {public static final String broker_list = "localhost:9092";public static final String topic = "student";  //kafka topic 需要和 flink 程序用同一个 topicpublic static void writeToKafka() throws InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", broker_list);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer producer = new KafkaProducer<String, String>(props);for (int i = 1; i <= 100; i++) {Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(student));producer.send(record);System.out.println("发送数据: " + JSON.toJSONString(student));}producer.flush();}public static void main(String[] args) throws InterruptedException {writeToKafka();}
}

SinkToMySQL

该类就是 Sink Function,继承了 RichSinkFunction ,然后重写了里面的方法。在 invoke 方法中将数据插入到 MySQL 中。

package com.zhisheng.flink.sink;import com.zhisheng.flink.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;/*** Desc:* weixin: zhisheng_tian* blog: http://www.54tianzhisheng.cn/*/
public class SinkToMySQL extends RichSinkFunction<Student> {PreparedStatement ps;private Connection connection;/*** open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接** @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = getConnection();String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";ps = this.connection.prepareStatement(sql);}@Overridepublic void close() throws Exception {super.close();//关闭连接和释放资源if (connection != null) {connection.close();}if (ps != null) {ps.close();}}/*** 每条数据的插入都要调用一次 invoke() 方法** @param value* @param context* @throws Exception*/@Overridepublic void invoke(Student value, Context context) throws Exception {//组装数据,执行插入操作ps.setInt(1, value.getId());ps.setString(2, value.getName());ps.setString(3, value.getPassword());ps.setInt(4, value.getAge());ps.executeUpdate();}private static Connection getConnection() {Connection con = null;try {Class.forName("com.mysql.jdbc.Driver");con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");} catch (Exception e) {System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());}return con;}
}

Flink 程序

这里的 source 是从 kafka 读取数据的,然后 Flink 从 Kafka 读取到数据(JSON)后用阿里 fastjson 来解析成 student 对象,然后在 addSink 中使用我们创建的 SinkToMySQL,这样就可以把数据存储到 MySQL 了。

package com.zhisheng.flink;import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Student;
import com.zhisheng.flink.sink.SinkToMySQL;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;import java.util.Properties;/*** Desc:* weixin: zhisheng_tian* blog: http://www.54tianzhisheng.cn/*/
public class Main3 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("zookeeper.connect", "localhost:2181");props.put("group.id", "metric-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>("student",   //这个 kafka topic 需要和上面的工具类的 topic 一致new SimpleStringSchema(),props)).setParallelism(1).map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 对象student.addSink(new SinkToMySQL()); //数据 sink 到 mysqlenv.execute("Flink add sink");}
}

结果

运行 Flink 程序,然后再运行 KafkaUtils2.java 工具类,这样就可以了。

如果数据插入成功了,那么我们查看下我们的数据库:

数据库中已经插入了 100 条我们从 Kafka 发送的数据了。证明我们的 SinkToMySQL 起作用了。是不是很简单?

项目结构

怕大家不知道我的项目结构,这里发个截图看下:

最后

本文主要利用一个 demo,告诉大家如何自定义 Sink Function,将从 Kafka 的数据 Sink 到 MySQL 中,如果你项目中有其他的数据来源,你也可以换成对应的 Source,也有可能你的 Sink 是到其他的地方或者其他不同的方式,那么依旧是这个套路:继承 RichSinkFunction 抽象类,重写 invoke 方法。

关注我

转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/10/31/flink-create-sink/

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。

相关文章

1、《从0到1学习Flink》—— Apache Flink 介绍

2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

3、《从0到1学习Flink》—— Flink 配置文件详解

4、《从0到1学习Flink》—— Data Source 介绍

5、《从0到1学习Flink》—— 如何自定义 Data Source ?

6、《从0到1学习Flink》—— Data Sink 介绍

7、《从0到1学习Flink》—— 如何自定义 Data Sink ?

这篇关于《从0到1学习Flink》—— 如何自定义 Data Sink ?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/953826

相关文章

使用Sentinel自定义返回和实现区分来源方式

《使用Sentinel自定义返回和实现区分来源方式》:本文主要介绍使用Sentinel自定义返回和实现区分来源方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Sentinel自定义返回和实现区分来源1. 自定义错误返回2. 实现区分来源总结Sentinel自定

如何自定义Nginx JSON日志格式配置

《如何自定义NginxJSON日志格式配置》Nginx作为最流行的Web服务器之一,其灵活的日志配置能力允许我们根据需求定制日志格式,本文将详细介绍如何配置Nginx以JSON格式记录访问日志,这种... 目录前言为什么选择jsON格式日志?配置步骤详解1. 安装Nginx服务2. 自定义JSON日志格式各

Android自定义Scrollbar的两种实现方式

《Android自定义Scrollbar的两种实现方式》本文介绍两种实现自定义滚动条的方法,分别通过ItemDecoration方案和独立View方案实现滚动条定制化,文章通过代码示例讲解的非常详细,... 目录方案一:ItemDecoration实现(推荐用于RecyclerView)实现原理完整代码实现

基于Spring实现自定义错误信息返回详解

《基于Spring实现自定义错误信息返回详解》这篇文章主要为大家详细介绍了如何基于Spring实现自定义错误信息返回效果,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录背景目标实现产出背景Spring 提供了 @RestConChina编程trollerAdvice 用来实现 HTT

SpringSecurity 认证、注销、权限控制功能(注销、记住密码、自定义登入页)

《SpringSecurity认证、注销、权限控制功能(注销、记住密码、自定义登入页)》SpringSecurity是一个强大的Java框架,用于保护应用程序的安全性,它提供了一套全面的安全解决方案... 目录简介认识Spring Security“认证”(Authentication)“授权” (Auth

Java进阶学习之如何开启远程调式

《Java进阶学习之如何开启远程调式》Java开发中的远程调试是一项至关重要的技能,特别是在处理生产环境的问题或者协作开发时,:本文主要介绍Java进阶学习之如何开启远程调式的相关资料,需要的朋友... 目录概述Java远程调试的开启与底层原理开启Java远程调试底层原理JVM参数总结&nbsMbKKXJx

HTML5 data-*自定义数据属性的示例代码

《HTML5data-*自定义数据属性的示例代码》HTML5的自定义数据属性(data-*)提供了一种标准化的方法在HTML元素上存储额外信息,可以通过JavaScript访问、修改和在CSS中使用... 目录引言基本概念使用自定义数据属性1. 在 html 中定义2. 通过 JavaScript 访问3.

SpringBoot自定义注解如何解决公共字段填充问题

《SpringBoot自定义注解如何解决公共字段填充问题》本文介绍了在系统开发中,如何使用AOP切面编程实现公共字段自动填充的功能,从而简化代码,通过自定义注解和切面类,可以统一处理创建时间和修改时间... 目录1.1 问题分析1.2 实现思路1.3 代码开发1.3.1 步骤一1.3.2 步骤二1.3.3

dubbo3 filter(过滤器)如何自定义过滤器

《dubbo3filter(过滤器)如何自定义过滤器》dubbo3filter(过滤器)类似于javaweb中的filter和springmvc中的intercaptor,用于在请求发送前或到达前进... 目录dubbo3 filter(过滤器)简介dubbo 过滤器运行时机自定义 filter第一种 @A

Java深度学习库DJL实现Python的NumPy方式

《Java深度学习库DJL实现Python的NumPy方式》本文介绍了DJL库的背景和基本功能,包括NDArray的创建、数学运算、数据获取和设置等,同时,还展示了如何使用NDArray进行数据预处理... 目录1 NDArray 的背景介绍1.1 架构2 JavaDJL使用2.1 安装DJL2.2 基本操