大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka

2024-09-04 11:44

本文主要是介绍大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Sink 的基本概念等内容
  • Sink的相关信息 配置与使用
  • Sink案例写入Redis

在这里插入图片描述

JDBC Sink

在 Apache Flink 中,通过 JDBC Sink,可以将处理后的数据写入到 MySQL 数据库中。这对于将实时处理的数据持久化或与其他系统进行集成非常有用。

Flink JDBC Sink 简介

Flink 提供了 JdbcSink,它是基于 JDBC 协议的 Sink,可以将数据写入各种关系型数据库,包括 MySQL。在使用 JDBC Sink 时,需要提供数据库连接信息和 SQL 语句,通过这些信息,Flink 将数据流中的记录插入或更新到 MySQL 表中。

Flink 到 MySQL 的基本步骤

将数据流写入 MySQL 的步骤主要包括以下几点:

  • 依赖库配置:确保在项目中引入了 Flink 和 MySQL 相关的依赖库,通常需要配置 Maven 或 Gradle。
  • 定义数据源和数据流:创建并处理数据流。
  • 配置 JDBC Sink:提供数据库的连接信息和插入 SQL 语句。
  • 启动任务:将数据流写入 MySQL。

优化建议

在实际项目中,向 MySQL 插入大量数据时,应考虑以下优化策略:

  • 批量插入:通过 JdbcExecutionOptions 配置批量插入,可以大幅提升写入性能。
  • 连接池:对于高并发的写入操作,建议使用连接池来减少数据库连接开销。
  • 索引优化:为插入的表配置合适的索引,可以提高查询性能,但在大量写入时,索引可能会降低- 插入速度,因此需要权衡。
  • 数据分片:对于非常大规模的数据,可以考虑将数据分片并行写入不同的 MySQL 实例或分区表中。

案例:流数据下沉到MySQL

添加依赖

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version>
</dependency>

编写代码

一个Person的类,对应MySQL中的一张表的字段。
模拟几条数据流,写入到 MySQL中。

package icu.wzk;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class SinkSqlTest {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Person> data = env.getJavaEnv().fromElements(new Person("wzk", 18, 1),new Person("icu", 20, 1),new Person("wzkicu", 13, 2));data.addSink(new MySqlSinkFunction());env.execute();}public static class MySqlSinkFunction extends RichSinkFunction<Person> {private PreparedStatement preparedStatement = null;private Connection connection = null;@Overridepublic void open(Configuration parameters) throws Exception {String url = "jdbc:mysql://h122.wzk.icu:3306/flink-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";String username = "hive";String password = "hive@wzk.icu";connection = DriverManager.getConnection(url, username, password);String sql = "INSERT INTI PERSON(name, age, sex) VALUES(?, ?, ?)";preparedStatement = connection.prepareStatement(sql);}@Overridepublic void invoke(Person value, Context context) throws Exception {preparedStatement.setString(1, value.getName());preparedStatement.setInt(2, value.getAge());preparedStatement.setInt(3, value.getSex());preparedStatement.executeUpdate();}@Overridepublic void close() throws Exception {if (null != connection) {connection.close();}if (null != preparedStatement) {preparedStatement.close();}}}public static class Person {private String name;private Integer age;private Integer sex;public Person() {}public Person(String name, Integer age, Integer sex) {this.name = name;this.age = age;this.sex = sex;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public Integer getSex() {return sex;}public void setSex(Integer sex) {this.sex = sex;}}
}

数据库配置

我们新建一张表出来,person表,里边有我们需要的字段。
在这里插入图片描述

运行代码

我们运行代码,等待运行结束。
在这里插入图片描述

查看结果

查看数据库中的数据,我们可以看到刚才模拟的数据已经成功写入了。
在这里插入图片描述

案例:写入到Kafka

编写代码

package icu.wzk;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;public class SinkKafkaTest {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> data = env.socketTextStream("localhost", 9999, '\n', 0);String brokerList = "h121.wzk.icu:9092";String topic = "flink_test";FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());data.addSink(producer);env.execute("SinkKafkaTest");}}

运行代码

启动一个 nc

nc -lk 9999

我们通过回车的方式,可以发送数据。
在这里插入图片描述
Java 程序中等待
在这里插入图片描述

查看结果

我们登录到服务器查看信息

./kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test --from-beginning

可以看到刚才的数据已经写入了:
在这里插入图片描述

这篇关于大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1135929

相关文章

mysql中的group by高级用法

《mysql中的groupby高级用法》MySQL中的GROUPBY是数据聚合分析的核心功能,主要用于将结果集按指定列分组,并结合聚合函数进行统计计算,下面给大家介绍mysql中的groupby用法... 目录一、基本语法与核心功能二、基础用法示例1. 单列分组统计2. 多列组合分组3. 与WHERE结合使

C++如何通过Qt反射机制实现数据类序列化

《C++如何通过Qt反射机制实现数据类序列化》在C++工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作,所以本文就来聊聊C++如何通过Qt反射机制实现数据类序列化吧... 目录设计预期设计思路代码实现使用方法在 C++ 工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作。由于数据类

Mysql用户授权(GRANT)语法及示例解读

《Mysql用户授权(GRANT)语法及示例解读》:本文主要介绍Mysql用户授权(GRANT)语法及示例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql用户授权(GRANT)语法授予用户权限语法GRANT语句中的<权限类型>的使用WITH GRANT

Mysql如何解决死锁问题

《Mysql如何解决死锁问题》:本文主要介绍Mysql如何解决死锁问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录【一】mysql中锁分类和加锁情况【1】按锁的粒度分类全局锁表级锁行级锁【2】按锁的模式分类【二】加锁方式的影响因素【三】Mysql的死锁情况【1

SpringBoot使用GZIP压缩反回数据问题

《SpringBoot使用GZIP压缩反回数据问题》:本文主要介绍SpringBoot使用GZIP压缩反回数据问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录SpringBoot使用GZIP压缩反回数据1、初识gzip2、gzip是什么,可以干什么?3、Spr

SQL BETWEEN 的常见用法小结

《SQLBETWEEN的常见用法小结》BETWEEN操作符是SQL中非常有用的工具,它允许你快速选取某个范围内的值,本文给大家介绍SQLBETWEEN的常见用法,感兴趣的朋友一起看看吧... 在SQL中,BETWEEN是一个操作符,用于选取介于两个值之间的数据。它包含这两个边界值。BETWEEN操作符常用

MySQL索引的优化之LIKE模糊查询功能实现

《MySQL索引的优化之LIKE模糊查询功能实现》:本文主要介绍MySQL索引的优化之LIKE模糊查询功能实现,本文通过示例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录一、前缀匹配优化二、后缀匹配优化三、中间匹配优化四、覆盖索引优化五、减少查询范围六、避免通配符开头七、使用外部搜索引擎八、分

MySql match against工具详细用法

《MySqlmatchagainst工具详细用法》在MySQL中,MATCH……AGAINST是全文索引(Full-Textindex)的查询语法,它允许你对文本进行高效的全文搜素,支持自然语言搜... 目录一、全文索引的基本概念二、创建全文索引三、自然语言搜索四、布尔搜索五、相关性排序六、全文索引的限制七

数据库面试必备之MySQL中的乐观锁与悲观锁

《数据库面试必备之MySQL中的乐观锁与悲观锁》:本文主要介绍数据库面试必备之MySQL中乐观锁与悲观锁的相关资料,乐观锁适用于读多写少的场景,通过版本号检查避免冲突,而悲观锁适用于写多读少且对数... 目录一、引言二、乐观锁(一)原理(二)应用场景(三)示例代码三、悲观锁(一)原理(二)应用场景(三)示例

SpringBoot集成Milvus实现数据增删改查功能

《SpringBoot集成Milvus实现数据增删改查功能》milvus支持的语言比较多,支持python,Java,Go,node等开发语言,本文主要介绍如何使用Java语言,采用springboo... 目录1、Milvus基本概念2、添加maven依赖3、配置yml文件4、创建MilvusClient