本文主要是介绍kafka 消费能力小小见解及解决方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1.kafka 消费能力低的原因
kafka的速度是很快,所以一般来说producer的生产消息的逻辑速度都会比consumer的消费消息的逻辑速度快,查看topic情况发现:
MUC_EMP_CHANGE_NOTIFY
MUC_ORG
app_action
h5_action
topic的分区数partitions都是1 副本数replication-factor都是1,如下图
查看topic情况
./kafka-topics.sh --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --topic MUC_ORG --describe
查看消费情况,lag出现延迟1万
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --group mop-consumer --topic MUC_ORG
但是新创建的topic DEMO_KAFKA_SERVICE,如下图
./kafka-topics.sh --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --topic DEMO_KAFKA_SERVICE --describe
造成这种现象是server.properties修改了,新的server.properties增加了如下配置:
default.replication.factor=3replica.fetch.max.bytes=5242880delete.topic.enable=true
增加了分区和副本数,但是旧的topic还是之前配置,造成了消费能力还是一样低
2.解决方案(提高了partition的数量)
提高了partition的数量,从而提高了consumer的并行能力,从而提高数据的消费能力
对于单partition的消费线程,增加了一个固定长度的阻塞队列和工作线程池进一步提高并行消费的能力(暂不考虑)
使用spring-kafka,则把kafka-client的enable.auto.commit设置成了false,表示禁止kafka-client自动提交offset,因为就是之前的自动提交失败,导致offset永远没更新,从而转向使用spring-kafka的offset提交机制。并且spring-kafka提供了多种提交策略(暂不考虑)
1) 修改server.properties配置,添加如下配置,重启kafka(已配置)
default.replication.factor=3replica.fetch.max.bytes=5242880delete.topic.enable=true
2) 删除topic,kafka暂时不支持修改topic的副本数
./kafka-topics.sh --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --delete –topic topicname
3) 修复后的消费情况
4)进入 ./zookeeper-shell.sh 172.28.21.250:2181 查看group
ls /consumers
删除 lag偏差的group rmr /consumers/group1
5)重启 mx-apps 服务重新创建group
6) 查看 消费情况
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --group apps-consumer --topic MUC_ORG
这篇关于kafka 消费能力小小见解及解决方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!