Kafka如何将消息发送到指定分区

2024-05-03 06:36

本文主要是介绍Kafka如何将消息发送到指定分区,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

面试一个时,面试官问了一个问题,Kafka如何做到顺序消息。我回答只给Kafka的Topic创建一个分区,发送到该Topic的消息在Kafka中就是有序的。

面试官又问,如果Topic有多个分区呢?我回答消息发送者在发送消息的时候,指定分区进行发送,可以在发送消息时,每次指定相同的Key。但是面试官说这样做不到,我后面去查了资料,是可以做到的,我当时也没有反驳,毕竟我是一个求职者,跟面试官产生冲突也不太好。而且可能面试官也只知道其他的方式,不知道基于这种方式可以将消息发送到指定分区。

写个博客记录下。

有哪些方式可以将消息发送到指定分区?

当一个Topic中有多个分区的时候,如何将消息发送到指定分区呢?

方式一:基于key

下面的第二个参数,partitionA就是message的key。
Kafka会将具有相同的key的消息发送到同一分区,这是通过哈希函数实现的。
此外,Kafka会按照消息产生的顺序被一致性的接受,这就保证了同一分区内消息的顺序性。

kafkaProducer.send("order-topic", "partitionA", "critical data");
kafkaProducer.send("order-topic", "partitionA", "more critical data");
kafkaProducer.send("order-topic", "partitionA", "another more critical data");

方式二:自定义分区器

Kafka允许自定义分区器,允许用户根据Topic、message key、message val、cluster等信息,自定义将消息发送到哪个分区。

自定义分区器:

public class CustomPartitioner implements Partitioner {// PREMIUM的意思是额外加价private static final int PREMIUM_PARTITION = 0;// NORMAL的意思是正常、标准private static final int NORMAL_PARTITION = 1;@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String customerType = extractCustomerType(key.toString());// 判断提取出的单词里面是否含有premium,如果有,则将其发送到第0号分区,否则发送到第1号分区。// 美团外卖有个加钱提前送达的服务,可以采用这种方式来实现。return "premium".equalsIgnoreCase(customerType) ? PREMIUM_PARTITION : NORMAL_PARTITION;}private String extractCustomerType(String key) {String[] parts = key.split("_");return parts.length > 1 ? parts[1] : "normal";}
}

在创建KafkaTemplate时,将自定义分区器设置到KafkaTemplate的属性里面去

// 在实际的SpringBoot项目中,可以将这个KafkaTemplate注入到Spring容器中
private KafkaTemplate<String, String> setProducerToUseCustomPartitioner() {Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);producerProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);return new KafkaTemplate<>(producerFactory);
}

测试代码。
将高级客户订单和普通客户订单区分开来,进行不同的处理。

// 在实际的SpringBoot项目中,可以从Spring容器中获取这个KafkaTemplate
KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();
// 根据自定义分区器,当key为123_premium,则消息会被发送到第0号分区。
kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
// 根据自定义分区器,当key为456_normal,不含有premium,则消息会被发送到第1号分区。
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");

方式三:直接指定分区序号

第二个参数0、1就是指定的分区号码,发送消息时,直接指定分区,将消息发送到指定的分区。

kafkaProducer.send("order-topic", 0, "123_premium", "Premium order message");
kafkaProducer.send("order-topic", 1, "456_normal", "Normal order message");

其他方式

在下面的参考文章当中,还看到了一个粘性分区器,但是没看太懂,而且不为大家所熟知,所以就没有太关注。
将数据发送到 Kafka 中的特定分区

参考

将数据发送到 Kafka 中的特定分区

这篇关于Kafka如何将消息发送到指定分区的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/956054

相关文章

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

使用Python实现获取网页指定内容

《使用Python实现获取网页指定内容》在当今互联网时代,网页数据抓取是一项非常重要的技能,本文将带你从零开始学习如何使用Python获取网页中的指定内容,希望对大家有所帮助... 目录引言1. 网页抓取的基本概念2. python中的网页抓取库3. 安装必要的库4. 发送HTTP请求并获取网页内容5. 解

Python实现合并与拆分多个PDF文档中的指定页

《Python实现合并与拆分多个PDF文档中的指定页》这篇文章主要为大家详细介绍了如何使用Python实现将多个PDF文档中的指定页合并生成新的PDF以及拆分PDF,感兴趣的小伙伴可以参考一下... 安装所需要的库pip install PyPDF2 -i https://pypi.tuna.tsingh

Flask解决指定端口无法生效问题

《Flask解决指定端口无法生效问题》文章讲述了在使用PyCharm开发Flask应用时,启动地址与手动指定的IP端口不一致的问题,通过修改PyCharm的运行配置,将Flask项目的运行模式从Fla... 目录android问题重现解决方案问题重现手动指定的IP端口是app.run(host='0.0.

一文详解kafka开启kerberos认证的完整步骤

《一文详解kafka开启kerberos认证的完整步骤》这篇文章主要为大家详细介绍了kafka开启kerberos认证的完整步骤,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、kerberos安装部署二、准备机器三、Kerberos Server 安装1、配置krb5.con

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D