本文主要是介绍Kafka源码阅读最最最简单的入门方法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
大数据技术与架构
点击右侧关注,大数据开发领域最强公众号!
暴走大数据
点击右侧关注,暴走大数据!
1 消息处理入口
以下是Kafka消息处理的入口,即客户端发送到服务端消息处理方法。
/** * Top-level method that handles all requests and multiplexes to the right api */ def handle(request: RequestChannel.Request) { try{ trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s". format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal)) request.requestId match { case RequestKeys.ProduceKey => handleProducerRequest(request) case RequestKeys.FetchKey => handleFetchRequest(request) case RequestKeys.OffsetsKey => handleOffsetRequest(request) case RequestKeys.MetadataKey => handleTopicMetadataRequest(request) case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request) case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request) case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request) case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request) case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request) case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request) case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request) case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request) case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request) case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request) case RequestKeys.ListGroupsKey => handleListGroupsRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { case e: Throwable => if ( request.requestObj != null) request.requestObj.handleError(e, requestChannel, request) else { val response = request.body.getErrorResponse(request.header.apiVersion, e) val respHeader = new ResponseHeader(request.header.correlationId) /* If request doesn't have a default error response, we just close the connection. For example, when produce request has acks set to 0 */ if (response == null) requestChannel.closeConnection(request.processor, request) else requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response))) } error("error when handling request %s".format(request.requestObj), e) } finally request.apiLocalCompleteTimeMs = SystemTime.milliseconds }
2 内存中offset信息来源
/** * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either * returns the current offset or it begins to sync the cache from the log (and returns an error code). */ def getOffsets(group: String, topicPartitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = { trace("Getting offsets %s for group %s.".format(topicPartitions, group)) if (isGroupLocal(group)) { if (topicPartitions.isEmpty) { // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.) offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) => (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError)) }.toMap } else { topicPartitions.map { topicAndPartition => val groupTopicPartition = GroupTopicPartition(group, topicAndPartition) (groupTopicPartition.topicPartition, getOffset(groupTopicPartition)) }.toMap } } else { debug("Could not fetch offsets for group %s (not offset coordinator).".format(group)) topicPartitions.map { topicAndPartition => val groupTopicPartition = GroupTopicPartition(group, topicAndPartition) (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotCoordinatorForGroup) }.toMap } }
从上面代码中可以看出,拉取的offset是从offsetsCache
中获取。而在提交offset以及初始化group是会将对应的offset信息加入到该缓存中。
//该方法是在commitoffset中执行/** * Store offsets by appending it to the replicated log and then inserting to cache */ def prepareStoreOffsets(groupId: String, consumerId: String, generationId: Int, offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicAndPartition, Short] => Unit): DelayedStore = { // first filter out partitions with offset metadata size exceeding limit val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) => validateOffsetMetadataLength(offsetAndMetadata.metadata) } // construct the message set to append val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => new Message( key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition), bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata) ) }.toSeq val offsetTopicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId)) val offsetsAndMetadataMessageSet = Map(offsetTopicPartition -> new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) // set the callback function to insert offsets into cache after log append completed def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { // the append response should only contain the topics partition if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition)) throw new IllegalStateException("Append status %s should only have one partition %s" .format(responseStatus, offsetTopicPartition)) // construct the commit response status and insert // the offset and metadata to cache if the append status has no error val status = responseStatus(offsetTopicPartition) val responseCode = if (status.error == ErrorMapping.NoError) { filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) => //将offset信息加入到缓存中 putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata) } ErrorMapping.NoError } else { debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s" .format(filteredOffsetMetadata, groupId, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error))) // transform the log append error code to the corresponding the commit status error code if (status.error == ErrorMapping.UnknownTopicOrPartitionCode) ErrorMapping.ConsumerCoordinatorNotAvailableCode else if (status.error == ErrorMapping.NotLeaderForPartitionCode) ErrorMapping.NotCoordinatorForConsumerCode else if (status.error == ErrorMapping.MessageSizeTooLargeCode || status.error == ErrorMapping.MessageSetSizeTooLargeCode || status.error == ErrorMapping.InvalidFetchSizeCode) Errors.INVALID_COMMIT_OFFSET_SIZE.code else status.error } // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) (topicAndPartition, responseCode) else (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) } // finally trigger the callback logic passed from the API layer responseCallback(commitStatus) } DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback) }
//该方法启动时异步执行/** * Asynchronously read the partition from the offsets topic and populate the cache */ def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) { val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition) scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets) def loadGroupsAndOffsets() { info("Loading offsets and group metadata from " + topicPartition) loadingPartitions synchronized { if (loadingPartitions.contains(offsetsPartition)) { info("Offset load from %s already in progress.".format(topicPartition)) return } else { loadingPartitions.add(offsetsPartition) } } val startMs = SystemTime.milliseconds try { replicaManager.logManager.getLog(topicPartition) match { case Some(log) => var currOffset = log.logSegments.head.baseOffset val buffer = ByteBuffer.allocate(config.loadBufferSize) // loop breaks if leader changes at any time during the load, since getHighWatermark is -1 inWriteLock(offsetExpireLock) { val loadedGroups = mutable.Map[String, GroupMetadata]() val removedGroups = mutable.Set[String]() while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) { buffer.clear() val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet] messages.readInto(buffer, 0) val messageSet = new ByteBufferMessageSet(buffer) messageSet.foreach { msgAndOffset => require(msgAndOffset.message.key != null, "Offset entry key should not be null") val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key) if (baseKey.isInstanceOf[OffsetKey]) { // load offset val key = baseKey.key.asInstanceOf[GroupTopicPartition] if (msgAndOffset.message.payload == null) { if (offsetsCache.remove(key) != null) trace("Removed offset for %s due to tombstone entry.".format(key)) else trace("Ignoring redundant tombstone for %s.".format(key)) } else { // special handling for version 0: // set the expiration time stamp as commit time stamp + server default retention time val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload) //添加offset信息到缓存中 putOffset(key, value.copy ( expireTimestamp = { if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) value.commitTimestamp + config.offsetsRetentionMs else value.expireTimestamp } )) trace("Loaded offset %s for %s.".format(value, key)) } } else { // load group metadata val groupId = baseKey.key.asInstanceOf[String] val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload) if (groupMetadata != null) { trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}") removedGroups.remove(groupId) loadedGroups.put(groupId, groupMetadata) } else { loadedGroups.remove(groupId) removedGroups.add(groupId) } } currOffset = msgAndOffset.nextOffset } } loadedGroups.values.foreach { group => val currentGroup = addGroup(group) if (group != currentGroup) debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " + s"because there is already a cached group with generation ${currentGroup.generationId}") else onGroupLoaded(group) } removedGroups.foreach { groupId => val group = groupsCache.get(groupId) if (group != null) throw new IllegalStateException(s"Unexpected unload of acitve group ${group.groupId} while " + s"loading partition ${topicPartition}") } } if (!shuttingDown.get()) info("Finished loading offsets from %s in %d milliseconds." .format(topicPartition, SystemTime.milliseconds - startMs)) case None => warn("No log found for " + topicPartition) } } catch { case t: Throwable => error("Error in loading offsets from " + topicPartition, t) } finally { loadingPartitions synchronized { ownedPartitions.add(offsetsPartition) loadingPartitions.remove(offsetsPartition) } } } }
3 Offset Commit实现
当消费端消费消息是会将offset提交,即offset提交信息,broker端把接收到的offset提交信息当做一个正常的生产请求,对offset请求的处理和正常的生产者请求处理方式是一样的。下面是内置生产者的一些属性:
property.type=sync
request.required.acks=-1
key.serializer.class=StringEncoder
一旦将数据追加到leader的本地日志中,并且所有replicas都赶上leader,leader检查生产请求是“offset topic”,(因为broker端的处理逻辑针对offset请求和普通生产请求是一样的,如果是offset请求,还需要有不同的处理分支),它就会要求offset manager添加这个offset(对于延迟的生产请求,更新操作会在延迟的生产请求被完成的时候)。因为设置了acks=-1,只有当这些offsets成功地复制到ISR的所有brokers,才会被提交给offset manager。
4 Offset Fetch实现
消费端在启动时会向broker端请求offset信息,一个Offset请求中包含多个topic-partitions,在consumer客户端中根据缓存的metadata信息区分哪些partition到哪个broker上请求,在返回中会根据不同状态反馈,如当前broker正在加载offset,则返回Loading状态。
对”offsets topic”的某些partition而言,broker状态发生改变,即被当做partition的leader或者follower时,LeaderAndIsrRequest请求会被触发:
如果broker是”offsets topic”中一些partitions的leader, broker会读取指定partition的logs文件,
并将offsets加载到offset table缓存中.
任何对这些partition的提交请求仍然会更新offsets表.我们会防止日志文件中过期的offsets覆盖最近的提交请求的offsets.
被”offsets topic”中partition管理的offset抓取请求的keys直到加载成功之前是不会被使用的.
broker会返回OffsetLoadingCode的OffsetFetchResponse给消费者客户端.
如果broker是follower: 和其他正常的kafka topic一样,follower会从leader中抓取数据.
由于follower的offset manager不再负责partitions,它们会在cleanup方法被调用时清理数据.5 kafka的文件存储
在Kafka中,消息是按Topic组织的。
Partition:topic物理上的分组,一个topic可以划分为多个partition,每个partition是一个有序的队列。
Segment:partition物理上由多个segment组成
offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中每个消息都由一个连续的序列号叫做offset,用于partition唯一标识一条消息。
├── data0│ ├── cleaner-offset-checkpoint│ ├── client_mblogduration-35│ │ ├── 00000000000004909731.index│ │ ├── 00000000000004909731.log // 1G文件--Segment│ │ ├── 00000000000005048975.index // 数字是Offset│ │ ├── 00000000000005048975.log│ ├── client_mblogduration-37│ │ ├── 00000000000004955629.index│ │ ├── 00000000000004955629.log│ │ ├── 00000000000005098290.index│ │ ├── 00000000000005098290.log│ ├── __consumer_offsets-33│ │ ├── 00000000000000105157.index│ │ └── 00000000000000105157.log│ ├── meta.properties│ ├── recovery-point-offset-checkpoint│ └── replication-offset-checkpoint
cleaner-offset-checkpoint:存了每个log的最后清理offset
meta.properties: broker.id信息
recovery-point-offset-checkpoint:表示已经刷写到磁盘的记录。recoveryPoint以下的数据都是已经刷到磁盘上的了。
replication-offset-checkpoint: 用来存储每个replica的HighWatermark的(high watermark (HW),表示已经被commited的message,HW以下的数据都是各个replicas间同步的,一致的。)
6 Leader和Follower同步机制
举例说明,我们假设有一个topic,单分区,副本因子是2,即一个leader副本和一个follower副本。我们看下当producer发送一条消息时,broker端的副本到底会发生什么事情以及分区HW是如何被更新的。
下图是初始状态,我们稍微解释一下:初始时leader和follower的HW和LEO都是0(严格来说源代码会初始化LEO为-1,不过这不影响之后的讨论)。leader中的remote LEO指的就是leader端保存的follower LEO,也被初始化成0。此时,producer没有发送任何消息给leader,而follower已经开始不断地给leader发送FETCH请求了,但因为没有数据因此什么都不会发生。值得一提的是,follower发送过来的FETCH请求因为无数据而暂时会被寄存到leader端的purgatory中,待500ms(replica.fetch.wait.max.ms参数)超时后会强制完成。倘若在寄存期间producer端发送过来数据,那么会Kafka会自动唤醒该FETCH请求,让leader继续处理之。
第一种情况:follower发送FETCH请求在leader处理完PRODUCE请求之后,producer给该topic分区发送了一条消息。此时的状态如下图所示:
如图所示,leader接收到PRODUCE请求主要做两件事情:
把消息写入写底层log(同时也就自动地更新了leader的LEO)
尝试更新leader HW值(前面leader副本何时更新HW值一节中的第三个条件触发)。我们已经假设此时follower尚未发送FETCH请求,那么leader端保存的remote LEO依然是0,因此leader会比较它自己的LEO值和remote LEO值,发现最小值是0,与当前HW值相同,故不会更新分区HW值
所以,PRODUCE请求处理完成后leader端的HW值依然是0,而LEO是1,remote LEO是1。假设此时follower发送了FETCH请求(或者说follower早已发送了FETCH请求,只不过在broker的请求队列中排队),那么状态变更如下图所示:
本例中当follower发送FETCH请求时,leader端的处理依次是:
读取底层log数据
更新remote LEO = 0(为什么是0? 因为此时follower还没有写入这条消息。leader如何确认follower还未写入呢?这是通过follower发来的FETCH请求中的fetch offset来确定的)
尝试更新分区HW——此时leader LEO = 1,remote LEO = 0,故分区HW值= min(leader LEO, follower remote LEO) = 0
把数据和当前分区HW值(依然是0)发送给follower副本
而follower副本接收到FETCH response后依次执行下列操作:
写入本地log(同时更新follower LEO)
更新follower HW——比较本地LEO和当前leader HW取小者,故follower HW = 0
此时,第一轮FETCH RPC结束,我们会发现虽然leader和follower都已经在log中保存了这条消息,但分区HW值尚未被更新。实际上,它是在第二轮FETCH RPC中被更新的,如下图所示:
上图中,follower发来了第二轮FETCH请求,leader端接收到后仍然会依次执行下列操作:
读取底层log数据
更新remote LEO = 1(这次为什么是1了? 因为这轮FETCH RPC携带的fetch offset是1,那么为什么这轮携带的就是1了呢,因为上一轮结束后follower LEO被更新为1了)
尝试更新分区HW——此时leader LEO = 1,remote LEO = 1,故分区HW值= min(leader LEO, follower remote LEO) = 1。注意分区HW值此时被更新了!!!
把数据(实际上没有数据)和当前分区HW值(已更新为1)发送给follower副本
同样地,follower副本接收到FETCH response后依次执行下列操作:
写入本地log,当然没东西可写,故follower LEO也不会变化,依然是1
更新follower HW——比较本地LEO和当前leader LEO取小者。由于此时两者都是1,故更新follower HW = 1
producer端发送消息后broker端完整的处理流程就讲完了。此时消息已经成功地被复制到leader和follower的log中且分区HW是1,表明consumer能够消费offset = 0的这条消息。
以上所有的东西其实就想说明一件事情:Kafka使用HW值来决定副本备份的进度,而HW值的更新通常需要额外一轮FETCH RPC才能完成,故而这种设计是有问题的。它们可能引起的问题包括:
备份数据丢失
备份数据不一致
这部分这里自己思考下就能够明白,不在赘述。
References
1.Kafka水位(high watermark)与leader epoch的讨论
欢迎点赞+收藏+转发朋友圈素质三连
文章不错?点个【在看】吧! ????
这篇关于Kafka源码阅读最最最简单的入门方法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!