本文主要是介绍高性能消息中间件 - Kafka3.x(二),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 高性能消息中间件 - Kafka3.x(二)
- Kafka生产者⭐
- 生产者发生原理⭐
- RecordAccumulator源码简单分析⭐
- Java Api生产者的重要参数⭐
- 环境准备
- 案例1:异步发送消息⭐
- MyProducerAsync类(生产者)⭐
- 案例2:带回调函数的异步发送消息⭐
- MyProducerAsyncCallback类(生产者)⭐
- 案例3:同步发送消息⭐
- MyProducerAsyncCallback类(生产者)⭐
- 生产者默认分区策略⭐
- 自定义分区器⭐
- MyPartitionerImpl类
- MyProducerPartitioner类
- 生产调优1:如何提高生产者的吞吐量?⭐
- 消息累加器(RecordAccumulator)
- 消息发送线程(Sender线程)
- 生产调优2:如何保证消息不丢失?(消息可靠性)⭐
- 消息确认机制-acks⭐
- ISR、OSR、AR之间的关系
- 生产调优3:如何保证消息不重复?(消息去重)⭐
- 消息通信的基本概念⭐
- 幂等性实现消息去重原理⭐
- 开启幂等性⭐
- 消息事务⭐
- 生产调优4:如何保证消息不会乱序(单分区下的消息顺序性)?⭐
高性能消息中间件 - Kafka3.x(二)
Kafka生产者⭐
生产者发生原理⭐
当我们通过producer(生产者)调用send方法发送消息,该消息就会依次经过拦截器(一般不使用)、序列化器(要对key、value进行序列化)、分区器(提供了默认的分区分配策略,也可以自定义分区器),通过分区器计算出该消息发往哪个分区,此时就会把消息先发送给RecordAccumulate缓存(而不是直接发生给broker的分区),到达RecordAccumulate缓存中,需要满足两个条件(满足两者之一即可发送)才会通过sender线程将数据发送给kafka broker(1:batch.size默认是16kb,如果批次消息到达16kb,则会发送;2:linger.ms默认是0ms,也就是说只要消息在缓存中待了0ms还没有发送则也会自动发生。),当上面两个消息发送条件满足其中一个之后,就会通过sender线程进行发送消息到kafka broker的分区,当收到某条发送成功ack之后就会把RecordAccumulate中的对应的消息给移除掉,如果没有收到发生成功ack的话,则会进行重试(重试次数默认是int的最大值。),不过默认kafka可以缓存5条未ack的消息。
RecordAccumulator源码简单分析⭐
public class RecordAccumulator {private final Logger log;private volatile boolean closed;private final AtomicInteger flushesInProgress;private final AtomicInteger appendsInProgress;private final int batchSize;private final CompressionType compression;private final int lingerMs;private final long retryBackoffMs;private final int deliveryTimeoutMs;private final BufferPool free;private final Time time;private final ApiVersions apiVersions;private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;private final IncompleteBatches incomplete;private final Set<TopicPartition> muted;private int drainIndex;private final TransactionManager transactionManager;private long nextBatchExpiryTimeMs = 9223372036854775807L;省略......}
- batchSize:批次大小(默认:16kb)
- compression:压缩类型(默认:不压缩)
- lingerMs:等待时间(默认是0ms)
- ConcurrentMap<TopicPartition, Deque< ProducerBatch >> batches:缓存(默认是32M)
- 由于TopicPartition(主题分区)是key,Deque< ProducerBatch >(双端队列)是值。
- 可以看出每一个topic的分区(0、1、2…)都会创建一个对应的双端队列用于存储消息,以便后面通过sender线程发送给kafka broker。
- transactionManager:事务管理器
Java Api生产者的重要参数⭐
参数 | 描述 |
---|---|
bootstrap.servers | 指定连接的Kafka Broker服务器的主机名称和端口号。(可以设置 1 个或者多个,中间用逗号隔开即可。) |
key.serializer和 value.serializer | 指定发送消息的 key和value 的序列化类。要写全类名。 |
buffer.memory | RecordAccumulator缓冲区的总大小,默认大小为32m(生产环境可以调大,比如调到64m) |
batch.size | 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到 batch.size,sender线程等待linger.ms之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms之间。 |
acks | 0:生产者发送过来的数据,不需要应答ack。 1:生产者发送过来的数据,Leader收到数据后应答ack。 -1(all):生产者发送过来的数据,Leader 和 isr 队列 里面的所有节点都要应答ack。默认值是-1,-1 和 all是等价的。 |
max.in.flight.requests.per.connection | 允许最多没有应答ack的次数,默认为 5,开启幂等性后(幂等性默认是开启的),要保证该值是 1-5 的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
enable.idempotence | 是否开启幂等性,默认 true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4 和 zstd。(生产环境可以使用snappy或者gzip) |
环境准备
创建一个名为java-api-test的topic主题⭐
kafka-topics.sh --bootstrap-server=192.168.184.201:9092 --topic=java-api-test --partitions=5 --replication-factor=2 --create
命令行开启一个consumer消费者监听名为java-api-test的topic⭐
kafka-console-consumer.sh --bootstrap-server=192.168.184.201:9092 --topic=java-api-test
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>kafka-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies>
<!-- kafka3.2.1--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.1</version></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version></dependency><!-- fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.12</version></dependency></dependencies>
</project>
案例1:异步发送消息⭐
MyProducerAsync类(生产者)⭐
- 1:编写代码如下:
package com.kafka01.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @author youzhengjie 2022-09-12 20:21:18*/
public class MyProducerAsync {public static void main(String[] args) {Properties properties = new Properties();// 基本配置(都可以在ProducerConfig找到对应的配置)// 1:连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");// 2:配置key和value的字符串序列化器// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG相当于key.serializer// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG相当于value.serializer// StringSerializer.class.getName()相当于org.apache.kafka.common.serialization.StringSerializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//3:创建KafkaProducer对象,泛型的<String,String>分别是key和value的类型。KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);//4:调用send方法发送消息。ProducerRecord类用于封装我们发送的消息//我们使用的是ProducerRecord(String topic, V value)构造方法。//topic:消息要发送到哪个topic名称//value:消息内容for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<String,String>("java-api-test","java api发送的消息:"+i));}//5:关闭KafkaProducerkafkaProducer.close();}}
- 2:运行MyProducerSync类的main方法发送kafka消息:
- 3:查看我们刚刚启动的consumer:
案例2:带回调函数的异步发送消息⭐
MyProducerAsyncCallback类(生产者)⭐
回调函数会在producer生产者收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(Record Metadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
package com.kafka01.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class MyProducerAsyncCallback {public static void main(String[] args) {Properties properties = new Properties();// 基本配置(都可以在ProducerConfig找到对应的配置)// 1:连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");// 2:配置key和value的字符串序列化器// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG相当于key.serializer// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG相当于value.serializer// StringSerializer.class.getName()相当于org.apache.kafka.common.serialization.StringSerializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//3:创建KafkaProducer对象,泛型的<String,String>分别是key和value的类型。KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);//4:调用send方法发送消息。ProducerRecord类用于封装我们发送的消息//我们使用的是ProducerRecord(String topic, V value)构造方法。//topic:消息要发送到哪个topic名称//value:消息内容for (int i = 0; i < 10; i++) {ProducerRecord<String, String> producerRecord =new ProducerRecord<>("java-api-test", "MyProducerAsyncCallback发送的消息:" + i);//带回调函数的异步发送kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {//如果Exception为null则发送成功,反之则发送失败,发送失败则会自动重试。if(e==null) {System.out.println("消息发送成功。分区为:"+recordMetadata.partition()+",topic为:"+recordMetadata.topic());}else {System.out.println("消息发送失败。");}}});}//5:关闭KafkaProducerkafkaProducer.close();}}
案例3:同步发送消息⭐
- 同步发送:意思是当我们一次性发送多条消息的时候,会按顺序一条一条来发送。
MyProducerAsyncCallback类(生产者)⭐
package com.kafka01.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class MyProducerSyncCallback {public static void main(String[] args) {Properties properties = new Properties();// 基本配置(都可以在ProducerConfig找到对应的配置)// 1:连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");// 2:配置key和value的字符串序列化器// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG相当于key.serializer// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG相当于value.serializer// StringSerializer.class.getName()相当于org.apache.kafka.common.serialization.StringSerializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//3:创建KafkaProducer对象,泛型的<String,String>分别是key和value的类型。KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);//4:调用send方法发送消息。ProducerRecord类用于封装我们发送的消息//我们使用的是ProducerRecord(String topic, V value)构造方法。//topic:消息要发送到哪个topic名称//value:消息内容for (int i = 0; i < 10; i++) {ProducerRecord<String, String> producerRecord =new ProducerRecord<>("java-api-test", "MyProducerSyncCallback发送的消息:" + i);//带回调函数的同步发送try {kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {//如果Exception为null则发送成功,反之则发送失败,发送失败则会自动重试。if(e==null) {System.out.println("消息发送成功。分区为:"+recordMetadata.partition()+",topic为:"+recordMetadata.topic());}else {System.out.println("消息发送失败。");}}}).get(); //只需要在send方法后面加上get()就是同步了。} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}//5:关闭KafkaProducerkafkaProducer.close();}}
生产者默认分区策略⭐
-
生产者默认分区策略:
-
1:指定partition分区的情况下,直接将指定的值作为partition值;
-
2:没有指明partition:
- 2.1:但是指定了key的情况下,将key的hashcode值与topic的partition数进行取模得到partition值;
- 2.2:既没有partition值又没有key值的情况下,kafka默认使用粘性分区器(StrickyPartition),会随机选择一个分区,并一直使用这个分区,直到这个分区满了才会随机选择下一个分区。
自定义分区器⭐
-
实现步骤:
-
1:新建一个自定义分区器类去实现 Partitioner 接口。
-
2:重写 partition()方法。
-
3:把写好的自定义分区器类配置到生产者配置上。
-
MyPartitionerImpl类
package com.kafka01.config;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*
自定义分区器类*/
public class MyPartitionerImpl implements Partitioner {//只需要重写这个方法@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//将value转换成String类型String val = value.toString();//如果val内容为part-3则分配到3号分区中,反之分配到2号分区if(val!=null&& "part-3".equals(val)){//分配到3号分区return 3;}//分配到2号分区return 2;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
MyProducerPartitioner类
package com.kafka01.producer;import com.kafka01.config.MyPartitionerImpl;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class MyProducerPartitioner {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();// 基本配置(都可以在ProducerConfig找到对应的配置)// 1:连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");// 2:配置key和value的字符串序列化器// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG相当于key.serializer// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG相当于value.serializer// StringSerializer.class.getName()相当于org.apache.kafka.common.serialization.StringSerializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//3:配置自定义分区器类。ProducerConfig.PARTITIONER_CLASS_CONFIG相当于partitioner.classproperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitionerImpl.class.getName());//4:创建KafkaProducer对象,泛型的<String,String>分别是key和value的类型。KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);//5:调用send方法发送消息。ProducerRecord类用于封装我们发送的消息//我们使用的是ProducerRecord(String topic, V value)构造方法。//topic:消息要发送到哪个topic名称//value:消息内容for (int i = 0; i < 10; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<>("java-api-test", "part-" + i);kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception exception) {//如果Exception为null则发送成功,反之则发送失败,发送失败则会自动重试。if(exception==null) {System.out.println("消息发送成功。分区为:"+recordMetadata.partition()+",topic为:"+recordMetadata.topic());}else {System.out.println("消息发送失败。");}}}).get();}//6:关闭KafkaProducerkafkaProducer.close();}}
生产调优1:如何提高生产者的吞吐量?⭐
-
方案如下:
- 1:batch.size:批次大小,默认是16k,可以适当调整,不是越大越好。
- 2:linger.ms:等待时间,默认是0ms,建议修改成5-100ms之间。
- 3:compression.type:压缩类型,默认是不压缩,建议修改成snappy或者gzip。
- 4:buffer.memory(RecordAccumulator):缓冲区大小,默认是32M,建议修改成64M。
-
调整上面四个参数后的代码:
package com.kafka02.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class MyProducer01 {public static void main(String[] args) {Properties properties = new Properties();// 1:kafka基本配置// 连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");// 配置key和value的字符串序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 2:配置生产者优化参数// batch.size:批次大小,默认 16K,还是保持16kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024*16);// linger.ms:等待时间,默认 0,修改成10msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 10);// RecordAccumulator:缓冲区大小,默认 32M,修改成64Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,(1024*1024*64));// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("java-api-test", "消息---" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception exception) {//如果Exception为null则发送成功,反之则发送失败,发送失败则会自动重试。if(exception==null) {System.out.println("消息发送成功。分区为:"+recordMetadata.partition()+",topic为:"+recordMetadata.topic());}else {System.out.println("消息发送失败。");}}});}kafkaProducer.close();}
}
消息累加器(RecordAccumulator)
- 消息累加器(RecordAccumulator)默认大小是32m,这个默认值在生产环境中很容器就能达到,就会导致消息堆满消息累加器,从而阻塞。
- 消息累加器的底层是一个Map集合(key为TopicPartition,value为Deque< ProducerBatch >),可以看出我们kafka每一个分区都会在消息累加器中有对应的一个双端队列。
- 为了防止消息太大,我们通常需要指定一个压缩类型(比如snappy或者gzip),这样也可以减少网络传输的带宽占用。
消息发送线程(Sender线程)
- Sender线程的作用就是把符合条件(满足batch.size或者linger.ms两者满足一个即可)的消息发送给kafka broker。
- Sender线程是基于Java nio的selector,通过selector发送消息。
- Sender线程默认可以容忍5个消息未被确认,当消息发送失败时会进行重试(重试次数默认为int的最大值),
生产调优2:如何保证消息不丢失?(消息可靠性)⭐
消息确认机制-acks⭐
-
消息确认机制acks一共有三种:
- 0:
- 表示生产者当消息发送出去就不管了,不管leader副本和follower副本是否接受到数据。(这种方式效率最高,但是消息可靠性最低,很容易丢失消息)
- 1:
- 表示生产者发送消息后,只有leader副本接受到了数据(确认了消息),消息才算发送成功。(这种方式效率中等,可靠性也是中等,有丢失消息的风险)
- 丢失消息过程:当生产者成功把消息发送给leader副本,但是此时follower宕机了,由于acks为1,所以这条消息被确认了不会重试消息了,但是过了一会儿,follower节点恢复了,但是leader副本的节点却宕机了,此时由于follower没有消息的备份,所以导致消息丢失。
- -1(或者all):
- 表示生产者发送消息后,需要等leader副本和follower副本都接收到消息并且确认消息后才算成功。(这种方式效率最低,但是可靠性最高,在副本>2的情况下基本没有丢失数据风险。)
- 0:
-
数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
ISR、OSR、AR之间的关系
-
AR=ISR+OSR
-
ISR:表示在指定时间内和leader保存数据同步的副本集合;
-
ORS:表示不能在指定的时间内和leader保持数据同步副本集合,称为OSR(Out-Sync Relipca set)。
生产调优3:如何保证消息不重复?(消息去重)⭐
消息通信的基本概念⭐
- 最少一次:ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量>=2。可以保证数据不丢失,但是不能保证数据不重复。
- 最多一次:ack设置为0,可以保证数据不重复,但是不能保证数据不丢失。
- 精确一次:最少一次+幂等性(或者最少一次+幂等性+事务)。
幂等性实现消息去重原理⭐
-
重复消息判断标准:当两条及以上的消息的三个条件(pid, Partition, SeqNumber)都相同时,Broker只会持久化一条,其中pid(ProducerId)是kafka每次重启都会分配一个新的,Partition分区号,SeqNumber序列化号(单调递增)。
-
broker中会在内存维护一个pid+分区对应的序列号。如果收到的序列号正好比内存序列号数字大1,才存储消息,如果小于内存序列号,意味着消息重复,那么会丢弃消息,并应答。如果远大于内存序列号,意味着消息丢失,会抛出异常。
-
注意:幂等性只能保证的是在单分区单会话内不重复!!(所以要引入事务)
开启幂等性⭐
- 只需要给生产者的enable.idempotence参数设置为true即可开启幂等性(默认就是true,也就是幂等性默认是开启的。)
消息事务⭐
-
首先必须注意:开启事务的前提是必须要开启幂等性!!!!!
-
为什么要引入事务?
-
由于幂等性不能跨分区运作,为了保证同时发的多条消息,要么全成功,要么全失败。kafka引入了事务的概念。
-
开启事务:需要producer设置
transactional.id
的值并同时开启幂等性。 -
事务的方法如下:(KafkaProducer对象调用)
// 1 初始化事务 void initTransactions(); // 2 开启事务 void beginTransaction() throws ProducerFencedException; // 3 在事务内提交已经消费的偏移量(主要用于消费者) void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException; // 4 提交事务 void commitTransaction() throws ProducerFencedException; // 5 放弃事务(类似于回滚事务的操作) void abortTransaction() throws ProducerFencedException;
生产调优4:如何保证消息不会乱序(单分区下的消息顺序性)?⭐
-
kafka只能保证单分区下的消息顺序性。
-
没有开启幂等性:
- 需要把 max.in.flight.requests.per.connection 设置为1。(缓冲队列最多放置1个请求)
-
开启幂等性:
- 需要 max.in.flight.requests.per.connection 设置为小于5。
-
这是因为broker端会缓存5条消息,能够保证最近5个消息是有序的。(当开启了幂等性,且消息小于5个则会进行重新排序)
这篇关于高性能消息中间件 - Kafka3.x(二)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!