kafka ---- producer与broker配置详解以及ack机制详解

2024-08-29 14:12

本文主要是介绍kafka ---- producer与broker配置详解以及ack机制详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、producer 配置

1、bootstrap.servers

kafka broker集群的ip列表,格式为:host1:port1,host2:port2,…

2、client.id

用于追踪消息的源头

3、retries

当发送失败时客户端会进行重试,重试的次数由retries指定,默认值是2147483647,即 Integer.MAX_VALUE;在重试次数耗尽和delivery.timeout.ms超时时间结束,如果还没发送成功,则会返回失败;一般不会使用此值去控制重试次数,而是使用delivery.timeout.ms这个值去控制;

4、delivery.timeout.ms

发送消息的最长总耗时,即,从 send 方法返回后,到触发 Callback 的总耗时。其包含了,producer内部攒批的时间;向 broker 发送请求并等待返回的时间;多次重试的时间;这个值应该大于等于request.timeout.ms 和 linger.ms的总和

5、 request.timeout.ms

producer发送一次请求等待响应的最大超时时间,如果在超时时间过后未收到响应,则客户端将重新发送请求,如果重试次数用尽,则请求失败。

6、enable.idempotence

设置为“true”时,生产者将确保在流中只写入每条消息的一个副本。如果为 ‘false’,则由于代理故障等原因,又可能会写入多个副本。要开启enable.idempotence,则必须要求如下配置也需要满足

max.in.flight.requests.per.connection <=5(用于确保消息的顺序性)
retries>0
acks=all

默认情况下enable.idempotence是开启的,如果上述配置存在冲突,并且enable.idempotence并没有显式的开启,则enable.idempotence会被disable;如果存在冲突,并且enable.idempotence显式开启,则会抛出ConfigException 异常

7、max.in.flight.requests.per.connection

在该链接被阻塞之前,所能允许的未收到ack响应的请求的最大数量,
如果 max.in.flight.requests.per.connection>1;enable.idempotence=false;retries>=1;将会存在日志无序发送的风险由于重新发送(retries);
如果retries=0或者enable.idempotence=true,则将不存在无序风险。

8、acks

这个指标用于控制发送的记录的持久性,参数详解如下:

  • acks=0 如果被设置为0,则生产者并不会等待任何服务器的确认就会认为该发送是成功的,并不会保证该消息被发送到了服务器并被写到内存中,并且在此配置下,retries的配置将不会生效
  • acks=1 如果被设置成1,则只要leader所在的节点返回了确认,就会认为该发送是成功的,leader并不会等待其他follower的成功确认就会返回成功
  • acks=all 同acks=-1,消息从生产者发送到了leader,leader会等待所有in-sync replicas(ISR列表中的所有成员)返回确认,该leader才会向生产者发送ack确认
    有效值有:[all, -1, 0, 1]

9、buffer.memory

生产者配置缓存的大小,当需要发送消息的时候,会从buffer.memory中分配一个batch.size大小的batch用来攒批次,当数据量达到batch.size大小或者时间达到linger.ms就会被发送,如果消息发送过快,导致buffer.memory被用完,将会堵塞当前线程,堵塞的最大时间是buffer.memory,当超过这个时间,将会抛出异常

10、batch.size

为降低发送的频率,producer会将发送到同一分区的多个记录攒成一个批次来进行批量发送;并且KafkaProducer有一个Sender线程会把多个Batch打包成一个Request发送到Kafka服务器上去。batch.size用来设置该Batch的大小。该值太小会降低吞吐量((批大小为零将完全禁用批处理)),该值太大会造成内存的浪费。但是当数据量较少的时候,很长一段时间无法达到batch.size该怎么办呢,我们使用linger.ms来控制该Batch等待的时间,当该时间达到,即使大小没有达到batch.size也会发送,linger.ms设置为0,代表立即发送。

11、client.dns.lookup

三、broker配置

1、min.insync.replicas

当一个producer的acks被设置成了all或者-1,min.insync.replicas参数设置了一个最小的副本数,确认消息写入成功的副本必须达到该值,该发送才会被确认成功,如果最小值不能被满足,则producer将会抛出异常,如果你的副本数为3,则可以设置该值为2,这将确保必须大多数的副本都成功确认了该消息才会被认为是成功的。

2、replica.lag.time.max.ms

如果在此时间内,follower并没有去发送fetch请求到leader也并没有消耗到leader日志端偏移量,该leader将会从ISR列表中将该follower移除,等到该副本追上了Leader副本的进度,该副本会被再次加入到ISR列表中。该值默认值30000 (30 seconds)

四、ack机制详解

acks=0和acks=1暂且先不讲,着重讲一下acks=all的情况,在复制因子为3的前提下:

1、case 1

当min.insync.replicas=2且acks=all时,如果此时ISR列表只有[1,2],3被踢出ISR列表,只需要保证两个副本同步了,生产者就会收到成功响应。即当前情况仍能对外提供服务。

2、case 2

当min.insync.replicas=2,如果此时ISR列表只有[1],2和3被踢出ISR列表,那么当acks=all时,则不能成功写入数;当acks=0或者acks=1可以成功写入数据。

3、case 3

这种情况是很容易引起误解的,如果acks=all且min.insync.replicas=2,此时ISR列表为[1,2,3],那么还是会等到所有的同步副本都同步了消息,才会向生产者发送成功响应的ack.因为min.insync.replicas=2只是一个最低限制,即同步副本少于该配置值,则会抛异常,而acks=all,是需要保证所有的ISR列表的副本都同步了才可以发送成功响应。

引用

https://www.jianshu.com/p/3eb29d653607

这篇关于kafka ---- producer与broker配置详解以及ack机制详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1118084

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Zookeeper安装和配置说明

一、Zookeeper的搭建方式 Zookeeper安装方式有三种,单机模式和集群模式以及伪集群模式。 ■ 单机模式:Zookeeper只运行在一台服务器上,适合测试环境; ■ 伪集群模式:就是在一台物理机上运行多个Zookeeper 实例; ■ 集群模式:Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个“集合体”(ensemble) Zookeeper通过复制来实现

CentOS7安装配置mysql5.7 tar免安装版

一、CentOS7.4系统自带mariadb # 查看系统自带的Mariadb[root@localhost~]# rpm -qa|grep mariadbmariadb-libs-5.5.44-2.el7.centos.x86_64# 卸载系统自带的Mariadb[root@localhost ~]# rpm -e --nodeps mariadb-libs-5.5.44-2.el7

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

NameNode内存生产配置

Hadoop2.x 系列,配置 NameNode 内存 NameNode 内存默认 2000m ,如果服务器内存 4G , NameNode 内存可以配置 3g 。在 hadoop-env.sh 文件中配置如下。 HADOOP_NAMENODE_OPTS=-Xmx3072m Hadoop3.x 系列,配置 Nam

wolfSSL参数设置或配置项解释

1. wolfCrypt Only 解释:wolfCrypt是一个开源的、轻量级的、可移植的加密库,支持多种加密算法和协议。选择“wolfCrypt Only”意味着系统或应用将仅使用wolfCrypt库进行加密操作,而不依赖其他加密库。 2. DTLS Support 解释:DTLS(Datagram Transport Layer Security)是一种基于UDP的安全协议,提供类似于

OpenHarmony鸿蒙开发( Beta5.0)无感配网详解

1、简介 无感配网是指在设备联网过程中无需输入热点相关账号信息,即可快速实现设备配网,是一种兼顾高效性、可靠性和安全性的配网方式。 2、配网原理 2.1 通信原理 手机和智能设备之间的信息传递,利用特有的NAN协议实现。利用手机和智能设备之间的WiFi 感知订阅、发布能力,实现了数字管家应用和设备之间的发现。在完成设备间的认证和响应后,即可发送相关配网数据。同时还支持与常规Sof

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3