图解Kafka | 5张图讲透Kafka 消费者交付语义

2024-08-25 22:36

本文主要是介绍图解Kafka | 5张图讲透Kafka 消费者交付语义,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Kafka 消费者交付语义指的是 Kafka 消费者在处理消息时如何保证消息的可靠性和一致性。这涉及到消息是否被丢失、重复处理或者按顺序消费。

Kafka消费者交付语义有三种,即:

  • 最多一次
  • 至少一次
  • 精确一次

当消费者组/消费者从 Kafka 消费数据时,仅支持最多一次和至少一次这两种语义。但是您可以通过选择适当的数据存储来实现类似于精确一次的交付语义,例如,任何键值存储、RDBMS(主键)、Elasticsearch或任何其他支持幂等写入的存储。

最多一次

在最多一次传递语义中,消息最多只能传递一次。在这种语义中,宁可丢失消息也不应重复传递消息。采用最多一次语义的应用程序可以轻松实现更高的吞吐量和较低的延迟。默认情况下,由于“enable.auto.commit”为 true,因此Kafka消费者设置为使用“最多一次”传递语义。

这种语义下,如果消费者在将消息提交为已读,但是在处理消息之前宕机了或者消息处理失败,则未处理的消息将丢失,并且不会再次读取,分区重新平衡将导致另一个消费者从上次提交的偏移量读取消息。

如下图所示,消息是分批读取的,批次中的部分或全部消息可能未处理,但仍已提交为已处理,这就造成了消息的丢失。

至少一次

在至少一次传递语义中,可以多次传递消息,但不应丢失任何消息。消费者确保所有消息都被读取和处理,即使这可能导致消息重复。为了在消费数据时做到至少一次语义,需要将“enable.auto.commit”值设置为“false”`,您可以选择在处理完消息后手动提交,这样你就掌握了消费偏移量提交的主动权,只有消费成功的消息偏移量才会提交。

如果消费者在处理消息之前发生故障,未处理的消息不会丢失,因为偏移量未提交,分区重新平衡将导致另一个消费者从上次提交的偏移量再次读取相同的消息进进行处理。

但是如果消费者在处理消息之后、提交消费偏移量之前发生故障,因为偏移量未提交,分区重新平衡将导致另一个消费者从上次提交的偏移量再次读取相同的消息进进行处理,这就导致这批消息会被重复消费。

精确一次

在精确一次传递语义中,一条消息只能传递一次,并且不能丢失任何消息。这是所有传递语义中最困难的。与其他两种语义相比,采用精确一次语义的应用程序可能具有较低的吞吐量和较高的延迟。

就像前面介绍的一样,可以通过选择适当的数据存储来实现类似于精确一次的语义。

我们可以通过选择支持幂等写入数据存储来实现。幂等写入意味着即使重复执行相同的写入操作,结果也不会改变。这可以确保在至少一次语义中,即使消息被多次处理,最终数据存储中的数据不会重复。

假设我们使用MySQL作为数据存储,并且每条消息都有一个唯一的ID(例如,消息偏移量)。我们将消息写入MySQL数据库的表中,并使用消息的唯一ID作为主键。

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false"); // 禁用自动提交
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("myTopic"));try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password")) {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {String insertSQL = "INSERT INTO my_table (id, value) VALUES (?, ?) ON DUPLICATE KEY UPDATE value=?";try (PreparedStatement ps = connection.prepareStatement(insertSQL)) {ps.setLong(1, record.offset());ps.setString(2, record.value());ps.setString(3, record.value());ps.executeUpdate();}}consumer.commitSync();}
} catch (SQLException e) {e.printStackTrace();
}

下面图表展示了如何使用消息偏移量作为唯一标识符进行幂等写入:

Kafka Topic: myTopic
+-----------+-----------+-----------+
| Offset 0  | Offset 1  | Offset 2  |
| Message A | Message B | Message C |
+-----------+-----------+-----------+消费者读取消息并将其写入MySQL数据库:
+-------------------------------+
| MySQL数据库: my_table         |
+-----------+-------------------+
| ID (Offset) | Value           |
+-----------+-------------------+
| 0          | Message A        |
| 1          | Message B        |
| 2          | Message C        |
+-----------+-------------------+如果消息重复处理(例如,Offset 1的Message B),由于使用了ON DUPLICATE KEY UPDATE,写入操作将更新现有记录,而不会插入重复记录。

这篇关于图解Kafka | 5张图讲透Kafka 消费者交付语义的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1106839

相关文章

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

龙蜥操作系统Anolis OS-23.x安装配置图解教程(保姆级)

《龙蜥操作系统AnolisOS-23.x安装配置图解教程(保姆级)》:本文主要介绍了安装和配置AnolisOS23.2系统,包括分区、软件选择、设置root密码、网络配置、主机名设置和禁用SELinux的步骤,详细内容请阅读本文,希望能对你有所帮助... ‌AnolisOS‌是由阿里云推出的开源操作系统,旨

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

理解分类器(linear)为什么可以做语义方向的指导?(解纠缠)

Attribute Manipulation(属性编辑)、disentanglement(解纠缠)常用的两种做法:线性探针和PCA_disentanglement和alignment-CSDN博客 在解纠缠的过程中,有一种非常简单的方法来引导G向某个方向进行生成,然后我们通过向不同的方向进行行走,那么就会得到这个属性上的图像。那么你利用多个方向进行生成,便得到了各种方向的图像,每个方向对应了很多

图解TCP三次握手|深度解析|为什么是三次

写在前面 这篇文章我们来讲解析 TCP三次握手。 TCP 报文段 传输控制块TCB:存储了每一个连接中的一些重要信息。比如TCP连接表,指向发送和接收缓冲的指针,指向重传队列的指针,当前的发送和接收序列等等。 我们再来看一下TCP报文段的组成结构 TCP 三次握手 过程 假设有一台客户端,B有一台服务器。最初两端的TCP进程都是处于CLOSED关闭状态,客户端A打开链接,服务器端

java线程深度解析(五)——并发模型(生产者-消费者)

http://blog.csdn.net/Daybreak1209/article/details/51378055 三、生产者-消费者模式     在经典的多线程模式中,生产者-消费者为多线程间协作提供了良好的解决方案。基本原理是两类线程,即若干个生产者和若干个消费者,生产者负责提交用户请求任务(到内存缓冲区),消费者线程负责处理任务(从内存缓冲区中取任务进行处理),两类线程之

图解可观测Metrics, tracing, and logging

最近在看Gophercon大会PPT的时候无意中看到了关于Metrics,Tracing和Logging相关的一篇文章,凑巧这些我基本都接触过,也是去年后半年到现在一直在做和研究的东西。从去年的关于Metrics的goappmonitor,到今年在排查问题时脑洞的基于log全链路(Tracing)追踪系统的设计,正好是对这三个话题的实践。这不禁让我对它们的关系进行思考:Metrics和Loggi

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队