原理+实践|Exactly-once系列实践之KafkaToKafka

2023-11-23 08:59

本文主要是介绍原理+实践|Exactly-once系列实践之KafkaToKafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1886530353e5c510cebead2addbaa01d.png全网最全大数据面试提升手册!

推荐阅读:

  • 原理+实践|Flink-Exactly-Once Kafka2Redis一致性实践

文章目录
一、Kafka输入输出流工具类
二、统计字符个数案例
三、消费者消费kafka的事务数据
四、总结与可能出现的问题

一、Kafka输入输出流工具类

代码如下(示例):

//获取kafkaStream流public static <T> DataStream<T> getKafkaDataStream(ParameterTool parameterTool,Class<? extends DeserializationSchema> clazz,StreamExecutionEnvironment env) throws IllegalAccessException, InstantiationException {//加入到flink的环境全局配置中,后续可以通过上下文获取该工具类,总而得到想要的值env.getConfig().setGlobalJobParameters(parameterTool);//kafka配置项Properties properties = new Properties();properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));properties.setProperty("group.id",parameterTool.get("group.idsource"));properties.setProperty("auto.offset.reset",parameterTool.get("auto.offset.reset"));properties.setProperty("enable.auto.commit",parameterTool.get("enable.auto.commit", String.valueOf(false)));String topics = parameterTool.get("Consumertopics");//序列化类实例化DeserializationSchema<T> deserializationSchema = clazz.newInstance();FlinkKafkaConsumer<T> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);flinkKafkaConsumer.setStartFromEarliest();//开启kafka的offset与checkpoint绑定flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);return env.addSource(flinkKafkaConsumer);}//获取kafka生产者通用方法/*** offsets.topic.replication.factor 用于配置offset记录的topic的partition的副本个数* transaction.state.log.replication.factor 事务主题的复制因子* transaction.state.log.min.isr 覆盖事务主题的min.insync.replicas配置** num.partitions 新建Topic时默认的分区数** default.replication.factor 自动创建topic时的默认副本的个数**** 注意:这些参数,设置得更高以确保高可用性!** 其中 default.replication.factor 是真正决定,topi的副本数量的* @param parameterTool* @param kafkaSerializationSchema* @param <T>* @return*/public static <T> FlinkKafkaProducer<T> getFlinkKafkaProducer(ParameterTool parameterTool,KafkaSerializationSchema<T> kafkaSerializationSchema){Properties properties = new Properties();properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));properties.setProperty("group.id",parameterTool.get("group.idsink"));
//        properties.setProperty("transaction.max.timeout.ms",parameterTool.get("transaction.max.timeout.ms"));properties.setProperty("transaction.timeout.ms",parameterTool.get("transaction.timeout.ms"));properties.setProperty("client.id", "flinkOutputTopicClient");String topics = parameterTool.get("Producetopice");return new FlinkKafkaProducer<T>(topics,kafkaSerializationSchema,properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);}

注意点事项

一、消费者注意项

  1. flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true),将kafka自动提交offset关闭并且与flink的CheckPoint绑定

  2. bootstrap.servers kafka的broker host

  3. setStartFromEarliest()设置kafka的消息消费从最初位置开始

二、生产者注意项

  1. transaction.timeout.ms 默认情况下Kafka Broker 将transaction.max.timeout.ms设置为15分钟,我们需要将此值设置低于15分钟

  2. FlinkKafkaProducer.Semantic.EXACTLY_ONCE设置kafka为精确一次

二、统计字符个数案例

代码如下(示例):

public static void main(String[] args) throws Exception {//1.创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.设置并行度env.setParallelism(4);//3.设置CK和状态后端CkAndStateBacked.setCheckPointAndStateBackend(env,"FS");//4.获取kafkaStream流InputStream kafkaPropertiesStream = KafkaToKafkaExacitly.class.getClassLoader().getResourceAsStream("kafka.properties");ParameterTool parameterTool=ParameterTool.fromPropertiesFile(kafkaPropertiesStream);//将配置流放到全局flink运行时环境env.getConfig().setGlobalJobParameters(parameterTool);SimpleStringSchema simpleStringSchema = new SimpleStringSchema();Class<? extends SimpleStringSchema> stringSchemaClass = simpleStringSchema.getClass();DataStream<String> kafkaDataStream = KafkaUtil.getKafkaDataStream(parameterTool, stringSchemaClass, env);System.out.println("==================================================");kafkaDataStream.print();//5.map包装成value,1SingleOutputStreamOperator<Tuple2<String, Integer>> tupleStream = kafkaDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {if("error".equals(value)){throw new RuntimeException("发生异常!!!");}return new Tuple2<>(value, 1);}});tupleStream.print();//6.按照value进行分组,并且统计value的个数SingleOutputStreamOperator<Tuple2<String, Integer>> reduceStream = tupleStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}}).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);}});System.out.println("=====================================================");reduceStream.print();//7.将数据输出到kafkaFlinkKafkaProducer<Tuple2<String, Integer>> flinkKafkaProducer = KafkaUtil.getFlinkKafkaProducer(parameterTool, new KafkaSerializationSchema<Tuple2<String, Integer>>() {@Overridepublic void open(SerializationSchema.InitializationContext context) throws Exception {System.out.println("=========正在向KafkaProduce输出数据!!!=============");}@Overridepublic ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Integer> element, @Nullable Long timestamp) {String producetopics = parameterTool.get("Producetopice");String result = element.toString();return new ProducerRecord<byte[], byte[]>(producetopics, result.getBytes(StandardCharsets.UTF_8));}});reduceStream.addSink(flinkKafkaProducer).name("kafkasinktest").uid("kafkasink");//任务执行env.execute("KafkaToKafkaTest");}

注意事项:
这里使用的是本地FSstateBackend,注意你的路径的设置,以hdfs://或者file://为地址标识符,否则Flink的文件系统将无法识别。

三、消费者消费kafka的事务数据

ublic static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties sourceProperties = new Properties();sourceProperties.setProperty("bootstrap.servers", "*****");sourceProperties.setProperty("group.id", "****");//端到端一致性:消费数据时需要配置isolation.level=read_committed(默认值为read_uncommitted)sourceProperties.put("isolation.level", "read_committed");FlinkKafkaConsumer<String> ConsumerKafka = new FlinkKafkaConsumer<>("*****", new SimpleStringSchema(), sourceProperties);ConsumerKafka.setStartFromEarliest();DataStreamSource<String> dataStreamSource = env.addSource(ConsumerKafka);dataStreamSource.print();env.execute();}

isolation.level这里设置为read_committed(默认为read_uncommitted) 这里可以看到以你CheckPoint设置的时间,来批量展示kafka生产者的消息。

四、总结与可能出现的问题

以上是flink 实现kafka的精确一次的测试例子,这里还有一点要注意,就是小伙伴们的kafka的配置里面。

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
default.replication.factor=1

这四个参数里面default.replication.factor是你kafka真正每个topic的副本数量,但是在开启事务也就是flink的addsink的时候会默认继承两阶段提交的方式,这里transaction.state.log.replication.factor一定要大于或者等于transaction.state.log.min.isr,否则你的kafka集群不满足事务副本复制的基本属性,会一直不成功,那么你的CheckPoint就会超时过期,从而导致任务的整体失败。

kafka集群第一次有消费者消费消息时会自动创建 __consumer_offsets,它的副本因子受 offsets.topic.replication.factor 参数的约束,默认值为3(注意:该参数的使用限制在0.11.0.0版本发生变化),分区数可以通过 offsets.topic.num.partitions 参数设置,默认值为50,在开启事务性的情况下就会首先会获得一个全局的TransactionCoordinator id和transactional producer并且生成唯一的序列号等 类似于一下的例子来唯一标识当前事务的消息对应的offset,以及标识。

[2022-03-24 21:07:40,022] INFO [TransactionCoordinator id=0] Initialized transactionalId Keyed Reduce -> (Sink: Print to Std. Out, Sink: kafkasinktest)-b0c5e26be6392399cc3c8a38581a81c2-8 with producerId 11101 and producer epoch 8 on partition __transaction_state-18 (kafka.coordinator.transaction.TransactionCoordinator)

当flink任务出现异常的情况下,kafka会把以及提交但是未标记可以消费的数据直接销毁,或者正常的情况下,会正式提交(本质是修改消息的标志位),之后对于消费者在开启isolation.level的时候就可以读取以及标记为可以读取的message。

如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!

714cd7647fc638997cb45c1665fe2866.png

c2178796715d446702bee58109e41f47.jpeg

2022年全网首发|大数据专家级技能模型与学习指南(胜天半子篇)

互联网最坏的时代可能真的来了

我在B站读大学,大数据专业

我们在学习Flink的时候,到底在学习什么?

193篇文章暴揍Flink,这个合集你需要关注一下

Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS

Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点

我们在学习Spark的时候,到底在学习什么?

在所有Spark模块中,我愿称SparkSQL为最强!

硬刚Hive | 4万字基础调优面试小总结

数据治理方法论和实践小百科全书

标签体系下的用户画像建设小指南

4万字长文 | ClickHouse基础&实践&调优全视角解析

【面试&个人成长】2021年过半,社招和校招的经验之谈

大数据方向另一个十年开启 |《硬刚系列》第一版完结

我写过的关于成长/面试/职场进阶的文章

当我们在学习Hive的时候在学习什么?「硬刚Hive续集」

这篇关于原理+实践|Exactly-once系列实践之KafkaToKafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/416802

相关文章

C++必修:模版的入门到实践

✨✨ 欢迎大家来到贝蒂大讲堂✨✨ 🎈🎈养成好习惯,先赞后看哦~🎈🎈 所属专栏:C++学习 贝蒂的主页:Betty’s blog 1. 泛型编程 首先让我们来思考一个问题,如何实现一个交换函数? void swap(int& x, int& y){int tmp = x;x = y;y = tmp;} 相信大家很快就能写出上面这段代码,但是如果要求这个交换函数支持字符型

亮相WOT全球技术创新大会,揭秘火山引擎边缘容器技术在泛CDN场景的应用与实践

2024年6月21日-22日,51CTO“WOT全球技术创新大会2024”在北京举办。火山引擎边缘计算架构师李志明受邀参与,以“边缘容器技术在泛CDN场景的应用和实践”为主题,与多位行业资深专家,共同探讨泛CDN行业技术架构以及云原生与边缘计算的发展和展望。 火山引擎边缘计算架构师李志明表示:为更好地解决传统泛CDN类业务运行中的问题,火山引擎边缘容器团队参考行业做法,结合实践经验,打造火山

JavaWeb系列二十: jQuery的DOM操作 下

jQuery的DOM操作 CSS-DOM操作多选框案例页面加载完毕触发方法作业布置jQuery获取选中复选框的值jQuery控制checkbox被选中jQuery控制(全选/全不选/反选)jQuery动态添加删除用户 CSS-DOM操作 获取和设置元素的样式属性: css()获取和设置元素透明度: opacity属性获取和设置元素高度, 宽度: height(), widt

9 个 GraphQL 安全最佳实践

GraphQL 已被最大的平台采用 - Facebook、Twitter、Github、Pinterest、Walmart - 这些大公司不能在安全性上妥协。但是,尽管 GraphQL 可以成为您的 API 的非常安全的选项,但它并不是开箱即用的。事实恰恰相反:即使是最新手的黑客,所有大门都是敞开的。此外,GraphQL 有自己的一套注意事项,因此如果您来自 REST,您可能会错过一些重要步骤!

C语言入门系列:探秘二级指针与多级指针的奇妙世界

文章目录 一,指针的回忆杀1,指针的概念2,指针的声明和赋值3,指针的使用3.1 直接给指针变量赋值3.2 通过*运算符读写指针指向的内存3.2.1 读3.2.2 写 二,二级指针详解1,定义2,示例说明3,二级指针与一级指针、普通变量的关系3.1,与一级指针的关系3.2,与普通变量的关系,示例说明 4,二级指针的常见用途5,二级指针扩展到多级指针 小结 C语言的学习之旅中,二级

数据库原理与安全复习笔记(未完待续)

1 概念 产生与发展:人工管理阶段 → \to → 文件系统阶段 → \to → 数据库系统阶段。 数据库系统特点:数据的管理者(DBMS);数据结构化;数据共享性高,冗余度低,易于扩充;数据独立性高。DBMS 对数据的控制功能:数据的安全性保护;数据的完整性检查;并发控制;数据库恢复。 数据库技术研究领域:数据库管理系统软件的研发;数据库设计;数据库理论。数据模型要素 数据结构:描述数据库

计算机组成原理——RECORD

第一章 概论 1.固件  将部分操作系统固化——即把软件永恒存于只读存储器中。 2.多级层次结构的计算机系统 3.冯*诺依曼计算机的特点 4.现代计算机的组成:CPU、I/O设备、主存储器(MM) 5.细化的计算机组成框图 6.指令操作的三个阶段:取指、分析、执行 第二章 计算机的发展 1.第一台由电子管组成的电子数字积分和计算机(ENIAC) 第三章 系统总线

GaussDB关键技术原理:高性能(二)

GaussDB关键技术原理:高性能(一)从数据库性能优化系统概述对GaussDB的高性能技术进行了解读,本篇将从查询处理综述方面继续分享GaussDB的高性能技术的精彩内容。 2 查询处理综述 内容概要:本章节介绍查询端到端处理的执行流程,首先让读者对查询在数据库内部如何执行有一个初步的认识,充分理解查询处理各阶段主要瓶颈点以及对应的解决方案,本章以GaussDB为例讲解查询执行的几个主要阶段

JavaWeb系列六: 动态WEB开发核心(Servlet) 上

韩老师学生 官网文档为什么会出现Servlet什么是ServletServlet在JavaWeb项目位置Servlet基本使用Servlet开发方式说明快速入门- 手动开发 servlet浏览器请求Servlet UML分析Servlet生命周期GET和POST请求分发处理通过继承HttpServlet开发ServletIDEA配置ServletServlet注意事项和细节 Servlet注

【计算机组成原理】部分题目汇总

计算机组成原理 部分题目汇总 一. 简答题 RISC和CICS 简要说明,比较异同 RISC(精简指令集)注重简单快速的指令执行,使用少量通用寄存器,固定长度指令,优化硬件性能,依赖软件(如编译器)来提升效率。 CISC(复杂指令集)包含多样复杂的指令,能一条指令完成多步操作,采用变长指令,减少指令数但可能增加执行时间,倾向于硬件直接支持复杂功能减轻软件负担。 两者均追求高性能,但RISC