kafka 消费能力小小见解及解决方案

2024-03-19 01:10

本文主要是介绍kafka 消费能力小小见解及解决方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.kafka 消费能力低的原因

kafka的速度是很快,所以一般来说producer的生产消息的逻辑速度都会比consumer的消费消息的逻辑速度快,查看topic情况发现:

MUC_EMP_CHANGE_NOTIFY

MUC_ORG

app_action

h5_action

topic的分区数partitions都是1 副本数replication-factor都是1,如下图

查看topic情况

./kafka-topics.sh --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --topic MUC_ORG --describe

在这里插入图片描述

查看消费情况,lag出现延迟1万

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --group mop-consumer --topic MUC_ORG

在这里插入图片描述
但是新创建的topic DEMO_KAFKA_SERVICE,如下图

./kafka-topics.sh --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --topic DEMO_KAFKA_SERVICE --describe

在这里插入图片描述

造成这种现象是server.properties修改了,新的server.properties增加了如下配置:

default.replication.factor=3replica.fetch.max.bytes=5242880delete.topic.enable=true

增加了分区和副本数,但是旧的topic还是之前配置,造成了消费能力还是一样低

2.解决方案(提高了partition的数量)

提高了partition的数量,从而提高了consumer的并行能力,从而提高数据的消费能力
对于单partition的消费线程,增加了一个固定长度的阻塞队列和工作线程池进一步提高并行消费的能力(暂不考虑)
使用spring-kafka,则把kafka-client的enable.auto.commit设置成了false,表示禁止kafka-client自动提交offset,因为就是之前的自动提交失败,导致offset永远没更新,从而转向使用spring-kafka的offset提交机制。并且spring-kafka提供了多种提交策略(暂不考虑)

1) 修改server.properties配置,添加如下配置,重启kafka(已配置)
default.replication.factor=3replica.fetch.max.bytes=5242880delete.topic.enable=true
2) 删除topic,kafka暂时不支持修改topic的副本数
 ./kafka-topics.sh --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --delete –topic topicname
3) 修复后的消费情况

在这里插入图片描述

4)进入 ./zookeeper-shell.sh 172.28.21.250:2181 查看group

ls /consumers

删除 lag偏差的group rmr /consumers/group1

5)重启 mx-apps 服务重新创建group
6) 查看 消费情况

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --group apps-consumer --topic MUC_ORG

这篇关于kafka 消费能力小小见解及解决方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/824322

相关文章

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

数据库oracle用户密码过期查询及解决方案

《数据库oracle用户密码过期查询及解决方案》:本文主要介绍如何处理ORACLE数据库用户密码过期和修改密码期限的问题,包括创建用户、赋予权限、修改密码、解锁用户和设置密码期限,文中通过代码介绍... 目录前言一、创建用户、赋予权限、修改密码、解锁用户和设置期限二、查询用户密码期限和过期后的修改1.查询用

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

深入理解Redis大key的危害及解决方案

《深入理解Redis大key的危害及解决方案》本文主要介绍了深入理解Redis大key的危害及解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录一、背景二、什么是大key三、大key评价标准四、大key 产生的原因与场景五、大key影响与危

Xshell远程连接失败以及解决方案

《Xshell远程连接失败以及解决方案》本文介绍了在Windows11家庭版和CentOS系统中解决Xshell无法连接远程服务器问题的步骤,在Windows11家庭版中,需要通过设置添加SSH功能并... 目录一.问题描述二.原因分析及解决办法2.1添加ssh功能2.2 在Windows中开启ssh服务2

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

python 字典d[k]中key不存在的解决方案

《python字典d[k]中key不存在的解决方案》本文主要介绍了在Python中处理字典键不存在时获取默认值的两种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录defaultdict:处理找不到的键的一个选择特殊方法__missing__有时候为了方便起见,

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

Linux限制ip访问的解决方案

《Linux限制ip访问的解决方案》为了修复安全扫描中发现的漏洞,我们需要对某些服务设置访问限制,具体来说,就是要确保只有指定的内部IP地址能够访问这些服务,所以本文给大家介绍了Linux限制ip访问... 目录背景:解决方案:使用Firewalld防火墙规则验证方法深度了解防火墙逻辑应用场景与扩展背景:

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选