Kafka的分区数与多线程消费探讨

2024-09-06 21:18

本文主要是介绍Kafka的分区数与多线程消费探讨,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

典型的high-level Consumer的API如下:

 Properties props = new Properties();    props.put("zookeeper.connect", "xxxx:2181");    props.put("zookeeper.connectiontimeout.ms", "1000000");    props.put("group.id", "test_group");  props.put("zookeeper.session.timeout.ms", "40000");  props.put("zookeeper.sync.time.ms", "200");  props.put("auto.commit.interval.ms", "1000");ConsumerConfig consumerConfig = new ConsumerConfig(props);    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);Map<String, Integer> topicCountMap = new HashMap<String, Integer>();    topicCountMap.put("test", new Integer(1));    //key--topic    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =   consumerConnector.createMessageStreams(topicCountMap);  KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0);  ConsumerIterator<byte[], byte[]> it = stream.iterator();  StringBuffer sb = new StringBuffer();  while(it.hasNext()) {  try {  String msg = new String(it.next().message(), "utf-8").trim();  System.out.println("receive:" + msg);  } catch (UnsupportedEncodingException e) {  e.printStackTrace();  }  }

这是典型的kafka消费端消费数据的代码,但可以看出这是十分典型的单线程消费。不能直接用在生产实践中。

首先,最好理解kafka的基本原理和一些基本概念:

这张图比较清晰地描述了“分区”的概念,对于某一个topic的消息来说,我们可以把这组消息发送给若干个分区,就相当于一组消息分发一样。

分区、Offset、消费线程、group.id的关系

1)一组(类)消息通常由某个topic来归类,我们可以把这组消息“分发”给若干个分区(partition),每个分区的消息各不相同;

2)每个分区都维护着他自己的偏移量(Offset),记录着该分区的消息此时被消费的位置;

3)一个消费线程可以对应若干个分区,但一个分区只能被具体某一个消费线程消费;

4)group.id用于标记某一个消费组,每一个消费组都会被记录他在某一个分区的Offset,即不同consumer group针对同一个分区,都有“各自”的偏移量。 

说完概念,必须要注意的一点是,必须确认卡夫卡的server.properties里面的一个属性num.partitions必须被设置成大于1的值,否则消费端再怎么折腾,也用不了多线程哦。我这里的环境下,该属性值被设置成10了。

重构一下上述经典的消费端代码:

public class KafakConsumer implements Runnable {private ConsumerConfig consumerConfig;private static String topic="blog";Properties props;final int a_numThreads = 6;public KafakConsumer() {props = new Properties();    props.put("zookeeper.connect", "xxx:2181,yyy:2181,zzz:2181");  
//    props.put("zookeeper.connect", "localhost:2181");    
//        props.put("zookeeper.connectiontimeout.ms", "30000");    props.put("group.id", "blog");  props.put("zookeeper.session.timeout.ms", "400");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest");consumerConfig = new ConsumerConfig(props);}@Overridepublic void run() {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(a_numThreads));ConsumerConfig consumerConfig = new ConsumerConfig(props);ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);for (final KafkaStream stream : streams) {executor.submit(new KafkaConsumerThread(stream));}}public static void main(String[] args) {System.out.println(topic);Thread t = new Thread(new KafakConsumer());t.start();}
}

其中,具体消费线程KafkaConsumerThread代码为:

public class KafkaConsumerThread implements Runnable {private KafkaStream<byte[], byte[]> stream;public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {this.stream = stream;}@Overridepublic void run() {ConsumerIterator<byte[], byte[]> it = stream.iterator();while (it.hasNext()) {MessageAndMetadata<byte[], byte[]> mam = it.next();System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "]," + "offset[" + mam.offset() + "], " + new String(mam.message()));}}}

 编写生产端(Producer)的代码:

public class KafkaProducer implements Runnable {private Producer<String, String> producer = null;private ProducerConfig config = null;public KafkaProducer() {Properties props = new Properties();props.put("zookeeper.connect", "*****:2181,****:2181,****:2181");//    props.put("zookeeper.connect", "localhost:2181");// 指定序列化处理类,默认为kafka.serializer.DefaultEncoder,即byte[]props.put("serializer.class", "kafka.serializer.StringEncoder");// 同步还是异步,默认2表同步,1表异步。异步可以提高发送吞吐量,但是也可能导致丢失未发送过去的消息props.put("producer.type", "sync");// 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。props.put("compression.codec", "1");// 指定kafka节点列表,用于获取metadata(元数据),不必全部指定props.put("broker.list", "****:6667,***:6667,****:6667");config = new ProducerConfig(props);}@Overridepublic void run() {producer = new Producer<String, String>(config);
//    for(int i=0; i<10; i++) {
//      String sLine = "I'm number " + i;
//      KeyedMessage<String, String> msg = new KeyedMessage<String, String>("group1", sLine);
//      producer.send(msg);
//    }for(int i = 1; i <= 6; i++){ //往6个分区发数据 List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>();  for(int j = 0; j < 6; j++){ //每个分区6条讯息 messageList.add(new KeyedMessage<String, String>//String topic, String partition, String message("blog", "partition[" + i + "]", "message[The " + i + " message]"));}  producer.send(messageList);  }}public static void main(String[] args) {Thread t = new Thread(new KafkaProducer());t.start();}
}

上述生产端代码相对传统的发送端代码也做了改进,首先是用了批量发送(源码):

public void send(List<KeyedMessage<K, V>> messages){underlying().send(JavaConversions..MODULE$.asScalaBuffer(messages).toSeq());}

而不是:

public void send(KeyedMessage<K, V> message){underlying().send(Predef..MODULE$.wrapRefArray((Object[])new KeyedMessage[] { message }));}

第二,KeyedMessage用的构造函数:

public KeyedMessage(String topic, K key, V message) { this(topic, key, key, message); }

第二个参数表示分区的key。 

而非:

public KeyedMessage(String topic, V message) {this(topic, null, null, message);}

分别run一下生产和消费的代码,可以看到消费端打印结果:

pool-2-thread-5: partition[5],offset[0], message[The 5 message]

pool-2-thread-1: partition[2],offset[0], message[The 2 message]

pool-2-thread-2: partition[1],offset[0], message[The 1 message]

pool-2-thread-5: partition[4],offset[0], message[The 4 message]

pool-2-thread-1: partition[3],offset[0], message[The 3 message]

pool-2-thread-4: partition[6],offset[0], message[The 6 message]

 可以看到,6个分区的数据全部被消费了,但是不妨看下消费线程:pool-2-thread-1线程同时消费了partition[2]和partition[3]的数据;pool-2-thread-2消费了partiton[1]的数据;pool-2-thread-4消费了partiton[6]的数据;而pool-2-thread-5则消费了partitoin[4]和partition[5]的数据。

从上述消费情况来看,验证了消费线程和分区的对应情况——即:一个分区只能被一个线程消费,但一个消费线程可以消费多个分区的数据!虽然我指定了线程池的线程数为6,但并不是所有的线程都去消费了,这当然跟线程池的调度有关系了。并不是一个消费线程对应地去消费一个分区的数据。

我们不妨仔细看下消费端启动日志部分,这对我们理解kafka的启动生成和消费的原理有益:

消费端的启动日志表明:

1)Consumer happy_Connor-PC-1445916157267-b9cce79d rebalancing the following partitions: ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) for topic blog with consumers: List(happy_Connor-PC-1445916157267-b9cce79d-0, happy_Connor-PC-1445916157267-b9cce79d-1, happy_Connor-PC-1445916157267-b9cce79d-2, happy_Connor-PC-1445916157267-b9cce79d-3, happy_Connor-PC-1445916157267-b9cce79d-4, happy_Connor-PC-1445916157267-b9cce79d-5)

happy_Connor-PC-1445916157267-b9cce79d表示一个消费组,该topic可以使用10个分区:the following partitions: ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) for topic blog。然后定义了6个消费线程,List(happy_Connor-PC-1445916157267-b9cce79d-0, happy_Connor-PC-1445916157267-b9cce79d-1, happy_Connor-PC-1445916157267-b9cce79d-2, happy_Connor-PC-1445916157267-b9cce79d-3, happy_Connor-PC-1445916157267-b9cce79d-4, happy_Connor-PC-1445916157267-b9cce79d-5)。消费线程的个数由topicCountMap.put(String topic, Integer count)的第二个参数决定。但真正去消费的线程还是由线程池的调度机制来决定;

2)线程由zookeeper来声明它拥有1个或多个分区;

3)真正有数据存在的分区是由生产发送端来决定,即使你的kafka设置了10个分区,消费端在消费的时候,消费线程虽然会根据zookeeper的某种机制来声明它所消费的分区,但实际消费过程中,还是会消费真正存在数据的分区。(本例中,你只往6个分区push了数据,所以即使你声明了10个分区,你也只能消费6个分区的数据)。

 

如果把topicCountMap的第二个参数Integer值改成1,发送端改成往7个分区发数据再测试,可得到消费端的打印结果:

pool-2-thread-1: partition[6],offset[0], message[The 6 message]
pool-2-thread-1: partition[3],offset[0], message[The 3 message]
pool-2-thread-1: partition[2],offset[0], message[The 2 message]
pool-2-thread-1: partition[5],offset[0], message[The 5 message]
pool-2-thread-1: partition[4],offset[0], message[The 4 message]
pool-2-thread-1: partition[7],offset[0], message[The 7 message]
pool-2-thread-1: partition[1],offset[0], message[The 1 message]

可以看出,如果你topicCountMap的值改成1,而 List<KafkaStream<byte[], byte[]>>的size由Integer值决定,此时为1,可以看出,线程池中只能使用一个线程来发送,还是单线程的效果。若要用多线程消费,Integer的值必须大于1.

下面再来模拟一些状况:

状况一:往大于实际分区数的分区发数据,比如发送端的第一层循环设为11:

可看到消费端此时虽能正常的完全消费这10个分区的数据,但生产端会报异常:

No partition metadata for topic blog4 due to kafka.common.LeaderNotAvailableException}] for topic [blog4]: class kafka.common.LeaderNotAvailableException 

这说明,你往partition11发送失败,因为卡夫卡已经设置了10个分区,你再往不存在的分区数发当然会报错了。 

状况二:发送端用传统的发送方法,即KeyedMessage的构造函数只有topic和Message

//针对topic创建一个分区并发送数据List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>();for(int i = 1; i <= 10; i++){messageList.add(new KeyedMessage<String, String>("blog6",  "我是发送的内容message"+i));} producer.send(messageList);

消费端打印结果:

pool-2-thread-1: partition[7],offset[0], 我是发送的内容message1

pool-2-thread-1: partition[7],offset[1], 我是发送的内容message2

pool-2-thread-1: partition[7],offset[2], 我是发送的内容message3

pool-2-thread-1: partition[7],offset[3], 我是发送的内容message4

pool-2-thread-1: partition[7],offset[4], 我是发送的内容message5

pool-2-thread-1: partition[7],offset[5], 我是发送的内容message6

pool-2-thread-1: partition[7],offset[6], 我是发送的内容message7

pool-2-thread-1: partition[7],offset[7], 我是发送的内容message8

pool-2-thread-1: partition[7],offset[8], 我是发送的内容message9

pool-2-thread-1: partition[7],offset[9], 我是发送的内容message10

 

这表明,只用了1个消费线程消费1个分区的数据。这说明,如果发送端发送数据没有指定分区,即用的是

public KeyedMessage(String topic,V message) { this(topic, key, key, message); }
sendMessage(KeyedMessage(String topic,V message))

的话,同样达不到多线程消费的效果!

状况三:将线程池的大小设置成比topicCountMap的value值小?

  topicCountMap.put(topic, new Integer(7));//......................ExecutorService executor = Executors.newFixedThreadPool(5);

 发送端往9个分区发送数据,run一下,会发现消费端打印结果:

pool-2-thread-3: partition[7],offset[0], message[The 7 message]

pool-2-thread-5: partition[1],offset[0], message[The 1 message]

pool-2-thread-4: partition[4],offset[0], message[The 4 message]

pool-2-thread-2: partition[3],offset[0], message[The 3 message]

pool-2-thread-4: partition[5],offset[0], message[The 5 message]

pool-2-thread-1: partition[8],offset[0], message[The 8 message]

pool-2-thread-2: partition[2],offset[0], message[The 2 message]

你会发现:虽然我生产发送端往9个分区发送了数据,但实际上只消费掉了7个分区的数据。(如果你再跑一边,可能又是6个分区的数据)——这说明,有的分区的数据没有被消费,原因只可能是线程不够。so,当线程池中的大小小于分区数时,会出现有的分区没有被采集的情况。建议设置:实际发送分区数(一般就等于设置的分区数)= topicCountMap的value = 线程池大小  否则极易出现reblance的异常!!!

好了,折腾这么久。我们可以看出,卡夫卡如果想要多线程消费提高效率的话,就可以从分区数上下手,分区数就是用来做并行消费的而且生产端的发送代码也很有讲究。

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于Kafka的分区数与多线程消费探讨的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143139

相关文章

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

多线程解析报表

假如有这样一个需求,当我们需要解析一个Excel里多个sheet的数据时,可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。 Way1 join import java.time.LocalTime;public class Main {public static void main(String[] args) thro

Java 多线程概述

多线程技术概述   1.线程与进程 进程:内存中运行的应用程序,每个进程都拥有一个独立的内存空间。线程:是进程中的一个执行路径,共享一个内存空间,线程之间可以自由切换、并发执行,一个进程最少有一个线程,线程实际数是在进程基础之上的进一步划分,一个进程启动之后,进程之中的若干执行路径又可以划分成若干个线程 2.线程的调度 分时调度:所有线程轮流使用CPU的使用权,平均分配时间抢占式调度

Java 多线程的基本方式

Java 多线程的基本方式 基础实现两种方式: 通过实现Callable 接口方式(可得到返回值):

JAVA- 多线程

一,多线程的概念 1.并行与并发 并行:多个任务在同一时刻在cpu 上同时执行并发:多个任务在同一时刻在cpu 上交替执行 2.进程与线程 进程:就是操作系统中正在运行的一个应用程序。所以进程也就是“正在进行的程序”。(Windows系统中,我们可以在任务管理器中看 到进程) 线程:是程序运行的基本执行单元。当操作系统执行一个程序时, 会在系统中建立一个进程,该进程必须至少建立一个线

多线程篇(阻塞队列- LinkedBlockingDeque)(持续更新迭代)

目录 一、LinkedBlockingDeque是什么 二、核心属性详解 三、核心方法详解 addFirst(E e) offerFirst(E e) putFirst(E e) removeFirst() pollFirst() takeFirst() 其他 四、总结 一、LinkedBlockingDeque是什么 首先queue是一种数据结构,一个集合中

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

Kafka (快速)安装部署

文章目录 1、软件下载&配置环境1_JDK安装2_Zookeeper安装3_Kafka安装 2、单机安装1_配置主机名和IP映射2_单机Kafka配置 3、集群安装1_配置主机名和IP的映射关系2_时钟同步3_Zookeeper配置信息4_集群Kafka配置 4、kafka的其他脚本命令 1、软件下载&配置环境 下面的操作无论是单机部署还是分布式集群环境下都是通用的。 准