图解Kafka | 5张图讲透Kafka 消费者交付语义

2024-08-25 22:36

本文主要是介绍图解Kafka | 5张图讲透Kafka 消费者交付语义,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Kafka 消费者交付语义指的是 Kafka 消费者在处理消息时如何保证消息的可靠性和一致性。这涉及到消息是否被丢失、重复处理或者按顺序消费。

Kafka消费者交付语义有三种,即:

  • 最多一次
  • 至少一次
  • 精确一次

当消费者组/消费者从 Kafka 消费数据时,仅支持最多一次和至少一次这两种语义。但是您可以通过选择适当的数据存储来实现类似于精确一次的交付语义,例如,任何键值存储、RDBMS(主键)、Elasticsearch或任何其他支持幂等写入的存储。

最多一次

在最多一次传递语义中,消息最多只能传递一次。在这种语义中,宁可丢失消息也不应重复传递消息。采用最多一次语义的应用程序可以轻松实现更高的吞吐量和较低的延迟。默认情况下,由于“enable.auto.commit”为 true,因此Kafka消费者设置为使用“最多一次”传递语义。

这种语义下,如果消费者在将消息提交为已读,但是在处理消息之前宕机了或者消息处理失败,则未处理的消息将丢失,并且不会再次读取,分区重新平衡将导致另一个消费者从上次提交的偏移量读取消息。

如下图所示,消息是分批读取的,批次中的部分或全部消息可能未处理,但仍已提交为已处理,这就造成了消息的丢失。

至少一次

在至少一次传递语义中,可以多次传递消息,但不应丢失任何消息。消费者确保所有消息都被读取和处理,即使这可能导致消息重复。为了在消费数据时做到至少一次语义,需要将“enable.auto.commit”值设置为“false”`,您可以选择在处理完消息后手动提交,这样你就掌握了消费偏移量提交的主动权,只有消费成功的消息偏移量才会提交。

如果消费者在处理消息之前发生故障,未处理的消息不会丢失,因为偏移量未提交,分区重新平衡将导致另一个消费者从上次提交的偏移量再次读取相同的消息进进行处理。

但是如果消费者在处理消息之后、提交消费偏移量之前发生故障,因为偏移量未提交,分区重新平衡将导致另一个消费者从上次提交的偏移量再次读取相同的消息进进行处理,这就导致这批消息会被重复消费。

精确一次

在精确一次传递语义中,一条消息只能传递一次,并且不能丢失任何消息。这是所有传递语义中最困难的。与其他两种语义相比,采用精确一次语义的应用程序可能具有较低的吞吐量和较高的延迟。

就像前面介绍的一样,可以通过选择适当的数据存储来实现类似于精确一次的语义。

我们可以通过选择支持幂等写入数据存储来实现。幂等写入意味着即使重复执行相同的写入操作,结果也不会改变。这可以确保在至少一次语义中,即使消息被多次处理,最终数据存储中的数据不会重复。

假设我们使用MySQL作为数据存储,并且每条消息都有一个唯一的ID(例如,消息偏移量)。我们将消息写入MySQL数据库的表中,并使用消息的唯一ID作为主键。

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false"); // 禁用自动提交
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("myTopic"));try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password")) {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {String insertSQL = "INSERT INTO my_table (id, value) VALUES (?, ?) ON DUPLICATE KEY UPDATE value=?";try (PreparedStatement ps = connection.prepareStatement(insertSQL)) {ps.setLong(1, record.offset());ps.setString(2, record.value());ps.setString(3, record.value());ps.executeUpdate();}}consumer.commitSync();}
} catch (SQLException e) {e.printStackTrace();
}

下面图表展示了如何使用消息偏移量作为唯一标识符进行幂等写入:

Kafka Topic: myTopic
+-----------+-----------+-----------+
| Offset 0  | Offset 1  | Offset 2  |
| Message A | Message B | Message C |
+-----------+-----------+-----------+消费者读取消息并将其写入MySQL数据库:
+-------------------------------+
| MySQL数据库: my_table         |
+-----------+-------------------+
| ID (Offset) | Value           |
+-----------+-------------------+
| 0          | Message A        |
| 1          | Message B        |
| 2          | Message C        |
+-----------+-------------------+如果消息重复处理(例如,Offset 1的Message B),由于使用了ON DUPLICATE KEY UPDATE,写入操作将更新现有记录,而不会插入重复记录。

这篇关于图解Kafka | 5张图讲透Kafka 消费者交付语义的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1106839

相关文章

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

理解分类器(linear)为什么可以做语义方向的指导?(解纠缠)

Attribute Manipulation(属性编辑)、disentanglement(解纠缠)常用的两种做法:线性探针和PCA_disentanglement和alignment-CSDN博客 在解纠缠的过程中,有一种非常简单的方法来引导G向某个方向进行生成,然后我们通过向不同的方向进行行走,那么就会得到这个属性上的图像。那么你利用多个方向进行生成,便得到了各种方向的图像,每个方向对应了很多

图解TCP三次握手|深度解析|为什么是三次

写在前面 这篇文章我们来讲解析 TCP三次握手。 TCP 报文段 传输控制块TCB:存储了每一个连接中的一些重要信息。比如TCP连接表,指向发送和接收缓冲的指针,指向重传队列的指针,当前的发送和接收序列等等。 我们再来看一下TCP报文段的组成结构 TCP 三次握手 过程 假设有一台客户端,B有一台服务器。最初两端的TCP进程都是处于CLOSED关闭状态,客户端A打开链接,服务器端

java线程深度解析(五)——并发模型(生产者-消费者)

http://blog.csdn.net/Daybreak1209/article/details/51378055 三、生产者-消费者模式     在经典的多线程模式中,生产者-消费者为多线程间协作提供了良好的解决方案。基本原理是两类线程,即若干个生产者和若干个消费者,生产者负责提交用户请求任务(到内存缓冲区),消费者线程负责处理任务(从内存缓冲区中取任务进行处理),两类线程之

图解可观测Metrics, tracing, and logging

最近在看Gophercon大会PPT的时候无意中看到了关于Metrics,Tracing和Logging相关的一篇文章,凑巧这些我基本都接触过,也是去年后半年到现在一直在做和研究的东西。从去年的关于Metrics的goappmonitor,到今年在排查问题时脑洞的基于log全链路(Tracing)追踪系统的设计,正好是对这三个话题的实践。这不禁让我对它们的关系进行思考:Metrics和Loggi

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

Kafka (快速)安装部署

文章目录 1、软件下载&配置环境1_JDK安装2_Zookeeper安装3_Kafka安装 2、单机安装1_配置主机名和IP映射2_单机Kafka配置 3、集群安装1_配置主机名和IP的映射关系2_时钟同步3_Zookeeper配置信息4_集群Kafka配置 4、kafka的其他脚本命令 1、软件下载&配置环境 下面的操作无论是单机部署还是分布式集群环境下都是通用的。 准

Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统 一、Kafka 概述1.1 Kafka 定义1.2 Kafka 设计目标1.3 Kafka 特点 二、Kafka 架构设计2.1 基本架构2.2 Topic 和 Partition2.3 消费者和消费者组2.4 Replica 副本 三、Kafka 分布式集群搭建3.1 下载解压3.1.1 上传解压 3.2 修改 Kafka 配置文件3.2.1 修改zookeep

数字经济时代,零售企业如何实现以消费者为中心的数字化转型?

在数字经济时代,零售企业正面临着前所未有的挑战与机遇。随着消费者行为的数字化和多样化,传统的零售模式已难以满足市场需求。为了在激烈的市场竞争中立于不败之地,零售企业必须实现以消费者为中心的数字化转型。这一转型不仅仅是技术的升级,更是一场涉及企业战略、组织结构、运营模式和人才管理的深刻变革。本文将探讨零售企业在数字化转型过程中遇到的难点,并提出相应的解决策略,通过实际案例分析,展示如何通过综合措施进

生产者消费者模型(能看懂文字就能明白系列)

系列文章目录 能看懂文字就能明白系列 C语言笔记传送门 Java笔记传送门 🌟 个人主页:古德猫宁- 🌈 信念如阳光,照亮前行的每一步 前言 本节目标: 理解什么是阻塞队列,阻塞队列与普通队列的区别理解什么是生产者消费者模型生产者消费者模型的主要作用 一、阻塞队列 阻塞独立是一个特殊的队列,它具有以下特点: 线程安全带有阻塞特性:即如果队列为空,这时继续出队列的话,