更加深入理解Kafka--Producer篇(下)

2024-03-08 14:38

本文主要是介绍更加深入理解Kafka--Producer篇(下),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

批次

积累器在创建批次之前,就在堆上为它预分配一段空间,这段空间用于装载消息。消息最终会顺序落到内存块中形成消息集。批次的逻辑结构如下:
批次逻辑结构.jpg

5.0.1

* MemoryRecords即消息集的抽象,它容纳0到多条Record。
* Record则代表消息在内存中的状态,即按二进制协议格式化之后的消息结构,它是消息集的元素。
* 用户可通过compression.type配置压缩方式,开启压缩可显著增大内存使用率、同时减少网络开销。Compressor负责压缩消息,它的属性appendStream是个包装流,其结构是DataOutputStream—>压缩处理流—>ByteBufferStream。
压缩器.png
5.0.2

批次失效会关闭消息集使其变为只读状态,并引起Compressor关闭:释放全部I/O资源并在开启压缩时在缓冲头部位置填充协议元数据。关闭后缓冲将不再有消息写入,它被回给消息集并flip后等待发送。

数据协议

批次是消息存储的最小物理单元,读取时就只能按批次整块读取,因此如果没有标准数据协议就无法对数据块做反序列化。

Kafka把消息分割成写前日志、协议头和协议体三部分,协议头和协议体合成协议正文。日志标识消息在批次中的相对顺序和原始正文大小;消息头声明CRC、魔数和属性;最后消息体记录追加时间以及key和value值。

消息物理结构.png

5.1.1

CRC即checkSum值,用于校验消息是否完整;魔数用于声明所用协议版本;属性占1个字节即8位,目前只使用了前三位,每一位代表一种压缩协议,为0即不压缩;key和value几乎一致,前4个字节标识内容长度,如果内容为-1,则表示无内容填入。

当开启压缩时,Compressor会对消息集偏移在起始位置预留出报文头长度的位置,在批次关闭后再将报文头相关数据写入,因为正文长度、payload长度以及消息数量都只能在消息只读后确定。报文头加上消息集才是完整的压缩报文。压缩报文结构和消息几乎一致,也分日志和正文两个部分,但是在个别属性上会有细微差异:1)offset分别被用于标识消息数量;2)没有key值,所有key长度都是-1;3)value长度是消息集(压缩后)的长度,payload就是消息集本身。报文头并不会被压缩,因此可以很容易被读取,程序识别报文的长度、压缩协议、版本号以及CRC等属性之后就可以选用合适的方式读取一定长度的消息以及校验批次的完整性。

批次管理

批次创建后会逗留linger.ms时间,它集聚该段时间内归属该分组(区)的消息。如果生产速率特别高又或者有超大消息流入很快将分区打满,则实际逗留时间会低于linger.ms。想象一下极端场景,批次大小默认16k,如果消息以5k、12k间隔发,则内存实际利用率只有(5+12)/(2*16)。

另一方面,积累器挤出前先要做就绪节点检查,挤出动作也只针对leader在这些节点上的分区批次,但节点ready to drain后,可能因为连接或者inflightRequests超限等问题,被从发送就绪列表移除,从而导致这些节点的可发送批次不会被挤出。它们始终占据分组队列的最高挤出优先级,这会导致:1)后追加的消息被积压,即使连接恢复后新入的消息也只能等待顺序处理,整体投递延时猛增。2)批次占据的内存得不到释放,有可能发生雪崩:因为只有追加没有挤出,问题节点的批次有可能占满全部内存空间导致其他正常节点分区无法为新批次申请空间。Kafka提供请求超时timeout.ms解决这个问题,从逗留截止开始计算批次超时则被废弃–释放内存空间并从分组队列移除。

理想状况下,单位时间内追入和挤出应该恰好相等且内存被充分使用。长期观察下调好linger.ms、batch.size、timeout.ms以及batch.size和buffer.memory这几个参数将有助于达到这个目标。

内存管理

消息集内存直接分配在堆上,如果对它不加以限制在消息生产速率足够高时很可能频繁出现fgc乃至oom,另一方面频繁的内存申请和释放操作也很吃系统资源,因此Kafka自建了内存池BufferPool管理内存。

内存池有四个关键属性:totalMemory代表内存池上限,由buffer.memory决定;poolableSize指池化内存块大小,由batch.size设置;free和availableMemory则分别代表池化内存和闲置内存大小。注意free和available的区别,前者是已申请但未使用,后者是未申请未使用,它们之间关系:totalMemory= 可使用空间+已使用空间,可使用空间=availableMemory+free.size()*poolableSize代表。

只有固定大小的内存块被释放后才会进入池化列表,非常规释放后只会增加可用内存大小,而释放内存则由虚拟机回收。因此如果超大消息比较多,依然有可能会引起fgc乃至oom。

积累器通过内存池预分配消息集内存,如果没有足够内存则用户主线程被放入有序队列并进入等待。批在批次done时释放出部分空间,同时唤醒队首线程,如果没有释放出足够的空间则继续进入等待,如果已经释放出足够空间,分配空间且线程出队。
内存池.png

5.3.1

这篇关于更加深入理解Kafka--Producer篇(下)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/787413

相关文章

Java并发编程必备之Synchronized关键字深入解析

《Java并发编程必备之Synchronized关键字深入解析》本文我们深入探索了Java中的Synchronized关键字,包括其互斥性和可重入性的特性,文章详细介绍了Synchronized的三种... 目录一、前言二、Synchronized关键字2.1 Synchronized的特性1. 互斥2.

一文带你深入了解Python中的GeneratorExit异常处理

《一文带你深入了解Python中的GeneratorExit异常处理》GeneratorExit是Python内置的异常,当生成器或协程被强制关闭时,Python解释器会向其发送这个异常,下面我们来看... 目录GeneratorExit:协程世界的死亡通知书什么是GeneratorExit实际中的问题案例

一文详解kafka开启kerberos认证的完整步骤

《一文详解kafka开启kerberos认证的完整步骤》这篇文章主要为大家详细介绍了kafka开启kerberos认证的完整步骤,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、kerberos安装部署二、准备机器三、Kerberos Server 安装1、配置krb5.con

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

深入解析Spring TransactionTemplate 高级用法(示例代码)

《深入解析SpringTransactionTemplate高级用法(示例代码)》TransactionTemplate是Spring框架中一个强大的工具,它允许开发者以编程方式控制事务,通过... 目录1. TransactionTemplate 的核心概念2. 核心接口和类3. TransactionT

深入理解Apache Airflow 调度器(最新推荐)

《深入理解ApacheAirflow调度器(最新推荐)》ApacheAirflow调度器是数据管道管理系统的关键组件,负责编排dag中任务的执行,通过理解调度器的角色和工作方式,正确配置调度器,并... 目录什么是Airflow 调度器?Airflow 调度器工作机制配置Airflow调度器调优及优化建议最

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

一文带你理解Python中import机制与importlib的妙用

《一文带你理解Python中import机制与importlib的妙用》在Python编程的世界里,import语句是开发者最常用的工具之一,它就像一把钥匙,打开了通往各种功能和库的大门,下面就跟随小... 目录一、python import机制概述1.1 import语句的基本用法1.2 模块缓存机制1.

深入理解C语言的void*

《深入理解C语言的void*》本文主要介绍了C语言的void*,包括它的任意性、编译器对void*的类型检查以及需要显式类型转换的规则,具有一定的参考价值,感兴趣的可以了解一下... 目录一、void* 的类型任意性二、编译器对 void* 的类型检查三、需要显式类型转换占用的字节四、总结一、void* 的