Kafka源码阅读最最最简单的入门方法

2024-09-06 21:18

本文主要是介绍Kafka源码阅读最最最简单的入门方法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

1 消息处理入口

以下是Kafka消息处理的入口,即客户端发送到服务端消息处理方法。

/**   * Top-level method that handles all requests and multiplexes to the right api   */  def handle(request: RequestChannel.Request) {    try{      trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".        format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal))      request.requestId match {        case RequestKeys.ProduceKey => handleProducerRequest(request)        case RequestKeys.FetchKey => handleFetchRequest(request)        case RequestKeys.OffsetsKey => handleOffsetRequest(request)        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)        case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)        case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)        case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)        case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)        case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)        case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)        case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)        case requestId => throw new KafkaException("Unknown api code " + requestId)      }    } catch {      case e: Throwable =>        if ( request.requestObj != null)          request.requestObj.handleError(e, requestChannel, request)        else {          val response = request.body.getErrorResponse(request.header.apiVersion, e)          val respHeader = new ResponseHeader(request.header.correlationId)          /* If request doesn't have a default error response, we just close the connection.             For example, when produce request has acks set to 0 */          if (response == null)            requestChannel.closeConnection(request.processor, request)          else            requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))        }        error("error when handling request %s".format(request.requestObj), e)    } finally      request.apiLocalCompleteTimeMs = SystemTime.milliseconds  }

2 内存中offset信息来源

/**   * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either   * returns the current offset or it begins to sync the cache from the log (and returns an error code).   */  def getOffsets(group: String, topicPartitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {    trace("Getting offsets %s for group %s.".format(topicPartitions, group))    if (isGroupLocal(group)) {      if (topicPartitions.isEmpty) {        // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)        offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) =>          (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError))        }.toMap      } else {        topicPartitions.map { topicAndPartition =>          val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)          (groupTopicPartition.topicPartition, getOffset(groupTopicPartition))        }.toMap      }    } else {      debug("Could not fetch offsets for group %s (not offset coordinator).".format(group))      topicPartitions.map { topicAndPartition =>        val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)        (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotCoordinatorForGroup)      }.toMap    }  }

从上面代码中可以看出,拉取的offset是从offsetsCache中获取。而在提交offset以及初始化group是会将对应的offset信息加入到该缓存中。

//该方法是在commitoffset中执行/**   * Store offsets by appending it to the replicated log and then inserting to cache   */  def prepareStoreOffsets(groupId: String,                          consumerId: String,                          generationId: Int,                          offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],                          responseCallback: immutable.Map[TopicAndPartition, Short] => Unit): DelayedStore = {    // first filter out partitions with offset metadata size exceeding limit    val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>      validateOffsetMetadataLength(offsetAndMetadata.metadata)    }    // construct the message set to append    val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>      new Message(        key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),        bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)      )    }.toSeq    val offsetTopicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId))    val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->      new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))    // set the callback function to insert offsets into cache after log append completed    def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {      // the append response should only contain the topics partition      if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))        throw new IllegalStateException("Append status %s should only have one partition %s"          .format(responseStatus, offsetTopicPartition))      // construct the commit response status and insert      // the offset and metadata to cache if the append status has no error      val status = responseStatus(offsetTopicPartition)      val responseCode =        if (status.error == ErrorMapping.NoError) {          filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>            //将offset信息加入到缓存中            putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)          }          ErrorMapping.NoError        } else {          debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"            .format(filteredOffsetMetadata, groupId, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error)))          // transform the log append error code to the corresponding the commit status error code          if (status.error == ErrorMapping.UnknownTopicOrPartitionCode)            ErrorMapping.ConsumerCoordinatorNotAvailableCode          else if (status.error == ErrorMapping.NotLeaderForPartitionCode)            ErrorMapping.NotCoordinatorForConsumerCode          else if (status.error == ErrorMapping.MessageSizeTooLargeCode            || status.error == ErrorMapping.MessageSetSizeTooLargeCode            || status.error == ErrorMapping.InvalidFetchSizeCode)            Errors.INVALID_COMMIT_OFFSET_SIZE.code          else            status.error        }      // compute the final error codes for the commit response      val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>        if (validateOffsetMetadataLength(offsetAndMetadata.metadata))          (topicAndPartition, responseCode)        else          (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)      }      // finally trigger the callback logic passed from the API layer      responseCallback(commitStatus)    }    DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback)  }
//该方法启动时异步执行/**   * Asynchronously read the partition from the offsets topic and populate the cache   */  def loadGroupsForPartition(offsetsPartition: Int,                             onGroupLoaded: GroupMetadata => Unit) {    val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)    scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)    def loadGroupsAndOffsets() {      info("Loading offsets and group metadata from " + topicPartition)      loadingPartitions synchronized {        if (loadingPartitions.contains(offsetsPartition)) {          info("Offset load from %s already in progress.".format(topicPartition))          return        } else {          loadingPartitions.add(offsetsPartition)        }      }      val startMs = SystemTime.milliseconds      try {        replicaManager.logManager.getLog(topicPartition) match {          case Some(log) =>            var currOffset = log.logSegments.head.baseOffset            val buffer = ByteBuffer.allocate(config.loadBufferSize)            // loop breaks if leader changes at any time during the load, since getHighWatermark is -1            inWriteLock(offsetExpireLock) {              val loadedGroups = mutable.Map[String, GroupMetadata]()              val removedGroups = mutable.Set[String]()              while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {                buffer.clear()                val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]                messages.readInto(buffer, 0)                val messageSet = new ByteBufferMessageSet(buffer)                messageSet.foreach { msgAndOffset =>                  require(msgAndOffset.message.key != null, "Offset entry key should not be null")                  val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key)                  if (baseKey.isInstanceOf[OffsetKey]) {                    // load offset                    val key = baseKey.key.asInstanceOf[GroupTopicPartition]                    if (msgAndOffset.message.payload == null) {                      if (offsetsCache.remove(key) != null)                        trace("Removed offset for %s due to tombstone entry.".format(key))                      else                        trace("Ignoring redundant tombstone for %s.".format(key))                    } else {                      // special handling for version 0:                      // set the expiration time stamp as commit time stamp + server default retention time                      val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload)                      //添加offset信息到缓存中                      putOffset(key, value.copy (                        expireTimestamp = {                          if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)                            value.commitTimestamp + config.offsetsRetentionMs                          else                            value.expireTimestamp                        }                      ))                      trace("Loaded offset %s for %s.".format(value, key))                    }                  } else {                    // load group metadata                    val groupId = baseKey.key.asInstanceOf[String]                    val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)                    if (groupMetadata != null) {                      trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")                      removedGroups.remove(groupId)                      loadedGroups.put(groupId, groupMetadata)                    } else {                      loadedGroups.remove(groupId)                      removedGroups.add(groupId)                    }                  }                  currOffset = msgAndOffset.nextOffset                }              }              loadedGroups.values.foreach { group =>                val currentGroup = addGroup(group)                if (group != currentGroup)                  debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " +                    s"because there is already a cached group with generation ${currentGroup.generationId}")                else                  onGroupLoaded(group)              }              removedGroups.foreach { groupId =>                val group = groupsCache.get(groupId)                if (group != null)                  throw new IllegalStateException(s"Unexpected unload of acitve group ${group.groupId} while " +                    s"loading partition ${topicPartition}")              }            }            if (!shuttingDown.get())              info("Finished loading offsets from %s in %d milliseconds."                .format(topicPartition, SystemTime.milliseconds - startMs))          case None =>            warn("No log found for " + topicPartition)        }      }      catch {        case t: Throwable =>          error("Error in loading offsets from " + topicPartition, t)      }      finally {        loadingPartitions synchronized {          ownedPartitions.add(offsetsPartition)          loadingPartitions.remove(offsetsPartition)        }      }    }  }

3 Offset Commit实现

当消费端消费消息是会将offset提交,即offset提交信息,broker端把接收到的offset提交信息当做一个正常的生产请求,对offset请求的处理和正常的生产者请求处理方式是一样的。下面是内置生产者的一些属性:

  • property.type=sync

  • request.required.acks=-1

  • key.serializer.class=StringEncoder

一旦将数据追加到leader的本地日志中,并且所有replicas都赶上leader,leader检查生产请求是“offset topic”,(因为broker端的处理逻辑针对offset请求和普通生产请求是一样的,如果是offset请求,还需要有不同的处理分支),它就会要求offset manager添加这个offset(对于延迟的生产请求,更新操作会在延迟的生产请求被完成的时候)。因为设置了acks=-1,只有当这些offsets成功地复制到ISR的所有brokers,才会被提交给offset manager。

4 Offset Fetch实现

消费端在启动时会向broker端请求offset信息,一个Offset请求中包含多个topic-partitions,在consumer客户端中根据缓存的metadata信息区分哪些partition到哪个broker上请求,在返回中会根据不同状态反馈,如当前broker正在加载offset,则返回Loading状态。

对”offsets topic”的某些partition而言,broker状态发生改变,即被当做partition的leader或者follower时,LeaderAndIsrRequest请求会被触发:

  • 如果broker是”offsets topic”中一些partitions的leader, broker会读取指定partition的logs文件,
    并将offsets加载到offset table缓存中.

  1. 任何对这些partition的提交请求仍然会更新offsets表.我们会防止日志文件中过期的offsets覆盖最近的提交请求的offsets.

  2. 被”offsets topic”中partition管理的offset抓取请求的keys直到加载成功之前是不会被使用的.
    broker会返回OffsetLoadingCode的OffsetFetchResponse给消费者客户端.

  • 如果broker是follower: 和其他正常的kafka topic一样,follower会从leader中抓取数据.
    由于follower的offset manager不再负责partitions,它们会在cleanup方法被调用时清理数据.

  • 5 kafka的文件存储

    在Kafka中,消息是按Topic组织的。

    • Partition:topic物理上的分组,一个topic可以划分为多个partition,每个partition是一个有序的队列。

    • Segment:partition物理上由多个segment组成

    • offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中每个消息都由一个连续的序列号叫做offset,用于partition唯一标识一条消息。

    ├── data0│   ├── cleaner-offset-checkpoint│   ├── client_mblogduration-35│   │   ├── 00000000000004909731.index│   │   ├── 00000000000004909731.log // 1G文件--Segment│   │   ├── 00000000000005048975.index // 数字是Offset│   │   ├── 00000000000005048975.log│   ├── client_mblogduration-37│   │   ├── 00000000000004955629.index│   │   ├── 00000000000004955629.log│   │   ├── 00000000000005098290.index│   │   ├── 00000000000005098290.log│   ├── __consumer_offsets-33│   │   ├── 00000000000000105157.index│   │   └── 00000000000000105157.log│   ├── meta.properties│   ├── recovery-point-offset-checkpoint│   └── replication-offset-checkpoint
    
    • cleaner-offset-checkpoint:存了每个log的最后清理offset

    • meta.properties: broker.id信息

    • recovery-point-offset-checkpoint:表示已经刷写到磁盘的记录。recoveryPoint以下的数据都是已经刷到磁盘上的了。

    • replication-offset-checkpoint: 用来存储每个replica的HighWatermark的(high watermark (HW),表示已经被commited的message,HW以下的数据都是各个replicas间同步的,一致的。)

    6 Leader和Follower同步机制

    举例说明,我们假设有一个topic,单分区,副本因子是2,即一个leader副本和一个follower副本。我们看下当producer发送一条消息时,broker端的副本到底会发生什么事情以及分区HW是如何被更新的。

    下图是初始状态,我们稍微解释一下:初始时leader和follower的HW和LEO都是0(严格来说源代码会初始化LEO为-1,不过这不影响之后的讨论)。leader中的remote LEO指的就是leader端保存的follower LEO,也被初始化成0。此时,producer没有发送任何消息给leader,而follower已经开始不断地给leader发送FETCH请求了,但因为没有数据因此什么都不会发生。值得一提的是,follower发送过来的FETCH请求因为无数据而暂时会被寄存到leader端的purgatory中,待500ms(replica.fetch.wait.max.ms参数)超时后会强制完成。倘若在寄存期间producer端发送过来数据,那么会Kafka会自动唤醒该FETCH请求,让leader继续处理之。

    第一种情况:follower发送FETCH请求在leader处理完PRODUCE请求之后,producer给该topic分区发送了一条消息。此时的状态如下图所示:

    如图所示,leader接收到PRODUCE请求主要做两件事情:

    • 把消息写入写底层log(同时也就自动地更新了leader的LEO)

    • 尝试更新leader HW值(前面leader副本何时更新HW值一节中的第三个条件触发)。我们已经假设此时follower尚未发送FETCH请求,那么leader端保存的remote LEO依然是0,因此leader会比较它自己的LEO值和remote LEO值,发现最小值是0,与当前HW值相同,故不会更新分区HW值

    所以,PRODUCE请求处理完成后leader端的HW值依然是0,而LEO是1,remote LEO是1。假设此时follower发送了FETCH请求(或者说follower早已发送了FETCH请求,只不过在broker的请求队列中排队),那么状态变更如下图所示:

    本例中当follower发送FETCH请求时,leader端的处理依次是:

    • 读取底层log数据

    • 更新remote LEO = 0(为什么是0? 因为此时follower还没有写入这条消息。leader如何确认follower还未写入呢?这是通过follower发来的FETCH请求中的fetch offset来确定的)

    • 尝试更新分区HW——此时leader LEO = 1,remote LEO = 0,故分区HW值= min(leader LEO, follower remote LEO) = 0

    • 把数据和当前分区HW值(依然是0)发送给follower副本

    而follower副本接收到FETCH response后依次执行下列操作:

    • 写入本地log(同时更新follower LEO)

    • 更新follower HW——比较本地LEO和当前leader HW取小者,故follower HW = 0

    此时,第一轮FETCH RPC结束,我们会发现虽然leader和follower都已经在log中保存了这条消息,但分区HW值尚未被更新。实际上,它是在第二轮FETCH RPC中被更新的,如下图所示:

    上图中,follower发来了第二轮FETCH请求,leader端接收到后仍然会依次执行下列操作:

    • 读取底层log数据

    • 更新remote LEO = 1(这次为什么是1了? 因为这轮FETCH RPC携带的fetch offset是1,那么为什么这轮携带的就是1了呢,因为上一轮结束后follower LEO被更新为1了)

    • 尝试更新分区HW——此时leader LEO = 1,remote LEO = 1,故分区HW值= min(leader LEO, follower remote LEO) = 1。注意分区HW值此时被更新了!!!

    • 把数据(实际上没有数据)和当前分区HW值(已更新为1)发送给follower副本

    同样地,follower副本接收到FETCH response后依次执行下列操作:

    • 写入本地log,当然没东西可写,故follower LEO也不会变化,依然是1

    • 更新follower HW——比较本地LEO和当前leader LEO取小者。由于此时两者都是1,故更新follower HW = 1

    producer端发送消息后broker端完整的处理流程就讲完了。此时消息已经成功地被复制到leader和follower的log中且分区HW是1,表明consumer能够消费offset = 0的这条消息。

    以上所有的东西其实就想说明一件事情:Kafka使用HW值来决定副本备份的进度,而HW值的更新通常需要额外一轮FETCH RPC才能完成,故而这种设计是有问题的。它们可能引起的问题包括:

    • 备份数据丢失

    • 备份数据不一致

    这部分这里自己思考下就能够明白,不在赘述。

    References

    • 1.Kafka水位(high watermark)与leader epoch的讨论

    欢迎点赞+收藏+转发朋友圈素质三连

    文章不错?点个【在看】吧! ????

这篇关于Kafka源码阅读最最最简单的入门方法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143138

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

hdu2289(简单二分)

虽说是简单二分,但是我还是wa死了  题意:已知圆台的体积,求高度 首先要知道圆台体积怎么求:设上下底的半径分别为r1,r2,高为h,V = PI*(r1*r1+r1*r2+r2*r2)*h/3 然后以h进行二分 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#includ

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

usaco 1.3 Prime Cryptarithm(简单哈希表暴搜剪枝)

思路: 1. 用一个 hash[ ] 数组存放输入的数字,令 hash[ tmp ]=1 。 2. 一个自定义函数 check( ) ,检查各位是否为输入的数字。 3. 暴搜。第一行数从 100到999,第二行数从 10到99。 4. 剪枝。 代码: /*ID: who jayLANG: C++TASK: crypt1*/#include<stdio.h>bool h

浅谈主机加固,六种有效的主机加固方法

在数字化时代,数据的价值不言而喻,但随之而来的安全威胁也日益严峻。从勒索病毒到内部泄露,企业的数据安全面临着前所未有的挑战。为了应对这些挑战,一种全新的主机加固解决方案应运而生。 MCK主机加固解决方案,采用先进的安全容器中间件技术,构建起一套内核级的纵深立体防护体系。这一体系突破了传统安全防护的局限,即使在管理员权限被恶意利用的情况下,也能确保服务器的安全稳定运行。 普适主机加固措施:

webm怎么转换成mp4?这几种方法超多人在用!

webm怎么转换成mp4?WebM作为一种新兴的视频编码格式,近年来逐渐进入大众视野,其背后承载着诸多优势,但同时也伴随着不容忽视的局限性,首要挑战在于其兼容性边界,尽管WebM已广泛适应于众多网站与软件平台,但在特定应用环境或老旧设备上,其兼容难题依旧凸显,为用户体验带来不便,再者,WebM格式的非普适性也体现在编辑流程上,由于它并非行业内的通用标准,编辑过程中可能会遭遇格式不兼容的障碍,导致操

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

uva 10387 Billiard(简单几何)

题意是一个球从矩形的中点出发,告诉你小球与矩形两条边的碰撞次数与小球回到原点的时间,求小球出发时的角度和小球的速度。 简单的几何问题,小球每与竖边碰撞一次,向右扩展一个相同的矩形;每与横边碰撞一次,向上扩展一个相同的矩形。 可以发现,扩展矩形的路径和在当前矩形中的每一段路径相同,当小球回到出发点时,一条直线的路径刚好经过最后一个扩展矩形的中心点。 最后扩展的路径和横边竖边恰好组成一个直