【Kafka】Kafka提高生产者吞吐量、数据可靠性-06

2024-06-16 14:12

本文主要是介绍【Kafka】Kafka提高生产者吞吐量、数据可靠性-06,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【Kafka】Kafka提高生产者吞吐量-06

  • 1. 提高生产者吞吐量
  • 2.数据可靠性
    • 2.1 回顾数据的发送流程
    • 2.2 ack应答级别
      • 2.2.1 acks:0
      • 2.2.2 acks:1
      • 2.2.2 acks:-1(all)
        • 2.2.2.1 数据可靠性分析
        • 2.2.2.2 数据完全可靠
    • 2.3 可靠性总结
    • 2.4 可靠性代码配置

1. 提高生产者吞吐量

在这里插入图片描述

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerParameters {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// batch.size:批次大小,默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms:等待时间,默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator:缓冲区大小,默认 32M:buffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(newProducerRecord<>("first", "atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

2.数据可靠性

2.1 回顾数据的发送流程

在这里插入图片描述

2.2 ack应答级别

在这里插入图片描述

2.2.1 acks:0

在这里插入图片描述
当应答级别为0时,生产者发送过来的数据,不需要等数据落盘应答。
工作:生产者将数据发送到leader中,如果leader突然挂掉了,leader还没有与follower同步,那么整个数据就全部都丢了。

2.2.2 acks:1

在这里插入图片描述
当应答级别为1时,生产者发送过来的数据,Leader收到数据后应答。

工作:如果应答完毕之后,leader还未与follower同步,leader挂了,新的leader会产生,原来的一条数据不会再次发送,造成了数据的丢失。

2.2.2 acks:-1(all)

在这里插入图片描述
当应答级别为-1时,生产者发送过来的数据,Leader+和isr队列
里面的所有节点收齐数据后应答。-1和all等价。

工作:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader同步,那这个问题怎么解决呢?

Leader维护了一个动态的in-sync replica set (ISR),意为和Leader保持问步的Folower + Leader集合(leader:0, isr:0,1,2)

如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)。

这样就不用等长期联系不上或者己经故障的节点。

2.2.2.1 数据可靠性分析

如果分区副本设置为1个,或者ISR里应答的最小副本数量(min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)


2.2.2.2 数据完全可靠

数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

2.3 可靠性总结

可靠性总结:

  1. acks=0,生产者发送过来数据就不管了,可靠性差,效率高

  2. acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等

  3. acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低

  4. 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

2.4 可靠性代码配置

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerAck {public static void main(String[] args) throwsInterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数 retries,默认是 int 最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

这篇关于【Kafka】Kafka提高生产者吞吐量、数据可靠性-06的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1066656

相关文章

大数据spark3.5安装部署之local模式详解

《大数据spark3.5安装部署之local模式详解》本文介绍了如何在本地模式下安装和配置Spark,并展示了如何使用SparkShell进行基本的数据处理操作,同时,还介绍了如何通过Spark-su... 目录下载上传解压配置jdk解压配置环境变量启动查看交互操作命令行提交应用spark,一个数据处理框架

通过ibd文件恢复MySql数据的操作方法

《通过ibd文件恢复MySql数据的操作方法》文章介绍通过.ibd文件恢复MySQL数据的过程,包括知道表结构和不知道表结构两种情况,对于知道表结构的情况,可以直接将.ibd文件复制到新的数据库目录并... 目录第一种情况:知道表结构第二种情况:不知道表结构总结今天干了一件大事,安装1Panel导致原来服务

Jmeter如何向数据库批量插入数据

《Jmeter如何向数据库批量插入数据》:本文主要介绍Jmeter如何向数据库批量插入数据方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Jmeter向数据库批量插入数据Jmeter向mysql数据库中插入数据的入门操作接下来做一下各个元件的配置总结Jmete

一文详解kafka开启kerberos认证的完整步骤

《一文详解kafka开启kerberos认证的完整步骤》这篇文章主要为大家详细介绍了kafka开启kerberos认证的完整步骤,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、kerberos安装部署二、准备机器三、Kerberos Server 安装1、配置krb5.con

MySQL InnoDB引擎ibdata文件损坏/删除后使用frm和ibd文件恢复数据

《MySQLInnoDB引擎ibdata文件损坏/删除后使用frm和ibd文件恢复数据》mysql的ibdata文件被误删、被恶意修改,没有从库和备份数据的情况下的数据恢复,不能保证数据库所有表数据... 参考:mysql Innodb表空间卸载、迁移、装载的使用方法注意!此方法只适用于innodb_fi

mysql通过frm和ibd文件恢复表_mysql5.7根据.frm和.ibd文件恢复表结构和数据

《mysql通过frm和ibd文件恢复表_mysql5.7根据.frm和.ibd文件恢复表结构和数据》文章主要介绍了如何从.frm和.ibd文件恢复MySQLInnoDB表结构和数据,需要的朋友可以参... 目录一、恢复表结构二、恢复表数据补充方法一、恢复表结构(从 .frm 文件)方法 1:使用 mysq

mysql8.0无备份通过idb文件恢复数据的方法、idb文件修复和tablespace id不一致处理

《mysql8.0无备份通过idb文件恢复数据的方法、idb文件修复和tablespaceid不一致处理》文章描述了公司服务器断电后数据库故障的过程,作者通过查看错误日志、重新初始化数据目录、恢复备... 周末突然接到一位一年多没联系的妹妹打来电话,“刘哥,快来救救我”,我脑海瞬间冒出妙瓦底,电信火苲马扁.

golang获取prometheus数据(prometheus/client_golang包)

《golang获取prometheus数据(prometheus/client_golang包)》本文主要介绍了使用Go语言的prometheus/client_golang包来获取Prometheu... 目录1. 创建链接1.1 语法1.2 完整示例2. 简单查询2.1 语法2.2 完整示例3. 范围值

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

javaScript在表单提交时获取表单数据的示例代码

《javaScript在表单提交时获取表单数据的示例代码》本文介绍了五种在JavaScript中获取表单数据的方法:使用FormData对象、手动提取表单数据、使用querySelector获取单个字... 方法 1:使用 FormData 对象FormData 是一个方便的内置对象,用于获取表单中的键值