Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布

本文主要是介绍Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使Flink输出的数据在多个partition中均匀分布

FlinkKafkaProducerBase的子类可以使用默认的KafkaPartitioner FixedPartitioner(只向partition 0中写数据)也可以使用自己定义的Partitioner(继承KafkaPartitioner),我觉得实现比较复杂.

构造FlinkKafkaProducerBase的子类的2种情况

    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, 
Properties producerConfig) {this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());}public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, 
Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);}

默认的FixedPartitioner

public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {private static final long serialVersionUID = 1627268846962918126L;private int targetPartition = -1;@Overridepublic void open(int parallelInstanceId, int parallelInstances, int[] partitions) {if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) {throw new IllegalArgumentException();}this.targetPartition = partitions[parallelInstanceId % partitions.length];}@Overridepublic int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {if (targetPartition >= 0) {return targetPartition;} else {throw new RuntimeException("The partitioner has not been initialized properly");}}
}

在构造FlinkKafkaProducerBase的子类时,可以传递一个值为null的KafkaPartitioner,这样就可以使用Kafka Client默认的Partitioner,默认的Paritioner就是将数据均匀分配到各个partition中.

protected FlinkKafkaProducerBase<Record> createSink(String topic, KeyedSerializationSchemadeserializationSchema, Properties properties) {String classFullName = "";if (kafkaVersion.startsWith("0.8")) {classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08";} else if (kafkaVersion.startsWith("0.9")) {classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";} else if (kafkaVersion.startsWith("0.10")) {classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";} else {throw new RuntimeException("not support the "+"version kafka = " + kafkaVersion);}FlinkKafkaProducerBase<Record> sink = null;try {Class clazz = Class.forName(classFullName);Constructor constructor = clazz.getConstructor(String.class, 
KeyedSerializationSchema.class, Properties.class, KafkaPartitioner.class);sink = (FlinkKafkaProducerBase) constructor.newInstance(topic, 
deserializationSchema, properties,(KafkaPartitioner)null);} catch (Throwable e) {e.printStackTrace();}return sink;}

Kafka Client中默认的Partitioner

public class DefaultPartitioner implements Partitioner {private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();public void configure(Map<String, ?> configs) {}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private int nextValue(String topic) {AtomicInteger counter = topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}return counter.getAndIncrement();}public void close() {}
}

调用过程

在调用FlinkKafkaProducerBase中的invoke方法时,会判断partitioner是否为空,如果为空则构建一个partition属性为空的ProducerRecord对象,否则使用partitioner获得partition构造ProducerRecord对象.

    public void invoke(IN next) throws Exception {// propagate asynchronous errorscheckErroneous();byte[] serializedKey = schema.serializeKey(next);byte[] serializedValue = schema.serializeValue(next);String targetTopic = schema.getTargetTopic(next);if (targetTopic == null) {targetTopic = defaultTopicId;}ProducerRecord<byte[], byte[]> record;if (partitioner == null) {record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);} else {record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);}if (flushOnCheckpoint) {synchronized (pendingRecordsLock) {pendingRecords++;}}producer.send(record, callback);}

在调用KafkaProducer的send方法的时候,方法里面会调用partition方法决定数据放到哪个分区,如果ProducerRecord的partition属性存在并且合法,则使用该值,否则使用KafkaProducer中的partitioner进行分区

private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();if (partition != null) {List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());int numPartitions = partitions.size();// they have given us a partition, use itif (partition < 0 || partition >= numPartitions)throw new IllegalArgumentException("Invalid partition given with record: " + partition+ " is not in the range [0..."+ numPartitions+ "].");return partition;}return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,cluster);}

KafkaProducer的partitioner是通过读取配置获取的,默认为DefaultPartitioner,可以在properties中put partitioner.class指定要使用的partitioner

this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

这篇关于Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1002575

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传