高性能消息中间件 - Kafka3.x(二)

2023-11-01 14:44

本文主要是介绍高性能消息中间件 - Kafka3.x(二),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 高性能消息中间件 - Kafka3.x(二)
      • Kafka生产者⭐
        • 生产者发生原理⭐
        • RecordAccumulator源码简单分析⭐
        • Java Api生产者的重要参数⭐
        • 环境准备
          • 创建一个名为java-api-test的topic主题⭐
          • 命令行开启一个consumer消费者监听名为java-api-test的topic⭐
          • pom.xml
        • 案例1:异步发送消息⭐
          • MyProducerAsync类(生产者)⭐
        • 案例2:带回调函数的异步发送消息⭐
          • MyProducerAsyncCallback类(生产者)⭐
        • 案例3:同步发送消息⭐
          • MyProducerAsyncCallback类(生产者)⭐
        • 生产者默认分区策略⭐
        • 自定义分区器⭐
          • MyPartitionerImpl类
          • MyProducerPartitioner类
        • 生产调优1:如何提高生产者的吞吐量?⭐
        • 消息累加器(RecordAccumulator)
        • 消息发送线程(Sender线程)
        • 生产调优2:如何保证消息不丢失?(消息可靠性)⭐
          • 消息确认机制-acks⭐
        • ISR、OSR、AR之间的关系
        • 生产调优3:如何保证消息不重复?(消息去重)⭐
          • 消息通信的基本概念⭐
          • 幂等性实现消息去重原理⭐
          • 开启幂等性⭐
          • 消息事务⭐
        • 生产调优4:如何保证消息不会乱序(单分区下的消息顺序性)?⭐

高性能消息中间件 - Kafka3.x(二)

Kafka生产者⭐

生产者发生原理⭐

当我们通过producer(生产者)调用send方法发送消息,该消息就会依次经过拦截器(一般不使用)、序列化器(要对key、value进行序列化)、分区器(提供了默认的分区分配策略,也可以自定义分区器),通过分区器计算出该消息发往哪个分区,此时就会把消息先发送给RecordAccumulate缓存(而不是直接发生给broker的分区),到达RecordAccumulate缓存中,需要满足两个条件(满足两者之一即可发送)才会通过sender线程将数据发送给kafka broker(1:batch.size默认是16kb,如果批次消息到达16kb,则会发送;2:linger.ms默认是0ms,也就是说只要消息在缓存中待了0ms还没有发送则也会自动发生。),当上面两个消息发送条件满足其中一个之后,就会通过sender线程进行发送消息到kafka broker的分区,当收到某条发送成功ack之后就会把RecordAccumulate中的对应的消息给移除掉,如果没有收到发生成功ack的话,则会进行重试(重试次数默认是int的最大值。),不过默认kafka可以缓存5条未ack的消息。

在这里插入图片描述

RecordAccumulator源码简单分析⭐
public class RecordAccumulator {private final Logger log;private volatile boolean closed;private final AtomicInteger flushesInProgress;private final AtomicInteger appendsInProgress;private final int batchSize;private final CompressionType compression;private final int lingerMs;private final long retryBackoffMs;private final int deliveryTimeoutMs;private final BufferPool free;private final Time time;private final ApiVersions apiVersions;private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;private final IncompleteBatches incomplete;private final Set<TopicPartition> muted;private int drainIndex;private final TransactionManager transactionManager;private long nextBatchExpiryTimeMs = 9223372036854775807L;省略......}
  • batchSize:批次大小(默认:16kb)
  • compression:压缩类型(默认:不压缩)
  • lingerMs:等待时间(默认是0ms)
  • ConcurrentMap<TopicPartition, Deque< ProducerBatch >> batches:缓存(默认是32M)
    • 由于TopicPartition(主题分区)是key,Deque< ProducerBatch >(双端队列)是值。
    • 可以看出每一个topic的分区(0、1、2…)都会创建一个对应的双端队列用于存储消息,以便后面通过sender线程发送给kafka broker。
  • transactionManager:事务管理器
Java Api生产者的重要参数⭐
参数描述
bootstrap.servers指定连接的Kafka Broker服务器的主机名称和端口号。(可以设置 1 个或者多个,中间用逗号隔开即可。
key.serializervalue.serializer指定发送消息的 key和value 的序列化类。要写全类名。
buffer.memoryRecordAccumulator缓冲区的总大小,默认大小为32m(生产环境可以调大,比如调到64m)
batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到 batch.size,sender线程等待linger.ms之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms之间。
acks0:生产者发送过来的数据,不需要应答ack。
1:生产者发送过来的数据,Leader收到数据后应答ack。
-1(all):生产者发送过来的数据,Leader 和 isr 队列
里面的所有节点都要应答ack。默认值是-1,-1 和
all是等价的。
max.in.flight.requests.per.connection允许最多没有应答ack的次数,默认为 5,开启幂等性后(幂等性默认是开启的),要保证该值是 1-5 的数字。
retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
enable.idempotence是否开启幂等性,默认 true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。
支持压缩类型:none、gzip、snappy、lz4 和 zstd。(生产环境可以使用snappy或者gzip
环境准备
创建一个名为java-api-test的topic主题⭐
kafka-topics.sh --bootstrap-server=192.168.184.201:9092 --topic=java-api-test --partitions=5 --replication-factor=2  --create
命令行开启一个consumer消费者监听名为java-api-test的topic⭐
kafka-console-consumer.sh --bootstrap-server=192.168.184.201:9092 --topic=java-api-test
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>kafka-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies>
<!--        kafka3.2.1--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.1</version></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version></dependency><!-- fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.12</version></dependency></dependencies>
</project>
案例1:异步发送消息⭐
MyProducerAsync类(生产者)⭐
  • 1:编写代码如下:
package com.kafka01.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @author youzhengjie 2022-09-12 20:21:18*/
public class MyProducerAsync {public static void main(String[] args) {Properties properties = new Properties();// 基本配置(都可以在ProducerConfig找到对应的配置)// 1:连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");// 2:配置key和value的字符串序列化器// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG相当于key.serializer// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG相当于value.serializer// StringSerializer.class.getName()相当于org.apache.kafka.common.serialization.StringSerializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//3:创建KafkaProducer对象,泛型的<String,String>分别是key和value的类型。KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);//4:调用send方法发送消息。ProducerRecord类用于封装我们发送的消息//我们使用的是ProducerRecord(String topic, V value)构造方法。//topic:消息要发送到哪个topic名称//value:消息内容for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<String,String>("java-api-test","java api发送的消息:"+i));}//5:关闭KafkaProducerkafkaProducer.close();}}
  • 2:运行MyProducerSync类的main方法发送kafka消息:
  • 3:查看我们刚刚启动的consumer:

在这里插入图片描述

案例2:带回调函数的异步发送消息⭐
MyProducerAsyncCallback类(生产者)⭐

回调函数会在producer生产者收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(Record Metadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

package com.kafka01.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class MyProducerAsyncCallback {public static void main(String[] args) {Properties properties = new Properties();// 基本配置(都可以在ProducerConfig找到对应的配置)// 1:连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");// 2:配置key和value的字符串序列化器// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG相当于key.serializer// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG相当于value.serializer// StringSerializer.class.getName()相当于org.apache.kafka.common.serialization.StringSerializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//3:创建KafkaProducer对象,泛型的<String,String>分别是key和value的类型。KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);//4:调用send方法发送消息。ProducerRecord类用于封装我们发送的消息//我们使用的是ProducerRecord(String topic, V value)构造方法。//topic:消息要发送到哪个topic名称//value:消息内容for (int i = 0; i < 10; i++) {ProducerRecord<String, String> producerRecord =new ProducerRecord<>("java-api-test", "MyProducerAsyncCallback发送的消息:" + i);//带回调函数的异步发送kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {//如果Exception为null则发送成功,反之则发送失败,发送失败则会自动重试。if(e==null) {System.out.println("消息发送成功。分区为:"+recordMetadata.partition()+",topic为:"+recordMetadata.topic());}else {System.out.println("消息发送失败。");}}});}//5:关闭KafkaProducerkafkaProducer.close();}}
案例3:同步发送消息⭐
  • 同步发送:意思是当我们一次性发送多条消息的时候,会按顺序一条一条来发送。
MyProducerAsyncCallback类(生产者)⭐
package com.kafka01.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class MyProducerSyncCallback {public static void main(String[] args) {Properties properties = new Properties();// 基本配置(都可以在ProducerConfig找到对应的配置)// 1:连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");// 2:配置key和value的字符串序列化器// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG相当于key.serializer// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG相当于value.serializer// StringSerializer.class.getName()相当于org.apache.kafka.common.serialization.StringSerializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//3:创建KafkaProducer对象,泛型的<String,String>分别是key和value的类型。KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);//4:调用send方法发送消息。ProducerRecord类用于封装我们发送的消息//我们使用的是ProducerRecord(String topic, V value)构造方法。//topic:消息要发送到哪个topic名称//value:消息内容for (int i = 0; i < 10; i++) {ProducerRecord<String, String> producerRecord =new ProducerRecord<>("java-api-test", "MyProducerSyncCallback发送的消息:" + i);//带回调函数的同步发送try {kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {//如果Exception为null则发送成功,反之则发送失败,发送失败则会自动重试。if(e==null) {System.out.println("消息发送成功。分区为:"+recordMetadata.partition()+",topic为:"+recordMetadata.topic());}else {System.out.println("消息发送失败。");}}}).get(); //只需要在send方法后面加上get()就是同步了。} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}//5:关闭KafkaProducerkafkaProducer.close();}}
生产者默认分区策略⭐
  • 生产者默认分区策略:

  • 1:指定partition分区的情况下,直接将指定的值作为partition值;

  • 2:没有指明partition:

    • 2.1:但是指定了key的情况下,将key的hashcode值与topic的partition数进行取模得到partition值;
    • 2.2:既没有partition值又没有key值的情况下,kafka默认使用粘性分区器(StrickyPartition),会随机选择一个分区,并一直使用这个分区,直到这个分区满了才会随机选择下一个分区。
自定义分区器⭐
  • 实现步骤:

    • 1:新建一个自定义分区器类去实现 Partitioner 接口。

    • 2:重写 partition()方法。

    • 3:把写好的自定义分区器类配置到生产者配置上。

MyPartitionerImpl类
package com.kafka01.config;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*
自定义分区器类*/
public class MyPartitionerImpl implements Partitioner {//只需要重写这个方法@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//将value转换成String类型String val = value.toString();//如果val内容为part-3则分配到3号分区中,反之分配到2号分区if(val!=null&& "part-3".equals(val)){//分配到3号分区return 3;}//分配到2号分区return 2;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
MyProducerPartitioner类
package com.kafka01.producer;import com.kafka01.config.MyPartitionerImpl;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class MyProducerPartitioner {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();// 基本配置(都可以在ProducerConfig找到对应的配置)// 1:连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");// 2:配置key和value的字符串序列化器// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG相当于key.serializer// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG相当于value.serializer// StringSerializer.class.getName()相当于org.apache.kafka.common.serialization.StringSerializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//3:配置自定义分区器类。ProducerConfig.PARTITIONER_CLASS_CONFIG相当于partitioner.classproperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitionerImpl.class.getName());//4:创建KafkaProducer对象,泛型的<String,String>分别是key和value的类型。KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);//5:调用send方法发送消息。ProducerRecord类用于封装我们发送的消息//我们使用的是ProducerRecord(String topic, V value)构造方法。//topic:消息要发送到哪个topic名称//value:消息内容for (int i = 0; i < 10; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<>("java-api-test", "part-" + i);kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception exception) {//如果Exception为null则发送成功,反之则发送失败,发送失败则会自动重试。if(exception==null) {System.out.println("消息发送成功。分区为:"+recordMetadata.partition()+",topic为:"+recordMetadata.topic());}else {System.out.println("消息发送失败。");}}}).get();}//6:关闭KafkaProducerkafkaProducer.close();}}
生产调优1:如何提高生产者的吞吐量?⭐
  • 方案如下:

    • 1:batch.size:批次大小,默认是16k,可以适当调整,不是越大越好
    • 2:linger.ms:等待时间,默认是0ms,建议修改成5-100ms之间
    • 3:compression.type:压缩类型,默认是不压缩,建议修改成snappy或者gzip
    • 4:buffer.memory(RecordAccumulator):缓冲区大小,默认是32M,建议修改成64M
  • 调整上面四个参数后的代码:

package com.kafka02.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class MyProducer01 {public static void main(String[] args) {Properties properties = new Properties();// 1:kafka基本配置// 连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");// 配置key和value的字符串序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 2:配置生产者优化参数// batch.size:批次大小,默认 16K,还是保持16kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024*16);// linger.ms:等待时间,默认 0,修改成10msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 10);// RecordAccumulator:缓冲区大小,默认 32M,修改成64Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,(1024*1024*64));// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("java-api-test", "消息---" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception exception) {//如果Exception为null则发送成功,反之则发送失败,发送失败则会自动重试。if(exception==null) {System.out.println("消息发送成功。分区为:"+recordMetadata.partition()+",topic为:"+recordMetadata.topic());}else {System.out.println("消息发送失败。");}}});}kafkaProducer.close();}
}
消息累加器(RecordAccumulator)
  • 消息累加器(RecordAccumulator)默认大小是32m,这个默认值在生产环境中很容器就能达到,就会导致消息堆满消息累加器,从而阻塞。
  • 消息累加器的底层是一个Map集合(key为TopicPartition,value为Deque< ProducerBatch >),可以看出我们kafka每一个分区都会在消息累加器中有对应的一个双端队列。
  • 为了防止消息太大,我们通常需要指定一个压缩类型(比如snappy或者gzip),这样也可以减少网络传输的带宽占用。
消息发送线程(Sender线程)
  • Sender线程的作用就是把符合条件(满足batch.size或者linger.ms两者满足一个即可)的消息发送给kafka broker。
  • Sender线程是基于Java nio的selector,通过selector发送消息。
  • Sender线程默认可以容忍5个消息未被确认,当消息发送失败时会进行重试(重试次数默认为int的最大值),
生产调优2:如何保证消息不丢失?(消息可靠性)⭐
消息确认机制-acks⭐
  • 消息确认机制acks一共有三种:

    • 0
      • 表示生产者当消息发送出去就不管了,不管leader副本和follower副本是否接受到数据。(这种方式效率最高,但是消息可靠性最低,很容易丢失消息)
    • 1
      • 表示生产者发送消息后,只有leader副本接受到了数据(确认了消息),消息才算发送成功。(这种方式效率中等,可靠性也是中等,有丢失消息的风险)
      • 丢失消息过程:当生产者成功把消息发送给leader副本,但是此时follower宕机了,由于acks为1,所以这条消息被确认了不会重试消息了,但是过了一会儿,follower节点恢复了,但是leader副本的节点却宕机了,此时由于follower没有消息的备份,所以导致消息丢失。
    • -1(或者all)
      • 表示生产者发送消息后,需要等leader副本和follower副本都接收到消息并且确认消息后才算成功。(这种方式效率最低,但是可靠性最高,在副本>2的情况下基本没有丢失数据风险。)
  • 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

ISR、OSR、AR之间的关系
  • AR=ISR+OSR

  • ISR:表示在指定时间内和leader保存数据同步的副本集合;

  • ORS:表示不能在指定的时间内和leader保持数据同步副本集合,称为OSR(Out-Sync Relipca set)。

生产调优3:如何保证消息不重复?(消息去重)⭐
消息通信的基本概念⭐
  • 最少一次ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量>=2。可以保证数据不丢失,但是不能保证数据不重复。
  • 最多一次:ack设置为0,可以保证数据不重复,但是不能保证数据不丢失。
  • 精确一次最少一次+幂等性(或者最少一次+幂等性+事务)。
幂等性实现消息去重原理⭐
  • 重复消息判断标准:当两条及以上的消息的三个条件(pid, Partition, SeqNumber)都相同时,Broker只会持久化一条,其中pid(ProducerId)是kafka每次重启都会分配一个新的,Partition分区号,SeqNumber序列化号(单调递增)。

  • broker中会在内存维护一个pid+分区对应的序列号。如果收到的序列号正好比内存序列号数字大1,才存储消息如果小于内存序列号,意味着消息重复,那么会丢弃消息,并应答如果远大于内存序列号,意味着消息丢失,会抛出异常

  • 注意:幂等性只能保证的是在单分区单会话内不重复!!(所以要引入事务)

开启幂等性⭐
  • 只需要给生产者的enable.idempotence参数设置为true即可开启幂等性(默认就是true,也就是幂等性默认是开启的。)
消息事务⭐
  • 首先必须注意:开启事务的前提是必须要开启幂等性!!!!!

  • 为什么要引入事务?

  • 由于幂等性不能跨分区运作,为了保证同时发的多条消息,要么全成功,要么全失败。kafka引入了事务的概念。

  • 开启事务:需要producer设置transactional.id的值并同时开启幂等性。

  • 事务的方法如下:(KafkaProducer对象调用)

    // 1 初始化事务
    void initTransactions();
    // 2 开启事务
    void beginTransaction() throws ProducerFencedException;
    // 3 在事务内提交已经消费的偏移量(主要用于消费者)
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws 
    ProducerFencedException;
    // 4 提交事务
    void commitTransaction() throws ProducerFencedException;
    // 5 放弃事务(类似于回滚事务的操作)
    void abortTransaction() throws ProducerFencedException;
    
生产调优4:如何保证消息不会乱序(单分区下的消息顺序性)?⭐
  • kafka只能保证单分区下的消息顺序性。

  • 没有开启幂等性:

    • 需要把 max.in.flight.requests.per.connection 设置为1。(缓冲队列最多放置1个请求)
  • 开启幂等性:

    • 需要 max.in.flight.requests.per.connection 设置为小于5。
  • 这是因为broker端会缓存5条消息,能够保证最近5个消息是有序的。(当开启了幂等性,且消息小于5个则会进行重新排序

这篇关于高性能消息中间件 - Kafka3.x(二)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/323723

相关文章

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

构建高性能WEB之HTTP首部优化

0x00 前言 在讨论浏览器优化之前,首先我们先分析下从客户端发起一个HTTP请求到用户接收到响应之间,都发生了什么?知己知彼,才能百战不殆。这也是作为一个WEB开发者,为什么一定要深入学习TCP/IP等网络知识。 0x01 到底发生什么了? 当用户发起一个HTTP请求时,首先客户端将与服务端之间建立TCP连接,成功建立连接后,服务端将对请求进行处理,并对客户端做出响应,响应内容一般包括响应

Nginx高性能分析

Nginx 是一个免费的,开源的,高性能的 HTTP 服务器和反向代理,以及 IMAP / POP3 代理服务器。Nginx 以其高性能,稳定性,丰富的功能,简单的配置和低资源消耗而闻名。本文从底层原理分析 Nginx 为什么这么快! Nginx 的进程模型 Nginx 服务器,正常运行过程中: 多进程:一个 Master 进程、多个 Worker 进程。Master 进程:管理 Work

云原生之高性能web服务器学习(持续更新中)

高性能web服务器 1 Web服务器的基础介绍1.1 Web服务介绍1.1.1 Apache介绍1.1.2 Nginx-高性能的 Web 服务端 2 Nginx架构与安装2.1 Nginx概述2.1.1 Nginx 功能介绍2.1.2 基础特性2.1.3 Web 服务相关的功能 2.2 Nginx 架构和进程2.2.1 架构2.2.2 Ngnix进程结构 2.3 Nginx 模块介绍2.4

在亚马逊云科技上利用Graviton4代芯片构建高性能Java应用(上篇)

简介 在AI迅猛发展的时代,芯片算力对于模型性能起到了至关重要的作用。一款能够同时兼具高性能和低成本的芯片,能够帮助开发者快速构建性能稳定的生成式AI应用,同时降低开发成本。今天小李哥将介绍亚马逊推出的4代高性能计算处理器Gravition,带大家了解如何利用Graviton芯片为Java生成式AI应用提高性能、优化成本。 本篇文章将介绍如何在云平台上创建Graviton芯片服务器,并在Gra

驾驭冰雪 安全无忧,韩泰高性能冬季轮胎新品上市

- 韩泰轮胎推出冬季轮胎新产品Winter i*cept iZ3和SUV专用的Winter i*cept iZ3 X - 新轮胎采用了V型花纹,冰雪路面安全性极佳,而且具有操控性好、续航里程长的优点 - 新轮胎在位于北极圈以北300km的韩泰轮胎芬兰伊瓦洛测试场进行了严苛测试,确保极寒条件的安全性 2024年8月,韩泰轮胎正式在中国市场推出新一代高性能冬季轮胎Winter i*cept

高性能计算应用优化之代码实现调优(一)

本章将介绍代码实现过程中使用到的调优方法。在软件开发早期,开发者更多关注代码功能的实现,对代码的性能关注较少,随着代码规模增加,不合理的代码实现方法所带来的性能包袱逐渐凸显。因此,需要对原有代码实现进行优化,如修改不合理的访存顺序,使代码更易于被编译器优化等。 浮点数运算 浮点数运算是科学计算中开销最大的部分之一,特别是双精度除法,合理地设计实现浮点数运算环节可以显著提高程序的性能。 由于单

Minio 高性能分布式对象存储快速入手指南

0x00 Minio 快速入门 什么是对象存储? 描述: 对象存储(Object Storage)是一种存储数据的计算机体系结构,它以对象的形式存储和管理数据。与传统的文件系统和块存储不同,对象存储将数据作为对象存储在分布式的存储集群中,每个对象都有一个唯一的标识符(通常是一个URL),并且可以通过这个标识符来访问和检索数据。 「对象存储特点」: 弹性扩展:对象存储可以轻松地扩展存储容量,

MySQL创建高性能的索引(三)

MySQL创建高性能的索引 MySQL索引基础B-Tree索引适用的查询类型B-Tree注意事项 索引分类和区别B-Tree索引Hash索引空间索引全文索引 索引注意事项聚簇索引索引覆盖 MySQL 索引是存储引擎快速找到记录的一种数据结构。使用索引可以极大的提高MYSQL的查询性能,接下来讲介绍索引的基础、索引的分类和区别、索引的注意事项、以及关于聚簇索引和覆盖索引的知识点。

【项目二】C++高性能服务器开发——日志系统(日志器,日志级别,日志事件)

知识点备忘录 其实也没啥 操作记录 在乌邦图上写的,先是模仿sylar创建了目录 进入sylar文件夹,有今天写的log.h头文件 其中log_test.cpp是为了测试log.h能否正常运行建的,只是个测试文件 log.h写了三个类,日志级别,日志事件,日志器 log.h头文件如下: #ifndef __SYLAR_LOG_H_#define __SYLAR_LOG_H_#incl