Kafka 为了避免 Full GC,竟然还在发送端设计了内存池,自己管理内存,太巧妙了...

2024-09-06 19:18

本文主要是介绍Kafka 为了避免 Full GC,竟然还在发送端设计了内存池,自己管理内存,太巧妙了...,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、开篇引出一个 Full Gc 的问题

在上一篇文章中,我们讲到了 Kafka 发送消息的八个流程,并且着重讲了 Kafka 封装了一个内存结构,把每个分区的消息封装成批次,缓存到内存里。

如下图所示:

上图中,整体是一个 Map 结构,Map 的 key 是分区,Map 的值是一个队列;队列里有一个个的小批次,里面是很多消息。

这样好处就是可以一次性的把消息发送出去,不至于来一条发送一条,浪费网络资源。

但由此也带来了问题,生产者端消息这么多,一个批次发送完了就不管了去等待 JVM 的垃圾回收的时候,很有可能会触发 full gc。

一次 full gc,整个 Producer 端的所有线程就都停了,所有消息都无法发送了,由此带来的损耗也是不可小觑。

这个严重的问题,当然 Kafka 的开发者也考虑到了这一点,所以作者设计了一个内存池,用来反复利用被发送出去 RecordBatch,以减少 full gc。

二、什么是内存池

可以类比连接池,连接池缓存了很多 jdbc 连接,避免不必要的创建连接的开销;内存池也一样,可以对 RecordBatch 做到反复利用。

那我们看看 Kafka 内存池是怎么设计的:

Kafka 内存设计有两部分,下面的绿色的是可用的内存(未分配的内存,初始的时候是 32M),上面红色的是已经被分配了的内存,每个小 batch 是 16K,然后这一个个的 batch 就可以被反复利用,不需要每次都申请内存。

两部分加起来是 32M。

这个 32M 的配置在 ProducerConfig 这个类里面:

三、申请内存的过程

(发送消息的流程在上一篇文章讲过了,可以回去复习下)

我们从发送消息的大流程的第七步开始看(当前位置:KafkaProducer):

进入到 RecordAccumulator 类里,当发现还没有队列的时候,创建了一个队列,然后去申请内存(当前类位置:RecordAccumulator):

本次我们主要看的就是这个 allocate 方法。点到 allocate 里面,到了 BufferPool 类,BufferPool 是对内存池的封装。然后来一行行看这个申请内存的方法。

(1)如果申请的内存大小超过了整个缓存池的大小,则抛错出来

(2)对整个方法加锁:

this.lock.lock();

(3)如果申请的大小是每个 recordBatch 的大小(16K),并且已分配内存不为空,则直接取出来一个返回。

if (size == poolableSize && !this.free.isEmpty())return this.free.pollFirst();

(4)如果要申请的内存大小不是 16K 或者已分配内存没有了的情况。

如果整个内存池大小比要申请的内存大小大 (this.availableMemory + freeListSize >= size),则直接从可用内存(即上图绿色的区域)申请一块内存。

并且可用内存要去掉申请的那一块内存。

int freeListSize = this.free.size() * this.poolableSize;
if (this.availableMemory + freeListSize >= size) {// we have enough unallocated or pooled memory to immediately// satisfy the requestfreeUp(size);this.availableMemory -= size;lock.unlock();return ByteBuffer.allocate(size);
}

(5)下面是 else 分支,表示申请的内存大小不是 16 K,或者已分配内存区域没有,并且所有的内存加起来都不够了。

首先创建一个 Condition。Condition 就是用来替代传统的 Object 的 wait() 和 notify() 方法来实现线程间的协作。Condition 必须在 lock 和 unlock 代码块中间才可使用。

Condition moreMemory = this.lock.newCondition();

将 Condition 加入到 waiters 里面。为什么会有多个 Condition 呢?因为这里可能很多个线程都在使用生产者发送消息,可能很多个线程都没有足够的内存分配了,都在等待。

this.waiters.addLast(moreMemory);

然后线程开始睡眠,等待释放资源(唤醒条件有两个,一个是睡眠时间到了,一个是有其他线程释放了内存,被唤醒了):

(7)如果等了指定时间(默认配置是 60s - 获取元数据的时间),还没被唤醒,则直接抛一个缓存超时的异常出去

if (waitingTimeElapsed) {this.waiters.remove(moreMemory);throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
}

(8)如果有其他线程释放内存,被唤醒了,从 waiters 列表里面移除自己,然后去看看有没有内存可以用。

这里仍然有两个分支,一个是首先看已分配内存里面有没有内存(16K),如果有的话,直接拿一个 batch 出来

if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {// just grab a buffer from the free listbuffer = this.free.pollFirst();accumulated = size;
}

另一个分支是,如果要申请的不是 16K,或者已分配内存空间不是空的

// 从已分配内存取一个出来放到可用内存区域
freeUp(size - accumulated);
// 申请一块,有可能只能申请到2K
int got = (int) Math.min(size - accumulated, this.availableMemory);
// 做扣减
this.availableMemory -= got;
accumulated += got;

有可能这里只能申请到一部分内存,比如3K,5K,没有达到想申请的那个数量,则会继续走 while 循环。

(9)最后发现内存有富余,则唤醒其他线程

if (this.availableMemory > 0 || !this.free.isEmpty()) {if (!this.waiters.isEmpty())this.waiters.peekFirst().signal();
}

四、释放内存的过程

释放内存的过程很简单了,如果释放的是一个批次的大小(16K),则直接加到已分配内存里面

如果没有,则把内存放到可用内存里面,这部分内存等待虚拟机垃圾回收。

public void deallocate(ByteBuffer buffer, int size) {lock.lock();try {if (size == this.poolableSize && size == buffer.capacity()) {buffer.clear();this.free.add(buffer);} else {this.availableMemory += size;}Condition moreMem = this.waiters.peekFirst();if (moreMem != null)moreMem.signal();} finally {lock.unlock();}
}

这里可能会有一个疑问:

为什么释放了一个批次大小(16K)内存的时候,才放到已分配内存里面。我想释放个 1M 的内存,为什么不能往已分配内存里面呢?

假设我们往已分配内存里释放了个 1M 的批次到内存里。

然后发送消息其实是有条件的,要么是许多消息把批次撑满了发送出去,要么是一个批次累积消息的时间到了,就会立马发出去。

如果是一个 1M 的内存批次,才攒了几条消息,一个批次才用了 几十K,时间到了,就把这个 1M 的内存批次发送出去了。

那么可想而知,内存的使用率是会非常低的。

所以这里控制了,已分配内存必须是 16K 的,每个批次的大小必须一致,这样才能充分利用内存空间。

五、总结

本文我们讨论了 Kafka 生产者端设计了一个内存池的结构,反复利用每一个批次,减少 Java 虚拟机的内存回收。

本文中,还涉及到了一个高并发锁的代码,比如 可重入锁 ReentrantLock,Condition,如果有不明白的地方,可以把这部分复习一下,再看这段代码就很容易明白了。

这篇关于Kafka 为了避免 Full GC,竟然还在发送端设计了内存池,自己管理内存,太巧妙了...的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1142884

相关文章

不懂推荐算法也能设计推荐系统

本文以商业化应用推荐为例,告诉我们不懂推荐算法的产品,也能从产品侧出发, 设计出一款不错的推荐系统。 相信很多新手产品,看到算法二字,多是懵圈的。 什么排序算法、最短路径等都是相对传统的算法(注:传统是指科班出身的产品都会接触过)。但对于推荐算法,多数产品对着网上搜到的资源,都会无从下手。特别当某些推荐算法 和 “AI”扯上关系后,更是加大了理解的难度。 但,不了解推荐算法,就无法做推荐系

NameNode内存生产配置

Hadoop2.x 系列,配置 NameNode 内存 NameNode 内存默认 2000m ,如果服务器内存 4G , NameNode 内存可以配置 3g 。在 hadoop-env.sh 文件中配置如下。 HADOOP_NAMENODE_OPTS=-Xmx3072m Hadoop3.x 系列,配置 Nam

综合安防管理平台LntonAIServer视频监控汇聚抖动检测算法优势

LntonAIServer视频质量诊断功能中的抖动检测是一个专门针对视频稳定性进行分析的功能。抖动通常是指视频帧之间的不必要运动,这种运动可能是由于摄像机的移动、传输中的错误或编解码问题导致的。抖动检测对于确保视频内容的平滑性和观看体验至关重要。 优势 1. 提高图像质量 - 清晰度提升:减少抖动,提高图像的清晰度和细节表现力,使得监控画面更加真实可信。 - 细节增强:在低光条件下,抖

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

软考系统规划与管理师考试证书含金量高吗?

2024年软考系统规划与管理师考试报名时间节点: 报名时间:2024年上半年软考将于3月中旬陆续开始报名 考试时间:上半年5月25日到28日,下半年11月9日到12日 分数线:所有科目成绩均须达到45分以上(包括45分)方可通过考试 成绩查询:可在“中国计算机技术职业资格网”上查询软考成绩 出成绩时间:预计在11月左右 证书领取时间:一般在考试成绩公布后3~4个月,各地领取时间有所不同

安全管理体系化的智慧油站开源了。

AI视频监控平台简介 AI视频监控平台是一款功能强大且简单易用的实时算法视频监控系统。它的愿景是最底层打通各大芯片厂商相互间的壁垒,省去繁琐重复的适配流程,实现芯片、算法、应用的全流程组合,从而大大减少企业级应用约95%的开发成本。用户只需在界面上进行简单的操作,就可以实现全视频的接入及布控。摄像头管理模块用于多种终端设备、智能设备的接入及管理。平台支持包括摄像头等终端感知设备接入,为整个平台提

怎么让1台电脑共享给7人同时流畅设计

在当今的创意设计与数字内容生产领域,图形工作站以其强大的计算能力、专业的图形处理能力和稳定的系统性能,成为了众多设计师、动画师、视频编辑师等创意工作者的必备工具。 设计团队面临资源有限,比如只有一台高性能电脑时,如何高效地让七人同时流畅地进行设计工作,便成为了一个亟待解决的问题。 一、硬件升级与配置 1.高性能处理器(CPU):选择多核、高线程的处理器,例如Intel的至强系列或AMD的Ry

基于51单片机的自动转向修复系统的设计与实现

文章目录 前言资料获取设计介绍功能介绍设计清单具体实现截图参考文献设计获取 前言 💗博主介绍:✌全网粉丝10W+,CSDN特邀作者、博客专家、CSDN新星计划导师,一名热衷于单片机技术探索与分享的博主、专注于 精通51/STM32/MSP430/AVR等单片机设计 主要对象是咱们电子相关专业的大学生,希望您们都共创辉煌!✌💗 👇🏻 精彩专栏 推荐订阅👇🏻 单片机

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。

从状态管理到性能优化:全面解析 Android Compose

文章目录 引言一、Android Compose基本概念1.1 什么是Android Compose?1.2 Compose的优势1.3 如何在项目中使用Compose 二、Compose中的状态管理2.1 状态管理的重要性2.2 Compose中的状态和数据流2.3 使用State和MutableState处理状态2.4 通过ViewModel进行状态管理 三、Compose中的列表和滚动