本文主要是介绍Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
使Flink输出的数据在多个partition中均匀分布
FlinkKafkaProducerBase的子类可以使用默认的KafkaPartitioner FixedPartitioner(只向partition 0中写数据)也可以使用自己定义的Partitioner(继承KafkaPartitioner),我觉得实现比较复杂.
构造FlinkKafkaProducerBase的子类的2种情况
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema,
Properties producerConfig) {this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());}public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema,
Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);}
默认的FixedPartitioner
public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {private static final long serialVersionUID = 1627268846962918126L;private int targetPartition = -1;@Overridepublic void open(int parallelInstanceId, int parallelInstances, int[] partitions) {if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) {throw new IllegalArgumentException();}this.targetPartition = partitions[parallelInstanceId % partitions.length];}@Overridepublic int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {if (targetPartition >= 0) {return targetPartition;} else {throw new RuntimeException("The partitioner has not been initialized properly");}}
}
在构造FlinkKafkaProducerBase的子类时,可以传递一个值为null的KafkaPartitioner,这样就可以使用Kafka Client默认的Partitioner,默认的Paritioner就是将数据均匀分配到各个partition中.
protected FlinkKafkaProducerBase<Record> createSink(String topic, KeyedSerializationSchemadeserializationSchema, Properties properties) {String classFullName = "";if (kafkaVersion.startsWith("0.8")) {classFullName =
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08";} else if (kafkaVersion.startsWith("0.9")) {classFullName =
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";} else if (kafkaVersion.startsWith("0.10")) {classFullName =
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";} else {throw new RuntimeException("not support the "+"version kafka = " + kafkaVersion);}FlinkKafkaProducerBase<Record> sink = null;try {Class clazz = Class.forName(classFullName);Constructor constructor = clazz.getConstructor(String.class,
KeyedSerializationSchema.class, Properties.class, KafkaPartitioner.class);sink = (FlinkKafkaProducerBase) constructor.newInstance(topic,
deserializationSchema, properties,(KafkaPartitioner)null);} catch (Throwable e) {e.printStackTrace();}return sink;}
Kafka Client中默认的Partitioner
public class DefaultPartitioner implements Partitioner {private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();public void configure(Map<String, ?> configs) {}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private int nextValue(String topic) {AtomicInteger counter = topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}return counter.getAndIncrement();}public void close() {}
}
调用过程
在调用FlinkKafkaProducerBase中的invoke方法时,会判断partitioner是否为空,如果为空则构建一个partition属性为空的ProducerRecord对象,否则使用partitioner获得partition构造ProducerRecord对象.
public void invoke(IN next) throws Exception {// propagate asynchronous errorscheckErroneous();byte[] serializedKey = schema.serializeKey(next);byte[] serializedValue = schema.serializeValue(next);String targetTopic = schema.getTargetTopic(next);if (targetTopic == null) {targetTopic = defaultTopicId;}ProducerRecord<byte[], byte[]> record;if (partitioner == null) {record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);} else {record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);}if (flushOnCheckpoint) {synchronized (pendingRecordsLock) {pendingRecords++;}}producer.send(record, callback);}
在调用KafkaProducer的send方法的时候,方法里面会调用partition方法决定数据放到哪个分区,如果ProducerRecord的partition属性存在并且合法,则使用该值,否则使用KafkaProducer中的partitioner进行分区
private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();if (partition != null) {List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());int numPartitions = partitions.size();// they have given us a partition, use itif (partition < 0 || partition >= numPartitions)throw new IllegalArgumentException("Invalid partition given with record: " + partition+ " is not in the range [0..."+ numPartitions+ "].");return partition;}return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,cluster);}
KafkaProducer的partitioner是通过读取配置获取的,默认为DefaultPartitioner,可以在properties中put partitioner.class指定要使用的partitioner
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
这篇关于Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!