Apache Kafka(五)- Safe Kafka Producer

2024-04-24 07:32
文章标签 apache kafka safe producer

本文主要是介绍Apache Kafka(五)- Safe Kafka Producer,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Kafka Safe Producer

在应用Kafka的场景中,需要考虑到在异常发生时(如网络异常),被发送的消息有可能会出现丢失、乱序、以及重复消息。

对于这些情况,我们可以创建一个“safe producer”,用于规避这些问题。下面我们会先介绍对于这几种情况的说明以及配置,最后给出一个配置示例。

 

1. acks 详述

之前我们介绍过 Kafka Producer 的 acks 有三种模式,下面我们进一步介绍一下这三种模式:

1.1.  acks = 0(no acks

使用acks=0 时,也就意味着:

  • 在发送一条message 后,不需要response
  • 如果broker 下线或是发生了故障,则我们不会知道,并且会丢失数据,因为broker不会返回任何response 给producers

acks=0 的工作方式如下图,不需要收到任何 ack:

 

一般使用 acks=0 的场景为:可以接受可能丢失数据。例如:

  • 指标信息收集
  • 日志收集(可接受偶尔丢失几条log数据)

 

1.2.  acks=1(leader acks

使用acks=1 时:

  • producer需要获取leader 的response,才能确认消息已被收到。但是replication是否收到则不会保证(会在后台执行replication)
  • 如果producer 没有收到ack,则可能会retry

acks=1 的工作方式如下图,Producer需要收到每条消息的ack:

 

  • 如果leader broker下线或是发生故障,但是replicas还没有复制发送的数据,则我们也会有数据丢失
  • 默认是这个模式

 

1.3.  acks=all(replicas acks

使用acks=all 时:

  • 需要Leader与Replicas的ack
  • 增加了latency与更高的“数据不丢失”安全性
  • 如果有足够的replicas,则不会有数据丢失

acks=all的一个工作方式如下,每个replica都需要回复ack,才能保证一个write的写入:

 

 

如果是需要完全不丢失数据,则这个设置是有必要考虑的。

在设置 acks=all(也就是replica acks)时,必须与另一个参数一起用,也就是:min.insync.replicas:

  • min.insync.replicas参数可以在broker level设置或是topic level设置(override)
  • min.insync.replicas=2表示的是:至少2个brokers是ISR(in-sync replicas)(包括leader),且必须响应表示它们有数据,否则就会返回报错。设置此参数为2也是最常见的配置。

假设设置参数replication.factor=3, min.insync=2, acks=all,则最多只能容许1一个broker异常,否则producer在发送数据时会收到报错。

假设有3个brokers,min.insync.replicas=2,若其中有两个broker异常,则Producer会收到“NOT_ENOUGH_REPLICAS”的异常。

 

2. Producer Retry

为了防止一些瞬时的错误(例如NotEnoughReplicasException)影响整个应用,一般我们需要处理一些异常,以避免数据丢失。在Producer中也有重试的配置,默认为0,可以手动调整它的值,最高可以到Integer.MAX_VALUE。

在重试时,默认情况下,会有可能造成消息发送时乱序。因为一般发送失败的消息会被re-queue,然后再次发送,所以会造成部分消息乱序。

此情况在key-based 序列消息中,尤为严重。因为所有具有相同key的messages会被送往同一个partition,而若是有消息被requeue,然后重传,则会打乱这个partition中的部分key顺序。

对于这种情况,我们可以设置参数max.in.flight.requests.per.connection控制:同一时刻,有多少个produce请求可以被并行发起:

  • 默认为5
  • 如果为了完全确保重试后的消息也能保持严格有序,则可以设置此参数为1(但是可能会影响throughput)

 不过在 Kafka >= 1.0.0中,对于此场景会有更好的解决方案,本文之后的部分会提及。

 

3. Idempotent Producer

在重传场景中,会遇到一个常见的问题是:由于网络的原因,Producer会在Kafka中引入重复的messages。

如下图所示:

 

一个正常的request请求为:

  1. Producer 发送消息到Kafka
  2. Kafka commit 这条消息
  3. Kafka发送ack回Producer

但是一个产生重复消息的请求过程为:

  1. Producer发送消息到Kafka
  2. Kafka commit 这条消息
  3. Kafka发送ack回Producer时,由于网络原因,ack未到达Producer端
  4. Producer过一段时间后开始重传消息
  5. Kafka commit这条重复的消息并返回ack给Producer

从Producer的角度看,它仅正常发送了一次消息,因为它只收到了一次 ack。从Kafka的角度看,它收到了两次消息,所以commit了两次。

在Kafka >= 0.11之后,可以定义一个“idempotent producer”,可以解决由网络问题造成的重复消息。如下图所示:

对于一个idempotent producer 来说,处理重复消息的请求过程为:

  1. Producer 发送一条消息
  2. Kafka收到消息并commit
  3. Kafka发送回Producer的ack由于网络问题未到达Producer
  4. Producer重试发送消息,在Producer>=0.11 的版本中,消息里会带上一个produce request id
  5. Kafka收到消息后,通过对比produce request id,可以辨别出这条消息是一条重复的消息,所以不会再次commit,并会再次发送一个ack

Idempotent producers可以很好的保证一个稳定,以及无重复数据的pipeline。

伴随Idempotent producers一起被设置的参数有:

  1. retries = Integer.MAX_VALUE (2^31-1 = 2147483647),也就是说基本会在出错时无限重传
  2. max.in.flight.requests=1 (Kafka >= 0.11 & < 1.1) ,也就是说在这些版本中,若是设置max.in.flight.requests > 1时仍会有可能产生乱序数据
  3. 或者 max.in.flight.requests=5 (Kafka >= 1.1 –> High Performance),也就是说在高于1.1版本的Kafka中,设置max.in.flight.requests=5也可以在保证不乱序的同时,保证并行的高性能

 

对于Idempotent Producer的配置,仅需配置类似以下参数即可:

properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

 

4. Safe Producer 配置总结

上面介绍了创建一个safe producer 所需的配置,下面我们总结一下在不同版本的Kafka中所需要做的配置:

Kafka < 0.11

  • ack=all (procuder level):确保在发送ack前,数据已经正常备份
  • min.insync.replicas=2 (broker/topic level):确保至少有两个in ISR 的brokers 有数据后再回送ack
  • retires=MAX_INT (producer level):确保在发生瞬时问题时,可以无限次重试
  • max.in.flight.requests.per.connection=1 (producer level):确保每次仅有一个请求发送,防止在重试时产生乱序数据

 

Kafka >= 0.11

  • enable.idempotence=true (producer level) + min.insync.replicas=2 (broker/topic level)
    • 隐含的配置为 acks=all, retries=MAX_INT, max.in.flight.requests.per.connection=5 (default)
    • 可以在保证消息顺序的同时,提高performance

这里必须要提到的是:运行一个“safe producer”可能会影响系统的throughput与latency,所以在应用到生产系统前,必须先做测试以判断影响。

 

5. safe producer 示例

我们按照之前的步骤启动一个由Java编写的Kafka Producer,并查看输出的配置,可以看到默认的部分参数为:

acks = 1

enable.idempotence = false

max.in.flight.requests.per.connection = 5

retries = 2147483647

 

现在我们显式地加上以下参数:

// create a safe Producer
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));

 

然后查看producer配置的部分输出为:

 

 

 

以上为创建一个safe producer所需的配置介绍以及示例,在实际生产环境中,务必要先测试safe producer 对应用吞吐以及延时的影响后,再斟酌是否有必要对参数做部分调整。

 

这篇关于Apache Kafka(五)- Safe Kafka Producer的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/931118

相关文章

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

Apache Tomcat服务器版本号隐藏的几种方法

《ApacheTomcat服务器版本号隐藏的几种方法》本文主要介绍了ApacheTomcat服务器版本号隐藏的几种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1. 隐藏HTTP响应头中的Server信息编辑 server.XML 文件2. 修China编程改错误

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

SpringBoot使用Apache POI库读取Excel文件的操作详解

《SpringBoot使用ApachePOI库读取Excel文件的操作详解》在日常开发中,我们经常需要处理Excel文件中的数据,无论是从数据库导入数据、处理数据报表,还是批量生成数据,都可能会遇到... 目录项目背景依赖导入读取Excel模板的实现代码实现代码解析ExcelDemoInfoDTO 数据传输

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

Apache Tiles 布局管理器

陈科肇 =========== 1.简介 一个免费的开源模板框架现代Java应用程序。  基于该复合图案它是建立以简化的用户界面的开发。 对于复杂的网站,它仍然最简单,最优雅的方式来一起工作的任何MVC技术。 Tiles允许作者定义页面片段可被组装成在运行一个完整的网页。  这些片段,或Tiles,可以用于为了降低公共页面元素的重复,简单地包括或嵌入在其它瓦片,制定了一系列可重复使用

Apache HttpClient使用详解

转载地址:http://eksliang.iteye.com/blog/2191017 Http协议的重要性相信不用我多说了,HttpClient相比传统JDK自带的URLConnection,增加了易用性和灵活性(具体区别,日后我们再讨论),它不仅是客户端发送Http请求变得容易,而且也方便了开发人员测试接口(基于Http协议的),即提高了开发的效率,也方便提高代码的健壮性。因此熟