本文主要是介绍kafka多线程并发消费处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1、在kafka异步处理数据的时候,为了提高kafka的效率,通常是一口气拉取批量数据进行计算,但是kafka分区数的有限决定了消费者的数量限制,简单的增加消费者数量无法获取到性能的提升,此时需要将批量数据进行分批多线程处理,并在多个线程执行完毕之后再统一提交偏移量
以下是kafka的消费端的配置
kafka.consumer.servers: 192.168.0.1:9092
kafka.consumer.enable.auto.commit: false
kafka.consumer.session.timeout: 15000
kafka.consumer.auto.commit.interval: 100
kafka.consumer.auto.offset.reset: latest
kafka.consumer.concurrency: 10
kafka.consumer.maxPollRecordsConfig: 100
kafka.consumer.group.id: test01
为了并发处理,在此本人采用了countDownLaunch类来实现,原理: CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景
countDownLatch的常规方法说明:
CountDownLatch(int count); //构造方法,创建一个值为count 的计数器。
await();//阻塞当前线程,将当前线程加入阻塞队列。
await(long timeout,
这篇关于kafka多线程并发消费处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!