【Kafka】Kafka Producer 分区-05

2024-06-15 16:52
文章标签 05 分区 kafka producer

本文主要是介绍【Kafka】Kafka Producer 分区-05,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【Kafka】Kafka Producer 分区-05

  • 1. 分区的好处
  • 2. 分区策略
    • 2.1 默认的分区器 DefaultPartitioner
  • 3. 自定义分区器

1. 分区的好处

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

在这里插入图片描述

see 2.4.1http://t.csdnimg.cn/m1O9u

2. 分区策略

2.1 默认的分区器 DefaultPartitioner

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {… …
}

策略如下:

  1. 如果记录中指定了分区,使用指定的分区。
    这意味着生产者在发送消息时明确指定了一个分区号,Kafka将直接使用这个分区。
  2. 如果没有指定分区但存在键,根据键的哈希值选择分区。
    如果消息中没有明确指定分区,但是提供了一个键,Kafka会使用键的哈希值来计算应该使用哪个分区。这可以保证具有相同键的消息被发送到相同的分区,从而保证消息的有序性。
  3. 如果既没有指定分区也没有键,选择一个“sticky partition”(粘性分区),在批次满时更换分区。
    如果消息既没有指定分区,也没有键,Kafka将使用“sticky partition”策略。这种策略会在消息批次满时更换分区,以便于提高效率和性能。

这个注释是对DefaultPartitioner类的说明,该类实现了Partitioner接口,用于定义消息分区的策略。DefaultPartitioner类的主要功能就是根据上述规则决定消息发送到哪个分区。以下是DefaultPartitioner类的示例实现结构:

public class DefaultPartitioner implements Partitioner {// 初始化方法@Overridepublic void configure(Map<String, ?> configs) {// 配置代码}// 计算分区的方法@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 分区计算逻辑}// 清理资源的方法@Overridepublic void close() {// 资源清理代码}
}

该实现中包括了三个主要的方法:

  • configure(Map<String, ?> configs):用于初始化和配置分区器。
  • partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster):用于根据给定的主题、键和值来计算应该使用哪个分区。
  • close():用于在分区器关闭时清理资源。

在这里插入图片描述

案例一: 将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

import org.apache.kafka.clients.producer.*;import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102
:9092 ");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = newKafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)kafkaProducer.send(new ProducerRecord<>("first",1, "", "atguigu " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e == null) {System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {e.printStackTrace();}}});}kafkaProducer.close();}
}

案例二: 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。

public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = newKafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0kafkaProducer.send(new ProducerRecord<>("first","a", "atguigu " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e == null) {System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {e.printStackTrace();}}});}kafkaProducer.close();}
}

3. 自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器

  1. 需求
    例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区。

  2. 实现步骤
    定义类实现 Partitioner 接口
    重写 partition()方法

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*** 1. 实现接口 Partitioner* 2. 实现 3 个方法:partition,close,configure* 3. 编写 partition 方法,返回分区号*/
public class MyPartitioner implements Partitioner {/*** 返回信息对应的分区** @param topic 主题* @param key 消息的 key* @param keyBytes 消息的 key 序列化后的字节数组* @param value 消息的 value* @param valueBytes 消息的 value 序列化后的字节数组* @param cluster 集群元数据可以查看分区信息* @return*/@Overridepublic int partition(String topic, Object key, byte[]keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue = value.toString();// 创建 partitionint partition;// 判断消息是否包含 atguiguif (msgValue.contains("atguigu")) {partition = 0;} else {partition = 1;}// 返回分区号return partition;}// 关闭资源@Overridepublic void close() {}// 配置方法@Overridepublic void configure(Map<String, ?> configs) {}
}

使用分区器的方法,在生产者的配置中添加分区器参数。

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallbackPartitions {public static void main(String[] args) throws InterruptedException {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 添加自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");KafkaProducer < String, String > kafkaProducer = newKafkaProducer<>(properties);for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e == null) {System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {e.printStackTrace();}}});}kafkaProducer.close();}
}

测试
①在 hadoop102 上开启 Kafka 消费者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 控制台观察回调信息

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

这篇关于【Kafka】Kafka Producer 分区-05的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1064028

相关文章

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

一文详解kafka开启kerberos认证的完整步骤

《一文详解kafka开启kerberos认证的完整步骤》这篇文章主要为大家详细介绍了kafka开启kerberos认证的完整步骤,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、kerberos安装部署二、准备机器三、Kerberos Server 安装1、配置krb5.con

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

mysql数据库分区的使用

《mysql数据库分区的使用》MySQL分区技术通过将大表分割成多个较小片段,提高查询性能、管理效率和数据存储效率,本文就来介绍一下mysql数据库分区的使用,感兴趣的可以了解一下... 目录【一】分区的基本概念【1】物理存储与逻辑分割【2】查询性能提升【3】数据管理与维护【4】扩展性与并行处理【二】分区的

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

忽略某些文件 —— Git 学习笔记 05

忽略某些文件 忽略某些文件 通过.gitignore文件其他规则源如何选择规则源参考资料 对于某些文件,我们不希望把它们纳入 Git 的管理,也不希望它们总出现在未跟踪文件列表。通常它们都是些自动生成的文件,比如日志文件、编译过程中创建的临时文件等。 通过.gitignore文件 假设我们要忽略 lib.a 文件,那我们可以在 lib.a 所在目录下创建一个名为 .gi