本文主要是介绍Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
前面介绍过NetworkClient的实现,它依赖于KSelector、InFlightRequests、Metadata等组件,负责管理客户端与Kafka集群中各个Node节点之间的连接,通过KSelector法实现了发送请求的功能,并通过一系列handle*方法处理请求响应、超时请求以及断线重连。
ConsumerNetworkClient在NetworkClient之上进行了封装,提供了更高级的功能和更易用的API。
在图中展示了ConsumerNetworkClient的核心字段以及其依赖的组件。
- client:NetworkClient对象。
- delayedTasks:定时任务队列,DelayedTaskQueue是Kafka提供的定时任务队列的实现,其底层是使用JDK提供的PriorityQueue实现。
简单介绍一下PriorityQueue,这是一个非线程安全的、无界的、优先级队列,实现原理是小顶堆,底层是基于数组实现的,其对应的线程安全实现是PriorityBlockingQueue,这个定时任务队列中是心跳任务。 - metadata:用于管理Kafka集群元数据。
- unsent:缓冲队列,Map<Node,List类型,key是Node节点,value是发往此Node的ClientRequest集合。
- unsentExpiryMs:ClientRequest在unsent中缓存的超时时长。
- wakeup:由调用KafkaConsumer对象的消费者线程之外的其他线程设置,表示要中断KafkaConsumer线程。
- wakeupDisabledCount:KafkaConsumer是否正在执行不可中断的方法。每进入一个不可中断的方法时,则增加一,退出不可中断方法时,则减少一。
wakeupDisabledCount只会被KafkaConsumer线程修改,其他线程不能修改。
ConsumerNetworkClient.poll()方法是ConsumerNetworkClient中最核心的方法,poll方法有多个重载,最终会调用poll(long timeout,long now,boolean executeDelayedTasks)重载,这三个参数的含义分别是:
- timeout表示执行poll方法的最长阻塞时间(单位是ms),如果为0,则表示不阻塞;
- now表示当前时间戳;
- executeDelayedTasks表示是否执行delayedTasks队列中的定时任务。
下面介绍其流程,其中简单回顾一下NetworkClient的功能:
-
调用ConsumerNetworkClient.trySend方法循环处理unsent中缓存的请求。
具体逻辑是:对每个Node节点,循环遍历其对应的ClientRequest列表,每次循环都调用NetworkClient.ready方法检测消费者与此节点之间的连接,以及发送请求的条件。
若符合发送条件,则调用NetworkClient.send()方法将请求放入InFlightRequests队中等待响应,也放入KafkaChannel的send字段中等待发送,并将此消息从列表中删除。实现代码如下:
-
计算超时时间,此超时时间由timeout与delayedTasks队列中最近要执行的定时任务的时间共同决定。在下面的NetworkClient.poll()方法中,会使用此超时时间作为最长阻塞时长,避免影响定时任务的执行。
-
调用NetworkClient.poll方法,将KafkaChannel.send字段指定的消息发送出去。除此之外,NetworkClient.poll()方法可能会更新Metadata使用一系列handle*方法处理请求响应、连接断开、超时等情况,并调用每个请求的回调函数。
-
调用ConsumerNetworkClient.maybeTriggerWakeup方法,检测wakeup和wakeupDisabledCount,查看是否有其他线程中断。如果有中断请求,则抛出WakeupException异常,中断当前ConsumerNetworkClient.poll方法。
- 调用checkDisconnects方法检测连接状态。检测消费者与每个Node之间的连接状态,当检测到连接断开的Node时,会将其在unsent集合中对应的全部ClientRequest对象清除掉,之后调用这些ClientRequest的回调函数。
-
根据executeDelayedTasks参数决定是否处理delayedTasks队列中超时的定时任务,如果需要执行delayedTasks队列中的定时任务,则调用delayedTasks.poll()方法。
-
再次调用trySend方法。在步骤3中调用了NetworkClient.poll方法,在其中可能已经将KafkaChannel.send字段上的请求发送出去了,也可能已经新建了与某些Node的网络连接,所以这里再次尝试调用trySend方法。
-
调用ConsumerNetworkClient.failExpiredRequests()处理unsent中超时请求。它会循环遍历整个unsent集合,检测每个ClientRequest是否超时,调用超时ClientRequest的回调函数,并将其从unsent集合中删除。
分析完poll方法的详细步骤之后,我们下面来看其实现代码:
pollNoWakeup方法是poll方法的变体,表示执行不可被中断的poll方法。
具体逻辑是:在执行poll方法之前,会调用disableWakeups方法将wakeupDisabledCount加一,然后调用poll方法。这样,即使其他线程请求中断,也不会被响应。
poll(future)是poll方法的另一个实现阻塞发送请求的功能,代码如下所示。
在ConsumerNetworkClient.send方法中,会将待发送的请求封装成ClientRequest,然后保存到unsent集合中等待发送,具体代码如下。
在这里需要重点关注的是KafkaConsumer中使用的回调对象—RequestFutureCompletionHandler,其继承关系如图所示。
从RequestFutureCompletionHandler的继承关系上我们可以知道,它不仅实现了RequestCompletionHandler,它还继承了RequestFuture类。RequestFuture是一个泛型类,其核心字段如下所示。
- isDone:表示当前请求是否已经完成,不管正常完成还是出现异常,此字段都会被设置为true。
- exception:记录导致请求异常完成的异常类,与value字段互斥。此字段非空则表示出现异常,反之则表示正常完成。
- value:记录请求正常完成时收到的响应,与exception字段互斥。此字段非空表示正常完成,反之表示出现异常。
- listeners:RequestFutureListener集合, 用来监听请求完成的情况。RequestFutureListener接口有onSuccess()和onFailure()两个方法,对应于请求正常完成和出现异常两种情况。
在RequestFuture中有两处典型设计模式的使用:一处是compose方法,使用了适配器模式;另一处是chain方法,使用了责任链模式。下面是compose方法的相关代码:
图展示了使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播流程。
当调用RequestFuture对象的complete()或raise()方法时,会调用RequestFutureListener的onSuccess()或onFailure()方法,然后调用RequestFutureAdapter<T,S>的对应方法,最终调用RequestFuture对象的对应方法。
RequestFuture.chain()方法的实现与compose()类似,也是通过RequestFutureListener在多个RequestFuture之间传递事件。下面是其具体代码:
RequestFuture提供了一系列检查请求完成情况的方法,以及管理listeners的方法,代码比较简单,不再赘述了。
介绍完RequestFutureCompleteHandler之后,回到ConsumerNetworkClient的分析上来。下面简单介绍ConsumerNetworkClient中几个常用的功能,代码比较简单,就不贴出来了:
- awaitMetadataUpdate()方法:循环调用poll方法,直到Metadata版本号增加,实现阻塞等待Metadata更新完成。
- awaitPendingRequests()方法:等待unsent和InFightRequests中的请求全部完成(正常收到响应或出现异常)。
- put()方法:向unsent中添加请求。
- schedule()方法:向delayedTasks队列中添加定时任务。
- leastLoadedNode()方法:查找Kafka集群中负载最低的Node。
这篇关于Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!