大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka

2024-09-04 11:44

本文主要是介绍大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Sink 的基本概念等内容
  • Sink的相关信息 配置与使用
  • Sink案例写入Redis

在这里插入图片描述

JDBC Sink

在 Apache Flink 中,通过 JDBC Sink,可以将处理后的数据写入到 MySQL 数据库中。这对于将实时处理的数据持久化或与其他系统进行集成非常有用。

Flink JDBC Sink 简介

Flink 提供了 JdbcSink,它是基于 JDBC 协议的 Sink,可以将数据写入各种关系型数据库,包括 MySQL。在使用 JDBC Sink 时,需要提供数据库连接信息和 SQL 语句,通过这些信息,Flink 将数据流中的记录插入或更新到 MySQL 表中。

Flink 到 MySQL 的基本步骤

将数据流写入 MySQL 的步骤主要包括以下几点:

  • 依赖库配置:确保在项目中引入了 Flink 和 MySQL 相关的依赖库,通常需要配置 Maven 或 Gradle。
  • 定义数据源和数据流:创建并处理数据流。
  • 配置 JDBC Sink:提供数据库的连接信息和插入 SQL 语句。
  • 启动任务:将数据流写入 MySQL。

优化建议

在实际项目中,向 MySQL 插入大量数据时,应考虑以下优化策略:

  • 批量插入:通过 JdbcExecutionOptions 配置批量插入,可以大幅提升写入性能。
  • 连接池:对于高并发的写入操作,建议使用连接池来减少数据库连接开销。
  • 索引优化:为插入的表配置合适的索引,可以提高查询性能,但在大量写入时,索引可能会降低- 插入速度,因此需要权衡。
  • 数据分片:对于非常大规模的数据,可以考虑将数据分片并行写入不同的 MySQL 实例或分区表中。

案例:流数据下沉到MySQL

添加依赖

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version>
</dependency>

编写代码

一个Person的类,对应MySQL中的一张表的字段。
模拟几条数据流,写入到 MySQL中。

package icu.wzk;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class SinkSqlTest {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Person> data = env.getJavaEnv().fromElements(new Person("wzk", 18, 1),new Person("icu", 20, 1),new Person("wzkicu", 13, 2));data.addSink(new MySqlSinkFunction());env.execute();}public static class MySqlSinkFunction extends RichSinkFunction<Person> {private PreparedStatement preparedStatement = null;private Connection connection = null;@Overridepublic void open(Configuration parameters) throws Exception {String url = "jdbc:mysql://h122.wzk.icu:3306/flink-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";String username = "hive";String password = "hive@wzk.icu";connection = DriverManager.getConnection(url, username, password);String sql = "INSERT INTI PERSON(name, age, sex) VALUES(?, ?, ?)";preparedStatement = connection.prepareStatement(sql);}@Overridepublic void invoke(Person value, Context context) throws Exception {preparedStatement.setString(1, value.getName());preparedStatement.setInt(2, value.getAge());preparedStatement.setInt(3, value.getSex());preparedStatement.executeUpdate();}@Overridepublic void close() throws Exception {if (null != connection) {connection.close();}if (null != preparedStatement) {preparedStatement.close();}}}public static class Person {private String name;private Integer age;private Integer sex;public Person() {}public Person(String name, Integer age, Integer sex) {this.name = name;this.age = age;this.sex = sex;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public Integer getSex() {return sex;}public void setSex(Integer sex) {this.sex = sex;}}
}

数据库配置

我们新建一张表出来,person表,里边有我们需要的字段。
在这里插入图片描述

运行代码

我们运行代码,等待运行结束。
在这里插入图片描述

查看结果

查看数据库中的数据,我们可以看到刚才模拟的数据已经成功写入了。
在这里插入图片描述

案例:写入到Kafka

编写代码

package icu.wzk;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;public class SinkKafkaTest {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> data = env.socketTextStream("localhost", 9999, '\n', 0);String brokerList = "h121.wzk.icu:9092";String topic = "flink_test";FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());data.addSink(producer);env.execute("SinkKafkaTest");}}

运行代码

启动一个 nc

nc -lk 9999

我们通过回车的方式,可以发送数据。
在这里插入图片描述
Java 程序中等待
在这里插入图片描述

查看结果

我们登录到服务器查看信息

./kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test --from-beginning

可以看到刚才的数据已经写入了:
在这里插入图片描述

这篇关于大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1135929

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置