Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布

本文主要是介绍Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使Flink输出的数据在多个partition中均匀分布

FlinkKafkaProducerBase的子类可以使用默认的KafkaPartitioner FixedPartitioner(只向partition 0中写数据)也可以使用自己定义的Partitioner(继承KafkaPartitioner),我觉得实现比较复杂.

构造FlinkKafkaProducerBase的子类的2种情况

    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, 
Properties producerConfig) {this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());}public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, 
Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);}

默认的FixedPartitioner

public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {private static final long serialVersionUID = 1627268846962918126L;private int targetPartition = -1;@Overridepublic void open(int parallelInstanceId, int parallelInstances, int[] partitions) {if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) {throw new IllegalArgumentException();}this.targetPartition = partitions[parallelInstanceId % partitions.length];}@Overridepublic int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {if (targetPartition >= 0) {return targetPartition;} else {throw new RuntimeException("The partitioner has not been initialized properly");}}
}

在构造FlinkKafkaProducerBase的子类时,可以传递一个值为null的KafkaPartitioner,这样就可以使用Kafka Client默认的Partitioner,默认的Paritioner就是将数据均匀分配到各个partition中.

protected FlinkKafkaProducerBase<Record> createSink(String topic, KeyedSerializationSchemadeserializationSchema, Properties properties) {String classFullName = "";if (kafkaVersion.startsWith("0.8")) {classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08";} else if (kafkaVersion.startsWith("0.9")) {classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";} else if (kafkaVersion.startsWith("0.10")) {classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";} else {throw new RuntimeException("not support the "+"version kafka = " + kafkaVersion);}FlinkKafkaProducerBase<Record> sink = null;try {Class clazz = Class.forName(classFullName);Constructor constructor = clazz.getConstructor(String.class, 
KeyedSerializationSchema.class, Properties.class, KafkaPartitioner.class);sink = (FlinkKafkaProducerBase) constructor.newInstance(topic, 
deserializationSchema, properties,(KafkaPartitioner)null);} catch (Throwable e) {e.printStackTrace();}return sink;}

Kafka Client中默认的Partitioner

public class DefaultPartitioner implements Partitioner {private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();public void configure(Map<String, ?> configs) {}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private int nextValue(String topic) {AtomicInteger counter = topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}return counter.getAndIncrement();}public void close() {}
}

调用过程

在调用FlinkKafkaProducerBase中的invoke方法时,会判断partitioner是否为空,如果为空则构建一个partition属性为空的ProducerRecord对象,否则使用partitioner获得partition构造ProducerRecord对象.

    public void invoke(IN next) throws Exception {// propagate asynchronous errorscheckErroneous();byte[] serializedKey = schema.serializeKey(next);byte[] serializedValue = schema.serializeValue(next);String targetTopic = schema.getTargetTopic(next);if (targetTopic == null) {targetTopic = defaultTopicId;}ProducerRecord<byte[], byte[]> record;if (partitioner == null) {record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);} else {record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);}if (flushOnCheckpoint) {synchronized (pendingRecordsLock) {pendingRecords++;}}producer.send(record, callback);}

在调用KafkaProducer的send方法的时候,方法里面会调用partition方法决定数据放到哪个分区,如果ProducerRecord的partition属性存在并且合法,则使用该值,否则使用KafkaProducer中的partitioner进行分区

private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();if (partition != null) {List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());int numPartitions = partitions.size();// they have given us a partition, use itif (partition < 0 || partition >= numPartitions)throw new IllegalArgumentException("Invalid partition given with record: " + partition+ " is not in the range [0..."+ numPartitions+ "].");return partition;}return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,cluster);}

KafkaProducer的partitioner是通过读取配置获取的,默认为DefaultPartitioner,可以在properties中put partitioner.class指定要使用的partitioner

this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

这篇关于Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1002575

相关文章

Linux使用fdisk进行磁盘的相关操作

《Linux使用fdisk进行磁盘的相关操作》fdisk命令是Linux中用于管理磁盘分区的强大文本实用程序,这篇文章主要为大家详细介绍了如何使用fdisk进行磁盘的相关操作,需要的可以了解下... 目录简介基本语法示例用法列出所有分区查看指定磁盘的区分管理指定的磁盘进入交互式模式创建一个新的分区删除一个存

C#使用HttpClient进行Post请求出现超时问题的解决及优化

《C#使用HttpClient进行Post请求出现超时问题的解决及优化》最近我的控制台程序发现有时候总是出现请求超时等问题,通常好几分钟最多只有3-4个请求,在使用apipost发现并发10个5分钟也... 目录优化结论单例HttpClient连接池耗尽和并发并发异步最终优化后优化结论我直接上优化结论吧,

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动

C#使用yield关键字实现提升迭代性能与效率

《C#使用yield关键字实现提升迭代性能与效率》yield关键字在C#中简化了数据迭代的方式,实现了按需生成数据,自动维护迭代状态,本文主要来聊聊如何使用yield关键字实现提升迭代性能与效率,感兴... 目录前言传统迭代和yield迭代方式对比yield延迟加载按需获取数据yield break显式示迭

使用SQL语言查询多个Excel表格的操作方法

《使用SQL语言查询多个Excel表格的操作方法》本文介绍了如何使用SQL语言查询多个Excel表格,通过将所有Excel表格放入一个.xlsx文件中,并使用pandas和pandasql库进行读取和... 目录如何用SQL语言查询多个Excel表格如何使用sql查询excel内容1. 简介2. 实现思路3

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.