kafka优化(系列四):kafka配置优化和kafka批量消费,提高分区数量

2024-08-28 04:48

本文主要是介绍kafka优化(系列四):kafka配置优化和kafka批量消费,提高分区数量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

往期精选

  • 第一篇我们讲到了docker的单机搭建。

  • 第二篇我们讲到了与springboot的整合。

  • 第三篇我们讲到了kafka的原理。

    这一篇我们将叙述,我是怎么在项目中进行对kafka优化的我们将从三方面进行考虑,一是代码;二是    配置;三是集群。项目背景,做数据迁移工作后面我将写几篇文章讲诉我们是怎么对百万数据进行迁移的工作)。主要场景利用kafka做读写分离,一直请求源数据写入到kafka生产者,然后kafka消费者进行写入数据到新数据。

一、配置优化《报错》节选:

[2018-09-25 11:23:59.370] ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] LoggingErrorHandler.java:37 - Error while processing: nullorg.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722)at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1324)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1185)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688)at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)at java.util.concurrent.FutureTask.run(FutureTask.java)at java.lang.Thread.run(Thread.java:748)

这是由于kafka一直生产数据,导致kafka消费太慢了。我们主要优化也是对消费者进行优化。根据上面的报错,我们可以看到一个参数:max-poll-records,所以我们首先将对提交数,进行调大。具体的需要根据项目进行测试,我们把数进行调大到100,同时对下面的参数进行:

#自动提交offset到zookeeper的时间间隔

auto-commit-interval: 1000

#earliest 

#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 

#latest 

#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 

#none 

#topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

auto-offset-reset: latest

#提交方式改为false,是否自动周期性提交已经拉取到消费端的消息offset

enable-auto-commit: false

由于使用了spring-kafka,则把kafka-client的enable.auto.commit设置成了false,表示禁止kafka-client自动提交offset,因为就是之前的自动提交失败,导致offset永远没更新,从而转向使用spring-kafka的offset提交机制。并且spring-kafka提供了多种提交策略:

  然后我修改了kafka的配置(spring-kafka),需要到安装的文件(config)下进行修改,分别是生产文件和配置文件。

1.session.timeout.ms=100000(增大session超时时间)。

2.request.timeout.ms=110000(socket握手超时时间,默认是3000 但是kafka配置要求大于session.timeout.ms时间).

同时Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。

二、代码优化:《日志》节选

如果进行了的配置调优,差不多会提高kafka的消费能力,但是写入过大,控制台还是打印下面日志信息:

2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-4, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-1, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-3, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] LogContext.java:336 - [Consumer clientId=consumer-4, groupId=test-consumer-group] 

也是就说机制一直打印这些信息,但是又不报错,但是又不写入数据,我们就想,除了配置优化之后,能不能像数据库一样,批量提交或者说是批量消费呢?看了官网资料,发现确实可以,以下是我们对代码的优化,由单一的消费,改为批量消费:

一:增加一个config类。

@Configuration
@EnableKafkapublic class KafkaConfig {@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(10);factory.getContainerProperties().setPollTimeout(1500);factory.setBatchListener(true);//@KafkaListener 批量消费  每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIGfactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);//设置提交偏移量的方式return factory;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>(16);propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP地址需要修改");propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 100000);propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,110000);propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 150);//每个批次获取数return propsMap;}}

二、更改消费接受代码。

 @KafkaListener(topics = {"消费名称需要改"})public void listen(List<ConsumerRecord> records, Acknowledgment ack) {try {for (ConsumerRecord record : records) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();log.info("----------------- record =" + record);log.info("------------------ message =" + message);}}} catch (Exception e) {log.error("kafka失败,当前失败的批次。data:{}", records);e.printStackTrace();} finally {ack.acknowledge();}}

集群搭建

前面虽然优化配置和代码,但是代码执行还是不够快,网上寻找资料(提高了partition的数量,从而提高了consumer的并行能力,从而提高数据的消费能力),说可以提高分区数量,如果单机怎么提高还是一样的(我们试过了),后来搭建了一个集群。注意我们是使用docker搭建kafka集群的,搭建过程如下。docker-compose.yml内容:

version: '2'services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkaports:- "9095:9095"environment:KAFKA_ADVERTISED_HOST_NAME: IP地址KAFKA_ADVERTISED_PORT: 9095KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://XXXXXXXX:9095KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9095KAFKA_DELETE_TOPIC_ENABLE: "true"KAFKA_LOG_RETENTION_HOURS: 1KAFKA_MESSAGE_MAX_BYTES: 10000000KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 100000KAFKA_NUM_PARTITIONS: 2KAFKA_DELETE_RETENTION_MS: 1000KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:- /var/run/docker.sock:/var/run/docker.sockkafka-manager:image: sheepkiller/kafka-managerlinks:- kafka- zookeeperenvironment:ZK_HOSTS: zookeeper:2181APPLICATION_SECRET: letmeinKM_ARGS: -Djava.net.preferIPv4Stack=trueports:- "9000:9000"

1.启动的命令:

docker-compose up -d

2.先去修改配置文件的端口,然后再启动相关的命令:

docker-compose scale kafka=2

3.再次修改文件袋的端口,然后再启动相关的命令:

docker-compose scale kafka=3

以上就是我所总结的kafka优化,欢迎有更好的方案进行交流,欢迎关注微信号:繁荣Aaron和转发。

这篇关于kafka优化(系列四):kafka配置优化和kafka批量消费,提高分区数量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1113773

相关文章

如何使用Nginx配置将80端口重定向到443端口

《如何使用Nginx配置将80端口重定向到443端口》这篇文章主要为大家详细介绍了如何将Nginx配置为将HTTP(80端口)请求重定向到HTTPS(443端口),文中的示例代码讲解详细,有需要的小伙... 目录1. 创建或编辑Nginx配置文件2. 配置HTTP重定向到HTTPS3. 配置HTTPS服务器

SpringBoot中配置Redis连接池的完整指南

《SpringBoot中配置Redis连接池的完整指南》这篇文章主要为大家详细介绍了SpringBoot中配置Redis连接池的完整指南,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以... 目录一、添加依赖二、配置 Redis 连接池三、测试 Redis 操作四、完整示例代码(一)pom.

Linux内核参数配置与验证详细指南

《Linux内核参数配置与验证详细指南》在Linux系统运维和性能优化中,内核参数(sysctl)的配置至关重要,本文主要来聊聊如何配置与验证这些Linux内核参数,希望对大家有一定的帮助... 目录1. 引言2. 内核参数的作用3. 如何设置内核参数3.1 临时设置(重启失效)3.2 永久设置(重启仍生效

IDEA自动生成注释模板的配置教程

《IDEA自动生成注释模板的配置教程》本文介绍了如何在IntelliJIDEA中配置类和方法的注释模板,包括自动生成项目名称、包名、日期和时间等内容,以及如何定制参数和返回值的注释格式,需要的朋友可以... 目录项目场景配置方法类注释模板定义类开头的注释步骤类注释效果方法注释模板定义方法开头的注释步骤方法注

如何在Mac上安装并配置JDK环境变量详细步骤

《如何在Mac上安装并配置JDK环境变量详细步骤》:本文主要介绍如何在Mac上安装并配置JDK环境变量详细步骤,包括下载JDK、安装JDK、配置环境变量、验证JDK配置以及可选地设置PowerSh... 目录步骤 1:下载JDK步骤 2:安装JDK步骤 3:配置环境变量1. 编辑~/.zshrc(对于zsh

售价599元起! 华为路由器X1/Pro发布 配置与区别一览

《售价599元起!华为路由器X1/Pro发布配置与区别一览》华为路由器X1/Pro发布,有朋友留言问华为路由X1和X1Pro怎么选择,关于这个问题,本期图文将对这二款路由器做了期参数对比,大家看... 华为路由 X1 系列已经正式发布并开启预售,将在 4 月 25 日 10:08 正式开售,两款产品分别为华

SQL server配置管理器找不到如何打开它

《SQLserver配置管理器找不到如何打开它》最近遇到了SQLserver配置管理器打不开的问题,尝试在开始菜单栏搜SQLServerManager无果,于是将自己找到的方法总结分享给大家,对SQ... 目录方法一:桌面图标进入方法二:运行窗口进入方法三:查找文件路径方法四:检查 SQL Server 安

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Python Transformer 库安装配置及使用方法

《PythonTransformer库安装配置及使用方法》HuggingFaceTransformers是自然语言处理(NLP)领域最流行的开源库之一,支持基于Transformer架构的预训练模... 目录python 中的 Transformer 库及使用方法一、库的概述二、安装与配置三、基础使用:Pi

MySQL 分区与分库分表策略应用小结

《MySQL分区与分库分表策略应用小结》在大数据量、复杂查询和高并发的应用场景下,单一数据库往往难以满足性能和扩展性的要求,本文将详细介绍这两种策略的基本概念、实现方法及优缺点,并通过实际案例展示如... 目录mysql 分区与分库分表策略1. 数据库水平拆分的背景2. MySQL 分区策略2.1 分区概念