Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布

本文主要是介绍Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使Flink输出的数据在多个partition中均匀分布

FlinkKafkaProducerBase的子类可以使用默认的KafkaPartitioner FixedPartitioner(只向partition 0中写数据)也可以使用自己定义的Partitioner(继承KafkaPartitioner),我觉得实现比较复杂.

构造FlinkKafkaProducerBase的子类的2种情况

    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, 
Properties producerConfig) {this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());}public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, 
Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);}

默认的FixedPartitioner

public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {private static final long serialVersionUID = 1627268846962918126L;private int targetPartition = -1;@Overridepublic void open(int parallelInstanceId, int parallelInstances, int[] partitions) {if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) {throw new IllegalArgumentException();}this.targetPartition = partitions[parallelInstanceId % partitions.length];}@Overridepublic int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {if (targetPartition >= 0) {return targetPartition;} else {throw new RuntimeException("The partitioner has not been initialized properly");}}
}

在构造FlinkKafkaProducerBase的子类时,可以传递一个值为null的KafkaPartitioner,这样就可以使用Kafka Client默认的Partitioner,默认的Paritioner就是将数据均匀分配到各个partition中.

protected FlinkKafkaProducerBase<Record> createSink(String topic, KeyedSerializationSchemadeserializationSchema, Properties properties) {String classFullName = "";if (kafkaVersion.startsWith("0.8")) {classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08";} else if (kafkaVersion.startsWith("0.9")) {classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";} else if (kafkaVersion.startsWith("0.10")) {classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";} else {throw new RuntimeException("not support the "+"version kafka = " + kafkaVersion);}FlinkKafkaProducerBase<Record> sink = null;try {Class clazz = Class.forName(classFullName);Constructor constructor = clazz.getConstructor(String.class, 
KeyedSerializationSchema.class, Properties.class, KafkaPartitioner.class);sink = (FlinkKafkaProducerBase) constructor.newInstance(topic, 
deserializationSchema, properties,(KafkaPartitioner)null);} catch (Throwable e) {e.printStackTrace();}return sink;}

Kafka Client中默认的Partitioner

public class DefaultPartitioner implements Partitioner {private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();public void configure(Map<String, ?> configs) {}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private int nextValue(String topic) {AtomicInteger counter = topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}return counter.getAndIncrement();}public void close() {}
}

调用过程

在调用FlinkKafkaProducerBase中的invoke方法时,会判断partitioner是否为空,如果为空则构建一个partition属性为空的ProducerRecord对象,否则使用partitioner获得partition构造ProducerRecord对象.

    public void invoke(IN next) throws Exception {// propagate asynchronous errorscheckErroneous();byte[] serializedKey = schema.serializeKey(next);byte[] serializedValue = schema.serializeValue(next);String targetTopic = schema.getTargetTopic(next);if (targetTopic == null) {targetTopic = defaultTopicId;}ProducerRecord<byte[], byte[]> record;if (partitioner == null) {record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);} else {record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);}if (flushOnCheckpoint) {synchronized (pendingRecordsLock) {pendingRecords++;}}producer.send(record, callback);}

在调用KafkaProducer的send方法的时候,方法里面会调用partition方法决定数据放到哪个分区,如果ProducerRecord的partition属性存在并且合法,则使用该值,否则使用KafkaProducer中的partitioner进行分区

private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();if (partition != null) {List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());int numPartitions = partitions.size();// they have given us a partition, use itif (partition < 0 || partition >= numPartitions)throw new IllegalArgumentException("Invalid partition given with record: " + partition+ " is not in the range [0..."+ numPartitions+ "].");return partition;}return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,cluster);}

KafkaProducer的partitioner是通过读取配置获取的,默认为DefaultPartitioner,可以在properties中put partitioner.class指定要使用的partitioner

this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

这篇关于Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1002575

相关文章

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

vue使用docxtemplater导出word

《vue使用docxtemplater导出word》docxtemplater是一种邮件合并工具,以编程方式使用并处理条件、循环,并且可以扩展以插入任何内容,下面我们来看看如何使用docxtempl... 目录docxtemplatervue使用docxtemplater导出word安装常用语法 封装导出方

Linux换行符的使用方法详解

《Linux换行符的使用方法详解》本文介绍了Linux中常用的换行符LF及其在文件中的表示,展示了如何使用sed命令替换换行符,并列举了与换行符处理相关的Linux命令,通过代码讲解的非常详细,需要的... 目录简介检测文件中的换行符使用 cat -A 查看换行符使用 od -c 检查字符换行符格式转换将

Java编译生成多个.class文件的原理和作用

《Java编译生成多个.class文件的原理和作用》作为一名经验丰富的开发者,在Java项目中执行编译后,可能会发现一个.java源文件有时会产生多个.class文件,从技术实现层面详细剖析这一现象... 目录一、内部类机制与.class文件生成成员内部类(常规内部类)局部内部类(方法内部类)匿名内部类二、

使用Jackson进行JSON生成与解析的新手指南

《使用Jackson进行JSON生成与解析的新手指南》这篇文章主要为大家详细介绍了如何使用Jackson进行JSON生成与解析处理,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 核心依赖2. 基础用法2.1 对象转 jsON(序列化)2.2 JSON 转对象(反序列化)3.

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.

Elasticsearch 在 Java 中的使用教程

《Elasticsearch在Java中的使用教程》Elasticsearch是一个分布式搜索和分析引擎,基于ApacheLucene构建,能够实现实时数据的存储、搜索、和分析,它广泛应用于全文... 目录1. Elasticsearch 简介2. 环境准备2.1 安装 Elasticsearch2.2 J

使用C#代码在PDF文档中添加、删除和替换图片

《使用C#代码在PDF文档中添加、删除和替换图片》在当今数字化文档处理场景中,动态操作PDF文档中的图像已成为企业级应用开发的核心需求之一,本文将介绍如何在.NET平台使用C#代码在PDF文档中添加、... 目录引言用C#添加图片到PDF文档用C#删除PDF文档中的图片用C#替换PDF文档中的图片引言在当

Java中List的contains()方法的使用小结

《Java中List的contains()方法的使用小结》List的contains()方法用于检查列表中是否包含指定的元素,借助equals()方法进行判断,下面就来介绍Java中List的c... 目录详细展开1. 方法签名2. 工作原理3. 使用示例4. 注意事项总结结论:List 的 contain

C#使用SQLite进行大数据量高效处理的代码示例

《C#使用SQLite进行大数据量高效处理的代码示例》在软件开发中,高效处理大数据量是一个常见且具有挑战性的任务,SQLite因其零配置、嵌入式、跨平台的特性,成为许多开发者的首选数据库,本文将深入探... 目录前言准备工作数据实体核心技术批量插入:从乌龟到猎豹的蜕变分页查询:加载百万数据异步处理:拒绝界面