kafka-producer异地性能损耗

2024-01-01 10:48

本文主要是介绍kafka-producer异地性能损耗,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

以下内容摘自雪球,在公司内部的docs上的内容总结,部分隐私信息已经处理改动


https://xueqiu.com/

 

背景:

在进行服务上云的时候发生了性能损耗问题,一步步从网络带宽问题、JDK版本问题、公网时延问题、CPU和内存问题走了很多弯路,最后才定位到kafka-producer,当然这也是由于业务排查过程中对于机房之间时延的几毫秒不重视造成

问题:

对服务本地机房和阿里云压测时,压测结果如下

本地机房

阿里云

 

 

TPS:150K

TPS:3K

从可以看到的问题就是阿里云的TPS比本地的机器低好几倍,

解决:

JDK版本统一,外网带宽绝对大于服务历史峰值,公网时延检测,CPU进行了4核8核的比对(不是性能的瓶颈,因为相同线程数和CPU的使用率都没升上去),内存进行了8GB和16GB对比(因为担心对外内存,合着堆外内存也就占了几MB,也没有FullGC)

以上一大通花费了大量时间之后,业务代码里面有一个推送状态回传的操作,需要将消息发送至kafka,之前一直监控了kafka-consumer(consumer是批量拉取的,而且频率不高所以各项指标都很正常)。但是把kafka-producer的监控指标给忽略了,通过方法耗时统计,找到了性能损耗发生在kafka-producer状态回传,以下内容主要是深入的解析kafka-producer的运行原理并评估在双机房下对性能的影响

1.一条消息发送的过程:send阶段→batching阶段→await-send阶段→inflight阶段→retry阶段

max.block.ms:控制KafkaProducer.send()和KafkaProducer.partitionsFor()的阻塞时间,如果消息速度大于producer交付到server端的阻塞时间, 将会抛出异常

batch.size:默认16Kb,太小降低吞吐率

linger.ms:默认0ms没有延迟,正常情况下想要减小请求的数量,合理设置类似TCP中的Nagle算法,当然batch.size优先

2.服务压测下性能比对

(注意到这一步,已经定位到时机房间的时延问题,主要对比时延的影响,以及如何优化)

batch-size
linger-ms

request-count

阿里云/延迟(ms)

星光/延迟(ms)

默认值(16K)

默认值(0ms)100327231
  10003516779
  10000371027474
32K0ms100515248
  10003934914
  10000407197526
64K0ms100380118
  10003577695
  10000377536665
64K5ms100468132
  10004014654
  10000384576524
64K10ms100388199
  100039671018
  10000396716338
160K100ms100461184
  100041871032
  10000402357253

不要盲目的调大这俩参数,可以看到当batch-size增大对producer有一定的性能提升,但是linger-ms对性能的提升不符合理论依据(本次实验的数据不一定能说明问题)

3.问:但是producer是异步的,怎么调大了batch-size作用还是不大?

答:原因是producer的Record在进入Accumulator之前,首先会先从bootstrap servers获取最新的topic-partition信息,这个过程会阻塞生产线程,直到MetadataRequest完成。所以每一个metadata消耗一个延迟,那么随着消息数量的递增,延时将会被无限放大(这里就在想,怎么来控制metadata的有效期,不要每次都从server端获取就好了)

KafkaProducer.ClusterAndWaitTime waitOnMetadata方法 展开源码

private KafkaProducer.ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {this.metadata.add(topic);Cluster cluster = this.metadata.fetch();Integer partitionsCount = cluster.partitionCountForTopic(topic);if (partitionsCount == null || partition != null && partition >= partitionsCount) {long begin = this.time.milliseconds();long remainingWaitMs = maxWaitMs;long elapsed;do {this.log.trace("Requesting metadata update for topic {}.", topic);this.metadata.add(topic);int version = this.metadata.requestUpdate();this.sender.wakeup();try {this.metadata.awaitUpdate(version, remainingWaitMs);} catch (TimeoutException var15) {throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");}cluster = this.metadata.fetch();elapsed = this.time.milliseconds() - begin;if (elapsed >= maxWaitMs) {throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");}if (cluster.unauthorizedTopics().contains(topic)) {throw new TopicAuthorizationException(topic);}remainingWaitMs = maxWaitMs - elapsed;partitionsCount = cluster.partitionCountForTopic(topic);} while(partitionsCount == null);if (partition != null && partition >= partitionsCount) {throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));} else {return new KafkaProducer.ClusterAndWaitTime(cluster, elapsed);}} else {return new KafkaProducer.ClusterAndWaitTime(cluster, 0L);}}

metadata.max.age.ms:就是这个参数,控制着metadata的有效时间,把它调大就好了 (错误,这个意思理解错了)

在一个函数中有这么一个调用关系:

1.把needUpdate置为true
2.唤起sender
3.阻塞awaitUpdate

也就是说当Sender成功更新meatadata之后,version加1。否则会wait个maxWaitMs时间,欲哭无泪丧尽天良,每次都要强制从server端获取过metadata之后才允许往下一步进行。。。。

Metadata的awaitUpdate方法毁灭了我的幻想 展开源码

public synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {if (maxWaitMs < 0L) {throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");} else {long begin = System.currentTimeMillis();long elapsed;for(long remainingWaitMs = maxWaitMs; this.version <= lastVersion; remainingWaitMs = maxWaitMs - elapsed) {AuthenticationException ex = this.getAndClearAuthenticationException();if (ex != null) {throw ex;}if (remainingWaitMs != 0L) {this.wait(remainingWaitMs);}elapsed = System.currentTimeMillis() - begin;if (elapsed >= maxWaitMs) {throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");}}}}

结论:

只要时延存在,没有银弹

只不过会在低请求时不会暴露问题,而随着请求数的增长,这个时延问题会一直被放大(xueqiu-push项目中50个以下就看不出来)

目前对于这个问题的解决路径是调大了metadata的expired-time,让producer在异步send的时候不在waitOnMetadata方法阻塞太长时间(错误,这个意思理解错了,请看上面解释)

所以要么在外面再添加一层异步调用,要么把kafka的server给换成本地的,网络延时kafka-client-1.X版本下目前还是会阻塞业务的

that's all!注意kafka的所有参数都有用,辛亏把matric监控指标打的全!!!!!欧耶!!

这篇关于kafka-producer异地性能损耗的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/559025

相关文章

Vue3 的 shallowRef 和 shallowReactive:优化性能

大家对 Vue3 的 ref 和 reactive 都很熟悉,那么对 shallowRef 和 shallowReactive 是否了解呢? 在编程和数据结构中,“shallow”(浅层)通常指对数据结构的最外层进行操作,而不递归地处理其内部或嵌套的数据。这种处理方式关注的是数据结构的第一层属性或元素,而忽略更深层次的嵌套内容。 1. 浅层与深层的对比 1.1 浅层(Shallow) 定义

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

黑神话,XSKY 星飞全闪单卷性能突破310万

当下,云计算仍然是企业主要的基础架构,随着关键业务的逐步虚拟化和云化,对于块存储的性能要求也日益提高。企业对于低延迟、高稳定性的存储解决方案的需求日益迫切。为了满足这些日益增长的 IO 密集型应用场景,众多云服务提供商正在不断推陈出新,推出具有更低时延和更高 IOPS 性能的云硬盘产品。 8 月 22 日 2024 DTCC 大会上(第十五届中国数据库技术大会),XSKY星辰天合正式公布了基于星

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

从状态管理到性能优化:全面解析 Android Compose

文章目录 引言一、Android Compose基本概念1.1 什么是Android Compose?1.2 Compose的优势1.3 如何在项目中使用Compose 二、Compose中的状态管理2.1 状态管理的重要性2.2 Compose中的状态和数据流2.3 使用State和MutableState处理状态2.4 通过ViewModel进行状态管理 三、Compose中的列表和滚动

PR曲线——一个更敏感的性能评估工具

在不均衡数据集的情况下,精确率-召回率(Precision-Recall, PR)曲线是一种非常有用的工具,因为它提供了比传统的ROC曲线更准确的性能评估。以下是PR曲线在不均衡数据情况下的一些作用: 关注少数类:在不均衡数据集中,少数类的样本数量远少于多数类。PR曲线通过关注少数类(通常是正类)的性能来弥补这一点,因为它直接评估模型在识别正类方面的能力。 精确率与召回率的平衡:精确率(Pr

SQL2005 性能监视器计数器错误解决方法

【系统环境】 windows 2003 +sql2005 【问题状况】 用户在不正当删除SQL2005后会造成SQL2005 性能监视器计数器错误,如下图 【解决办法】 1、在 “开始” --> “运行”中输入 regedit,开启注册表编辑器,定位到 [HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVer

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

Kafka (快速)安装部署

文章目录 1、软件下载&配置环境1_JDK安装2_Zookeeper安装3_Kafka安装 2、单机安装1_配置主机名和IP映射2_单机Kafka配置 3、集群安装1_配置主机名和IP的映射关系2_时钟同步3_Zookeeper配置信息4_集群Kafka配置 4、kafka的其他脚本命令 1、软件下载&配置环境 下面的操作无论是单机部署还是分布式集群环境下都是通用的。 准