kafka: 基础概念回顾(生产者客户端和机架感知相关内容)

本文主要是介绍kafka: 基础概念回顾(生产者客户端和机架感知相关内容),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、kafka生产者客户端

1、整体架构:数据发送流程

在这里插入图片描述
(1)生产者

  • 拦截器
    生产者的拦截器可以在消息发送前做一些拦截工作对数据进行相应的处理,比如:消息过滤、消息内容修改等。
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
public interface ProducerInterceptor<K, V> extends Configurable {//在将消息序列化和计算分区之前会调⽤该⽅法,⽤来对消息进⾏相应的定制化操作,如修改消息内容public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);//在消息被应答之前或者消息发送失败时调⽤该⽅法,优先于⽤⼾设定的Callback之前执⾏,如统计消息发送成功或失败的次数public void onAcknowledgement(RecordMetadata metadata, Exception exception);public void close();
}
  • 序列化器
  • 分区器

二、kafka数据可靠性保证

1、LEO和HW
2、工作流程
3、Leader Epoch

三、粘性分区策略

四、机架感知

1、概念
2、机架感知分区分配策略
3、验证

(1)验证目标

  • 机架感知特性将同⼀分区的副本分散到不同的机架上
  • rack机制消费者可以消费到follower副本中的数据

(2)参数配置
broker端配置:

  • 配置名:broker.rack=my-rack-id
    • 解释:broker属于的rack
  • 配置名:replica.selector.class
    • 解释:ReplicaSelector实现类的全名,包括路径 (⽐如 RackAwareReplicaSelector 即按 rack id 指定消费)

Client端配置:
client.rack

  • consumer端配置
  • 配置名:client.rack
  • 解释:这个参数需要和broker端指定的 broker.rack 相同,表⽰去哪个rack中获取数据。
  • 默认:null

(3)环境准备:kafka集群

  • kafka实例数: 4
  • 两个kafka实例broker.rack配置为0,另外两个kafka实例broker.rack配置为了2,broker端配置如下:
server1:
broker.id=0broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelectorserver2:
broker.id=1
broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelectorserver3
broker.id=2
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelectorserver4
broker.id=3
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

启动kafka集群,服务端⽇志信息:
在这里插入图片描述
在这里插入图片描述
验证一:机架感知特性将同一分区的副本分散到不同的机架上
在这里插入图片描述
创建topic rack02,副本被分配到了broker1和2
在这里插入图片描述
创建topic rack03 副本被分配到了0和3
在这里插入图片描述
在这里插入图片描述

验证二:客⼾端(消费者)验证:rack机制消费者可以消费到follower副本中的数据

验证代码如下:

package person.xsc.train.producer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import person.xsc.train.client.KafkaConsumerClient;
import person.xsc.train.constant.KafkaConstant;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Demo {public static KafkaConsumer<String, String> kafkaConsumer;public static void main(String[] args) {Properties properties = new Properties();properties.put(KafkaConstant.BOOTSTRAP_SERVERS, "localhost:9093,localhosproperties.put(KafkaConstant.GROUP_ID, "test01");properties.put(KafkaConstant.ENABLE_AUTO_COMMIT, "true");properties.put(KafkaConstant.AUTO_COMMIT_INTERVAL_MS, "1000");properties.put(KafkaConstant.KEY_DESERIALIZER, StringDeserializer.class.properties.put(KafkaConstant.VALUE_DESERIALIZER, StringDeserializer.clasproperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");properties.put(ConsumerConfig.CLIENT_RACK_CONFIG, "0");properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");kafkaConsumer = KafkaConsumerClient.createKafkaClient(properties);receiveMessage("rack02");}public static void receiveMessage(String topic) {TopicPartition topicPartition0 = new TopicPartition(topic, 0);kafkaConsumer.assign(Arrays.asList(topicPartition0));while(true) {// Kafka的消费者⼀次拉取⼀批的数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll//System.out.println("开始打印消息!");// 5.将将记录(record)的offset、key、value都打印出来for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {// 主题String topicName = consumerRecord.topic();int partition = consumerRecord.partition();// offset:这条消息处于Kafka分区中的哪个位置long offset = consumerRecord.offset();// key\valueString key = consumerRecord.key();String value = consumerRecord.value();System.out.println(String.format("topic: %s, partition: %s, offs}}}
}

前置背景:
Topic rack02的partition 0分区的副本为broker2(对应的rack为2)和broker1(对应的rack为0),其中broker2为leader(在⾮rack机制下仅能消费到leader中的数据)。

在上述代码中,消费者配置中限制了rack为0,消费的分区为0,因此映射到broker1。通过测试可验证在rack机制下消费者可以消费到folloer副本中的数据,测试如下:
在这里插入图片描述

五、机架感知存在的问题

这篇关于kafka: 基础概念回顾(生产者客户端和机架感知相关内容)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/589875

相关文章

从基础到高级详解Python数值格式化输出的完全指南

《从基础到高级详解Python数值格式化输出的完全指南》在数据分析、金融计算和科学报告领域,数值格式化是提升可读性和专业性的关键技术,本文将深入解析Python中数值格式化输出的相关方法,感兴趣的小伙... 目录引言:数值格式化的核心价值一、基础格式化方法1.1 三种核心格式化方式对比1.2 基础格式化示例

redis-sentinel基础概念及部署流程

《redis-sentinel基础概念及部署流程》RedisSentinel是Redis的高可用解决方案,通过监控主从节点、自动故障转移、通知机制及配置提供,实现集群故障恢复与服务持续可用,核心组件包... 目录一. 引言二. 核心功能三. 核心组件四. 故障转移流程五. 服务部署六. sentinel部署

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

从基础到进阶详解Python条件判断的实用指南

《从基础到进阶详解Python条件判断的实用指南》本文将通过15个实战案例,带你大家掌握条件判断的核心技巧,并从基础语法到高级应用一网打尽,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录​引言:条件判断为何如此重要一、基础语法:三行代码构建决策系统二、多条件分支:elif的魔法三、

Python WebSockets 库从基础到实战使用举例

《PythonWebSockets库从基础到实战使用举例》WebSocket是一种全双工、持久化的网络通信协议,适用于需要低延迟的应用,如实时聊天、股票行情推送、在线协作、多人游戏等,本文给大家介... 目录1. 引言2. 为什么使用 WebSocket?3. 安装 WebSockets 库4. 使用 We

Java使用正则提取字符串中的内容的详细步骤

《Java使用正则提取字符串中的内容的详细步骤》:本文主要介绍Java中使用正则表达式提取字符串内容的方法,通过Pattern和Matcher类实现,涵盖编译正则、查找匹配、分组捕获、数字与邮箱提... 目录1. 基础流程2. 关键方法说明3. 常见场景示例场景1:提取所有数字场景2:提取邮箱地址4. 高级

Python利用PySpark和Kafka实现流处理引擎构建指南

《Python利用PySpark和Kafka实现流处理引擎构建指南》本文将深入解剖基于Python的实时处理黄金组合:Kafka(分布式消息队列)与PySpark(分布式计算引擎)的化学反应,并构建一... 目录引言:数据洪流时代的生存法则第一章 Kafka:数据世界的中央神经系统消息引擎核心设计哲学高吞吐

C#高效实现Word文档内容查找与替换的6种方法

《C#高效实现Word文档内容查找与替换的6种方法》在日常文档处理工作中,尤其是面对大型Word文档时,手动查找、替换文本往往既耗时又容易出错,本文整理了C#查找与替换Word内容的6种方法,大家可以... 目录环境准备方法一:查找文本并替换为新文本方法二:使用正则表达式查找并替换文本方法三:将文本替换为图

一文带你迅速搞懂路由器/交换机/光猫三者概念区别

《一文带你迅速搞懂路由器/交换机/光猫三者概念区别》讨论网络设备时,常提及路由器、交换机及光猫等词汇,日常生活、工作中,这些设备至关重要,居家上网、企业内部沟通乃至互联网冲浪皆无法脱离其影响力,本文将... 当谈论网络设备时,我们常常会听到路由器、交换机和光猫这几个名词。它们是构建现代网络基础设施的关键组成

从基础到高阶详解Python多态实战应用指南

《从基础到高阶详解Python多态实战应用指南》这篇文章主要从基础到高阶为大家详细介绍Python中多态的相关应用与技巧,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、多态的本质:python的“鸭子类型”哲学二、多态的三大实战场景场景1:数据处理管道——统一处理不同数据格式