Flink最后一站___Flink数据写入Kafka+从Kafka存入Mysql

2023-11-24 09:10

本文主要是介绍Flink最后一站___Flink数据写入Kafka+从Kafka存入Mysql,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。

在这里插入图片描述

今天为大家带来Flink的一个综合应用案例:Flink数据写入Kafka+从Kafka存入Mysql
在这里插入图片描述
第一部分:写数据到kafka中

 public static void writeToKafka() throws Exception{Properties props = new Properties();props.put("bootstrap.servers", BROKER_LIST);props.put("key.serializer", CONST_SERIALIZER);props.put("value.serializer", CONST_SERIALIZER);KafkaProducer<String, String> producer = new KafkaProducer<>(props);//构建User对象,在name为data后边加个随机数int randomInt = RandomUtils.nextInt(1, 100000);User user = new User();user.setName("data" + randomInt);user.setId(randomInt);//转换成JSONString userJson = JSON.toJSONString(user);//包装成kafka发送的记录ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_USER, partition,null, userJson);//发送到缓存producer.send(record);System.out.println("向kafka发送数据:" + userJson);//立即发送producer.flush();}

重点:

//发送到缓存producer.send(record);

为了增强代码的Robust,我们将常量单独拎出来:

   //本地的kafka机器列表public static final String BROKER_LIST = "192.168.88.161:9092";//kafka的topicpublic static final String TOPIC_USER = "USER";//kafka的partition分区public static final Integer partition = 0;//序列化的方式public static final String CONST_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";//反序列化public static final String CONST_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";

main方法如下:

public static void main(String[] args) {while(true) {try {//每三秒写一条数据TimeUnit.SECONDS.sleep(3);writeToKafka();} catch (Exception e) {e.printStackTrace();}}}

第二部分:从kafka获取数据

KafkaRickSourceFunction.java

import com.hy.flinktest.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;@Slf4j
public class KafkaRickSourceFunction extends RichSourceFunction<String>{//kafkaprivate static Properties prop = new Properties();private boolean running = true;//作静态化处理,增强robustprivate static Integer partition = WritedatatoKafka.partition;static {prop.put("bootstrap.servers",WritedatatoKafka.BROKER_LIST);prop.put("zookeeper.connect","192.168.88.161:2181");prop.put("group.id",WritedatatoKafka.TOPIC_USER);prop.put("key.deserializer",WritedatatoKafka.CONST_DESERIALIZER);prop.put("value.deserializer",WritedatatoKafka.CONST_DESERIALIZER);prop.put("auto.offset.reset","latest");prop.put("max.poll.records", "500");prop.put("auto.commit.interval.ms", "1000");}@Overridepublic void run(SourceContext sourceContext) throws Exception {//创建一个消费者客户端实例KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(prop);//只消费TOPIC_USER 分区TopicPartition topicPartition = new TopicPartition(WritedatatoKafka.TOPIC_USER,partition);long offset =0; //这个初始值应该从zk或其他地方获取offset = placeOffsetToBestPosition(kafkaConsumer, offset, topicPartition);while (running){ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);if(records.isEmpty()){continue;}for (ConsumerRecord<String, String> record : records) {//record.offset();//record.key()String value = record.value();sourceContext.collect(value);}}}

然后 返回最合适的offset
在这里插入图片描述

    /*** 将offset定位到最合适的位置,并返回最合适的offset。* @param kafkaConsumer consumer* @param offset offset* @param topicPartition partition* @return the best offset*/private long placeOffsetToBestPosition(KafkaConsumer<String, String> kafkaConsumer,long offset, TopicPartition topicPartition) {List<TopicPartition> partitions = Collections.singletonList(topicPartition);kafkaConsumer.assign(partitions);long bestOffset = offset;if (offset == 0) {log.info("由于offset为0,重新定位offset到kafka起始位置.");kafkaConsumer.seekToBeginning(partitions);} else if (offset > 0) {kafkaConsumer.seekToBeginning(partitions);long startPosition = kafkaConsumer.position(topicPartition);kafkaConsumer.seekToEnd(partitions);long endPosition = kafkaConsumer.position(topicPartition);if (offset < startPosition) {log.info("由于当前offset({})比kafka的最小offset({})还要小,则定位到kafka的最小offset({})处。",offset, startPosition, startPosition);kafkaConsumer.seekToBeginning(partitions);bestOffset = startPosition;} else if (offset > endPosition) {log.info("由于当前offset({})比kafka的最大offset({})还要大,则定位到kafka的最大offset({})处。",offset, endPosition, endPosition);kafkaConsumer.seekToEnd(partitions);bestOffset = endPosition;} else {kafkaConsumer.seek(topicPartition, offset);}}return bestOffset;}@Overridepublic void cancel() {running = false;}}

第三部分
主类:从kafka读取数据写入mysql

    //1.构建流执行环境 并添加数据源
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.addSource(new KafkaRickSourceFunction());
    //2.从kafka里读取数据,转换成User对象
 DataStream<User> dataStream = dataStreamSource.map(lines -> JSONObject.parseObject(lines, User.class));
//3.收集5秒钟的总数
dataStream.timeWindowAll(Time.seconds(5L)).apply(new AllWindowFunction<User, List<User>, TimeWindow>() {@Overridepublic void apply(TimeWindow timeWindow, Iterable<User> iterable, Collector<List<User>> out) throws Exception {List<User> users = Lists.newArrayList(iterable);if(users.size() > 0) {System.out.println("5秒内总共收到的条数:" + users.size());out.collect(users);}}})//sink 到数据库.addSink(new MysqlRichSinkFunction());//打印到控制台//.print();

第四部分:
写入到目标数据库sink
MysqlRichSinkFunction.java

@Slf4j
public class MysqlRichSinkFunction extends RichSinkFunction<List<User>> {private Connection connection = null;private PreparedStatement ps = null;@Overridepublic void open(Configuration parameters) throws Exception {// super.open(parameters);log.info("获取数据库连接");connection = DbUtil.getConnection();String sql = "insert into user1(id,name) values (?,?)";ps = connection.prepareStatement(sql);}public void invoke(List<User> users, Context ctx) throws Exception {//获取ReadMysqlResoure发送过来的结果for(User user : users) {ps.setLong(1, user.getId());ps.setString(2, user.getName());ps.addBatch();}//一次性写入int[] count = ps.executeBatch();log.info("成功写入Mysql数量:" + count.length);}@Overridepublic void close() throws Exception {//关闭并释放资源if(connection != null) {connection.close();}if(ps != null) {ps.close();}}}

总结

以上便是Flink数据写入Kafka+从Kafka存入Mysql
如果有帮助,给manor一键三连吧~~

在这里插入图片描述

这篇关于Flink最后一站___Flink数据写入Kafka+从Kafka存入Mysql的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/421645

相关文章

MySQL查询JSON数组字段包含特定字符串的方法

《MySQL查询JSON数组字段包含特定字符串的方法》在MySQL数据库中,当某个字段存储的是JSON数组,需要查询数组中包含特定字符串的记录时传统的LIKE语句无法直接使用,下面小编就为大家介绍两种... 目录问题背景解决方案对比1. 精确匹配方案(推荐)2. 模糊匹配方案参数化查询示例使用场景建议性能优

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE

MySQL中的锁机制详解之全局锁,表级锁,行级锁

《MySQL中的锁机制详解之全局锁,表级锁,行级锁》MySQL锁机制通过全局、表级、行级锁控制并发,保障数据一致性与隔离性,全局锁适用于全库备份,表级锁适合读多写少场景,行级锁(InnoDB)实现高并... 目录一、锁机制基础:从并发问题到锁分类1.1 并发访问的三大问题1.2 锁的核心作用1.3 锁粒度分

MySQL数据库中ENUM的用法是什么详解

《MySQL数据库中ENUM的用法是什么详解》ENUM是一个字符串对象,用于指定一组预定义的值,并可在创建表时使用,下面:本文主要介绍MySQL数据库中ENUM的用法是什么的相关资料,文中通过代码... 目录mysql 中 ENUM 的用法一、ENUM 的定义与语法二、ENUM 的特点三、ENUM 的用法1

MySQL count()聚合函数详解

《MySQLcount()聚合函数详解》MySQL中的COUNT()函数,它是SQL中最常用的聚合函数之一,用于计算表中符合特定条件的行数,本文给大家介绍MySQLcount()聚合函数,感兴趣的朋... 目录核心功能语法形式重要特性与行为如何选择使用哪种形式?总结深入剖析一下 mysql 中的 COUNT

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

mysql中的服务器架构详解

《mysql中的服务器架构详解》:本文主要介绍mysql中的服务器架构,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、mysql服务器架构解释3、总结1、背景简单理解一下mysqphpl的服务器架构。2、mysjsql服务器架构解释mysql的架

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

MySQL之InnoDB存储引擎中的索引用法及说明

《MySQL之InnoDB存储引擎中的索引用法及说明》:本文主要介绍MySQL之InnoDB存储引擎中的索引用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录1、背景2、准备3、正篇【1】存储用户记录的数据页【2】存储目录项记录的数据页【3】聚簇索引【4】二

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的