本文主要是介绍kafka源码6-Sender线程对Broker响应的处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
生产者处理响应
前面我们分析了producer发送请求的过程,现在分析发送请求后,怎么处理响应的,producer的响应处理也是在Sender线程中处理的,再看Selector的pollSelectionKeys方法
class Selector{
void pollSelectionKeys(Set<SelectionKey> selectionKeys,boolean isImmediatelyConnected,long currentTimeNanos) {/*** 遍历所有的key*/for (SelectionKey key : determineHandlingOrder(selectionKeys)) {/*** 获取对应的channel*/KafkaChannel channel = channel(key);long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;// register all per-connection metrics at oncesensors.maybeRegisterConnectionMetrics(channel.id());if (idleExpiryManager != null)idleExpiryManager.update(channel.id(), currentTimeNanos);boolean sendFailed = false;try {............./***读取响应的数据*/attemptRead(key, channel);....................}/* cancel any defunct sockets */if (!key.isValid())close(channel, CloseMode.GRACEFUL);} catch (Exception e) {...........} finally {maybeRecordTimePerConnection(channel, channelStartTimeNanos);}}private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {//if channel is ready and has bytes to read from socket or buffer, and has no//previous receive(s) already staged or otherwise in progress then read from itif (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)&& !explicitlyMutedChannels.contains(channel)) {NetworkReceive networkReceive;while ((networkReceive = channel.read()) != null) {madeReadProgressLastPoll = true;/*** 有Broker返回的响应,添加*/addToStagedReceives(channel, networkReceive);}if (channel.isMute()) {outOfMemory = true; //channel has muted itself due to memory pressure.} else {madeReadProgressLastPoll = true;}}}private void addToStagedReceives
这篇关于kafka源码6-Sender线程对Broker响应的处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!