本文主要是介绍client-go——WorkQueue源码分析(八),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
本文介绍对WorkQueue组件进行分析说明。
1. 介绍
前面我们把Informer的整个流程完整分析了一遍,我们可以通过监听对象的变化,将资源对象写入到事件处理器的回调函数中,但是如果我们直接在回调函数中处理这些数据会比较慢,对于这种情况往往我们就会使用队列来直接接收这些数据,然后再通过其他协程去处理这些数据,可以大大加快数据的处理速度。这个其实和channel有点类似,但是channel功能过于简单,无法满足各类场景的需求,比如限制数据队列的写入速度。
为此client-go中单独提供了一个workqueue的组件来实现队列的功能,由于kubernetes很多模块都有队列的需求,所以统一实现在了client-go中,不仅可以用于kubernetes内部,同时也可以供调用client-go的模块使用。client-go中抽象了几种队列,包括通用队列、限速队列、延时队列等等。
2. 通用队列
首先我们来看client-go中队列是如何定义的,代码位于k8s.io/client-go/util/workqueue/queue.go
文件中:
// k8s.io/client-go/util/workqueue/queue.go
type Interface interface {Add(item interface{}) //向队列中添加一个元素Len() int //获取队列长度Get() (item interface{},shutdown bool) //获取队列头部的元素,第二个返回值表示队列是否已经关闭Done(item interface{}) // 标记队列中元素是否已经处理完ShutDown() //关闭队列ShuttingDown() bool //查询队列是否正在关闭
}
既然是接口,那么肯定就有实现,在当前文件中有一个名Type
的结构体,就实现了上面的队列的接口:
// k8s.io/client-go/util/workqueue/queue.go
type Type struct {// queue定义了我们处理对象的顺序。队列的中每个元素都应该在dirty set集合中,而不是在processing set中queue []t// dirty 定义了需要处理的所有元素dirty set// 当前正在处理的所有元素都在processing set中。这些元素可能同时在dirty set中。当我们完成某些处理的元素后并将其从此set种移除。我们将会检查它是否在dirty set中,如果在,会将它假如到queue中。processing setcond *sync.Cond// 队列关闭的标识符shuttingDown bool// 保存promethues监控的指标数据metrics queueMetricsunfinishedWorkUpdatePeriod time.Durationclock clock.Clock
}// 添加正在处理的标识符
func (q *Type) Add(item interface{}) {q.cond.L.Lock()defer q.cond.L.Unlock()// r如果队列正在关闭,那么直接返回if q.shuttingDown {return}// map作为辅助,为o(1)的事件复杂度,判断元素是否在脏数据集合,如果在,那么也直接返回if q.dirty.has(item) {return}q.metrics.add(item)// 添加到脏数据集合中q.dirty.insert(item)// 数据已经在正在处理的集合中了,那么也直接返回if q.processing.has(item) {return}// 添加到元素队列的尾部q.queue = append(q.queue, item)// 通知其他协程解除阻塞q.cond.Signal()
}
上面的添加元素的函数并不是直接将元素存储在queue集合中,而是先添加到dirty集合中,这个地方的dirty集合主要用来判断元素是否已经存在了,我们直接通过queue集合当然也可以判断但是需要遍历一次,效率太低,而dirty是一个set集合(用map实现的),用来判断是否存在肯定是最快的方式,所以如果数据在脏数据的集合中那么就不处理了。如果没在脏数据集合中那么就添加进去,还有一种情况是我们添加的这个元素正在处理,但还是还没有调用Done()函数,也就是这个元素正在处理,此时再添加当前的元素应该是最新的,处理中的是过时的,也就是脏的。
接下来看看Get函数的实现:
// k8s.io/client-go/util/workqueue/queue.go
**// 弹出queue中头部元素放入processing队列,从dirty中移除
func (q *Type) Get() (item interface{}, shutdown bool) {q.cond.L.Lock()defer q.cond.L.Unlock()for len(q.queue) == 0 && !q.shuttingDown {q.cond.Wait()}// 协程被激活,但是没有数据,则返回关闭状态if len(q.queue) == 0 {return nil, true}// 弹出第一个元素item, q.queue = q.queue[0], q.queue[1:]q.metrics.get(item)// 将当前元素加入到正在处理的集合中q.processing.insert(item)// 从脏数据集合中移除q.dirty.delete(item)return item, false
}**
Get()函数就是从真正的队列queue中取出第一个元素,取出来要放到正在处理的集合中,并从脏数据集合中移除。然后就是Done,用来标记元素处理完成了。
// 标记item已经被处理,如果当它正在被处理时,又被标记为dirty,它将会被重新放进processing队列
func (q *Type) Done(item interface{}) {q.cond.L.Lock()defer q.cond.L.Unlock()q.metrics.done(item)// 从正在处理的集合中移除,因为已经处理完了q.processing.delete(item)// 判断脏数据集合中是否有这个元素,因为在处理的期间很可能元素又被添加了,那么需要重新放到队列中去进行处理if q.dirty.has(item) {// 放回队列q.queue = append(q.queue, item)// 激活协程q.cond.Signal()}
}
Done()函数的作用就是标记元素为已经处理完成了,只是在处理元素的期间很有可能该元素又被添加进来,出现在了脏数据集合中,那么就需要重新返回数据队列进行处理。其他的函数
// 返回当前队列中元素的长度
func (q *Type) Len() int {q.cond.L.Lock()defer q.cond.L.Unlock()return len(q.queue)
}// 关闭队列,shutdown会使队列忽略所有新添加的元素,一旦工作协程排空里队列中现有元素,它们就会被标记为退出
func (q *Type) ShutDown() {q.cond.L.Lock()defer q.cond.L.Unlock()q.shuttingDown = trueq.cond.Broadcast()
}// 队列是否在关闭
func (q *Type) ShuttingDown() bool {q.cond.L.Lock()defer q.cond.L.Unlock()return q.shuttingDown
}
通过对上面的队列实现分析,可以用下图来进行说明:
通过Add()往队列中分别插入1、2、3这三个元素,此时队列中的queue和dirty分别存有1、2、3元素,processing为空。然后通过Get()获取最先进入的元素1,此时队列中queue和dirty字段分别存有2、3元素,而1会被放入processing队列,表示该元素正在被处理。最后,当我们处理1元素时,通过Done()标记该元素已经被处理完成,此时队列中的processing队列中1元素被删除。然后用同样的方式继续处理元素2、3。
如果在处理元素1的过程中,又有其他的协程将该元素又加入到队列中,处理流程如下图所示:
假设goroutine A通过Get()获取1元素时,1元素被添加到processing队列中,同一时间,goroutine B通过Add()插入另一个1元素,此时在processing中已经存在相同元素,所以后面的1并不会被直接添加到queue中,当前队列中的dirty中存有1、2、3元素,processing字段存有1元素。在goroutine A通过Done()标记处理完成后,如果dirty中还有1元素,则将1元素追加到queue字段中尾部。
3. 延时队列
延迟队列是在通用队列基础上进行扩展的,因为它本质上还是一个队列,只是增加了一个新的函数来控制延迟,对应的接口定义如下图所示:
//staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go
// DelayingInterface是一个延时队列,可以在以后的时间来添加元素的接口,这使得它更容易在处理失败后重新加入队列,而不至于陷入hot-loop
type DelayingInterface interface {Interface// 在指定的时间后将元素添加到工作队列中AddAfter(item interface{}, duration time.Duration)
}
延迟队列的定义简单,就是增加了一个函数来实现元素的延迟增加而已,接下来我吗继续来查看改接口的具体实现:
//staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go// deployingType包装了Interface通用接口,并提供了延迟重新入队列
type delayingType struct {Interface // 一个通用队列// 时钟用于跟踪延迟触发的时间clock clock.Clock// stopCh lets us signal a shutdown to the waiting loop// the waiting loop的关闭信号stopCh chan struct{}// 用来保证只发出一次关闭信号stopOnce sync.Once// 在触发前确保我吗等待的时间不超过maxWaitheartbeat clock.Ticker// waitingForAddch是一个buffered channel,提供了一个缓冲通道,延迟添加的元素封装成waitFor放到channel中waitingForAddCh chan *waitFor// 记录重试的次数metrics retryMetrics
}
// waitFor持有要增加的数据和应该增加时间
type waitFor struct {// 要添加的元素数据data t// 添加的时间点readyAt time.Time// index in the priority queue (heap)// 优先级队列的索引index int
}
在延时队列的实现DelayingType结构体中包含一个通用队列Interface的实现,然后最重要的一个属性就是waitForAddCh
,这是一个buffered channel,将延迟添加的元素封装成waitFor
放到通道中,即当到了指定的时间后就将元素添加到通用队列中去进行处理,还没有到时间的话就放到这个缓冲通道中。要了解延迟队列如何实现,还需要了解waitForPriorityQueue
:
//staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go//waitingForPriorityQueue为waitFor的元素集合实现了一个优先级队列。
//把需要延迟的元素放到一个队列中,然后在队列中按照元素的延时添加事件(readAt)从小到大排序。
type waitForPriorityQueue []*waitFor
waitForPriorityQueue是一个有序的waitFor的集合,按照添加的事件从小到大进行排列,这就形成了一个优先级队列
3.2 优先级队列
其实这个队列是golang中内置的container/heap/heap.go文件中的Interface接口(数据结构中的堆)的一个实现,我们要想实现自己的队列也完全可以去实现这个接口即可:
container/heap/heap.go
type Interface interface {sort.InterfacePush(x interface{}) // add x as element Len()Pop() interface{} // remove and return element Len() - 1.
}// Init establishes the heap invariants required by the other routines in this package.
// Init is idempotent with respect to the heap invariants
// and may be called whenever the heap invariants may have been invalidated.
// The complexity is O(n) where n = h.Len().
func Init(h Interface) {// heapifyn := h.Len()for i := n/2 - 1; i >= 0; i-- {down(h, i, n)}
}// Push pushes the element x onto the heap.
// The complexity is O(log n) where n = h.Len().
func Push(h Interface, x interface{}) {h.Push(x)up(h, h.Len()-1)
}// Pop removes and returns the minimum element (according to Less) from the heap.
// The complexity is O(log n) where n = h.Len().
// Pop is equivalent to Remove(h, 0).
func Pop(h Interface) interface{} {n := h.Len() - 1h.Swap(0, n)down(h, 0, n)return h.Pop()
}// Remove removes and returns the element at index i from the heap.
// The complexity is O(log n) where n = h.Len().
func Remove(h Interface, i int) interface{} {n := h.Len() - 1if n != i {h.Swap(i, n)if !down(h, i, n) {up(h, i)}}return h.Pop()
}// Fix re-establishes the heap ordering after the element at index i has changed its value.
// Changing the value of the element at index i and then calling Fix is equivalent to,
// but less expensive than, calling Remove(h, i) followed by a Push of the new value.
// The complexity is O(log n) where n = h.Len().
func Fix(h Interface, i int) {if !down(h, i, h.Len()) {up(h, i)}
}func up(h Interface, j int) {for {i := (j - 1) / 2 // parentif i == j || !h.Less(j, i) {break}h.Swap(i, j)j = i}
}func down(h Interface, i0, n int) bool {i := i0for {j1 := 2*i + 1if j1 >= n || j1 < 0 { // j1 < 0 after int overflowbreak}j := j1 // left childif j2 := j1 + 1; j2 < n && h.Less(j2, j1) {j = j2 // = 2*i + 2 // right child}if !h.Less(j, i) {break}h.Swap(i, j) i = j}return i > i0
}
3.3 延时队列的实现
延迟队列的实现
// **staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go
// 获取延迟长度,**
func (pq waitForPriorityQueue) Len() int {return len(pq)
}
// 判断索引i和j上元素的大小
func (pq waitForPriorityQueue) Less(i, j int) bool {
// 根据时间先后顺序来决定先后顺序,i位置的元素在j之前,则证明索引i元素小于索引j的元素return pq[i].readyAt.Before(pq[j].readyAt)
}// 交换i和j的元素
func (pq waitForPriorityQueue) Swap(i, j int) {pq[i], pq[j] = pq[j], pq[i]
// 更新元素里面的索引信息pq[i].index = ipq[j].index = j
}// 添加元素到队列中,注意:不应该直接调用push函数,而应该使用heap.Push
func (pq *waitForPriorityQueue) Push(x interface{}) {n := len(*pq)item := x.(*waitFor)item.index = n*pq = append(*pq, item)
}// 从队列中移除元素,注意:不应该直接调用Pop函数,而应该使用heap.
func (pq *waitForPriorityQueue) Pop() interface{} {n := len(*pq)item := (*pq)[n-1]item.index = -1*pq = (*pq)[0:(n - 1)]return item
}// Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly.
// Peek返回队列的头部元素,不会移除这个元素或以其他方式改变这个队列。直接调用是安全的
func (pq waitForPriorityQueue) Peek() interface{} {return pq[0]
}
上面就是waitForQueue这个优先级队列的实现,下面主要分析延迟队列,延迟队列继承通用队列,在这里只分析新增部分
// **staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go**
// 在指定的延迟时间之后将元素item添加到队列中
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {// don't add if we're already shutting down// 如果队列关闭则直接退出if q.ShuttingDown() {return}q.metrics.retry()// 如果延迟时间<=0,则相当于通用队列一样增加元素if duration <= 0 {q.Add(item)return}// select没有default case,可能会阻塞select {// 如果调用shutDown,则解除阻塞case <-q.stopCh:// 把元素封装成waitFor传给waitForAddchcase q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:}
}
AddAfter函数实现比较简单,就是把元素和添加的时间封装成一个waitFor对象,然后发送给waitForAddCh通道,具体怎么添加元素查看这个waitingLoop(),这个函数在实例化DelayingInterface
后就用一个单独协程启动了:
// **staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go
//**
func (q *delayingType) waitingLoop() {defer utilruntime.HandleCrash()// Make a placeholder channel to use when there are no items in our listnever := make(<-chan time.Time)// Make a timer that expires when the item at the head of the waiting queue is readyvar nextReadyAtTimer clock.Timer// 构造优先级队列waitingForQueue := &waitForPriorityQueue{}// 构造最小堆heap.Init(waitingForQueue)// 用来避免元素重复添加,如果重复添加了就只更新时间waitingEntryByData := map[t]*waitFor{}for {// 如果队列关闭直接返回if q.Interface.ShuttingDown() {return}// 获取当前时间now := q.clock.Now()// 如果优先级队列中有元素for waitingForQueue.Len() > 0 {// 获取第一个元素entry := waitingForQueue.Peek().(*waitFor)// 如果第一个元素指定时间还没到,则跳出循环,因为第一个元素的时间是最小的if entry.readyAt.After(now) {break}// 如果时间过了,那就把它从优先级队列中取出放到通用队列中,同时要把元素从上面提到的map中删除,因为不用再判断重复添加了entry = heap.Pop(waitingForQueue).(*waitFor)q.Add(entry.data)delete(waitingEntryByData, entry.data)}nextReadyAt := never// 如果优先级队列中还有元素,那就用第一个元素指定的时间减去当前时间作为等待时间if waitingForQueue.Len() > 0 {if nextReadyAtTimer != nil {nextReadyAtTimer.Stop()}entry := waitingForQueue.Peek().(*waitFor)nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))nextReadyAt = nextReadyAtTimer.C()}select {// 退出信号case <-q.stopCh:return// 定时器,每过一段时间没有任何数据,那就再执行一次大循环case <-q.heartbeat.C():// 上面的等待时间信号,时间到了就有信号,激活这个case,然后继续循环,添加准备好了的元素case <-nextReadyAt:// AddAfter()中放入到通道中的元素,这里从通道中获取数据case waitEntry := <-q.waitingForAddCh:// 如果时间过了就直接插入通用队列,没过就插入到有序队列if waitEntry.readyAt.After(q.clock.Now()) {insert(waitingForQueue, waitingEntryByData, waitEntry)} else {// 插入到通用队列q.Add(waitEntry.data)}// 把channel里面的元素全部取出来,如果没有元素直接退出drained := falsefor !drained {select {case waitEntry := <-q.waitingForAddCh:if waitEntry.readyAt.After(q.clock.Now()) {insert(waitingForQueue, waitingEntryByData, waitEntry)} else {q.Add(waitEntry.data)}default:drained = true}}}}
}// 插入元素到优先级队列,或者元素已经存在则更新指定时间
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {// if the entry already exists, update the time only if it would cause the item to be queued sooner// 元素存在,比较谁时间靠后就用谁的时间existing, exists := knownEntries[entry.data]if exists {if existing.readyAt.After(entry.readyAt) {existing.readyAt = entry.readyAt// 时间变了需要重新调整优先级队列heap.Fix(q, existing.index)}return}// 把元素放入有序队列中heap.Push(q, entry)// 并记录在上面的map中,用于判断是否存在knownEntries[entry.data] = entry
}
延时队列实现原理就是延时队列里面保存着元素执行的时间,根据这个时间先后顺序来构造一个优先级队列,时间到了就把这个元素添加到通用队列正常处理。这里的优先级队列使用golang内置的heap接口实现。
4. 限速队列
限速队列是扩展的延迟队列,在其基础上增加了AddRateLimited、Forget、NumberRequests三个方法:
// staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go
// 限速队列接口是对加入队列元素进行速率限制的接口
**type RateLimitingInterface interface {// 延迟队列DelayingInterface// 在限速器说ok后,将元素item添加到工作队列中AddRateLimited(item interface{})// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you// still have to call `Done` on the queue.// 丢弃指定的元素Forget(item interface{})//查询元素放入队列的次数NumRequeues(item interface{}) int
}**
限速队列的数据结构:
// staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go
type rateLimitingType struct {DelayingInterface// 限速器rateLimiter RateLimiter
}// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}func (q *rateLimitingType) NumRequeues(item interface{}) int {return q.rateLimiter.NumRequeues(item)
}func (q *rateLimitingType) Forget(item interface{}) {q.rateLimiter.Forget(item)
}
4.1 限速器
限速器的定义:
type RateLimiter interface {// 获取指定元素等待时间When(item interface{}) time.Durationt// 释放指定元素,表示该元素已经被处理过Forget(item interface{}) **// 返回某个对象对象被重新入队多少次,监控用**NumRequeues(item interface{}) int
}// 生成一个默认的限速器
func DefaultControllerRateLimiter() RateLimiter {// return NewMaxOfRateLimiter(NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)// 令牌桶限速器&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},)
}
4.1.1 令牌桶限速器(BucketRateLimiter)
令牌桶限速器是一个固定速率的限速器,该限速器是利用golang.org/x/time/rate库实现,令牌桶算法内部实现了一个存放token的桶,初始化时桶是空的,token会以固定速率往桶里面填充,知道将其填满为止,多余的token会被丢弃。每个元素都会从令牌桶得到一个token,只有得到token的元素才允许通过,而没有得到token的元素处于等待状态。令牌桶算法通过控制发放token来达到限速的目的。
令牌桶是有一个固定大小的桶,系统会以恒定的速度向桶中放入token,桶满了就暂时不放了,而用户则从桶中取了token,如果有剩余的token就可以一直取,如果没有剩余的token,则需要等到系统中放置了Token才行。
golang中就自带了一个令牌桶限速器的实现,我们可以使用以下的方法构造一个限速器对象:
limiter := NewLimiter(10, 1)
func NewLimiter(r Limit,b int) *Limiter {return &Limiter{limit: r, // 代表每秒可以向Token桶中产生多少token,Limit实际上是float的别名burst: b, // 代表token桶容量大小
}
}
除了直接指定每秒产生的token个数外,还可以用Every()
来指定向Token中放置Token的间隔:
limit := Every(1000*time.Millisecond)limiter := NewLimiter(limit, 1)// 每隔100ms往桶中放一个Token,即1秒产生10个。
Limiter提供了三类消费token的方法,用户可以每次消费一个token,也可以一次消费多个token,当token不足时每种方法有不同的处理逻辑。
- Wait/WaitN
**func (lim *Limiter) Wait(ctx context.Context) (err error)**
**func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)**
Wait实际上就是WaitN(ctx,1)。当使用Wait()消费token时,如果此时桶内token不足时(小于N),那么Wait()将会阻塞一段时间,直至token满足条件,当然如果充足则直接反汇编。Wait()有一个context参数,可以设置context的Deadline或者Timeout,来决定此次Wait的最长时间。
- Allow/AllowN
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool
Allow实际上就是AllowN(time.Now(),1)。AllowN()表示截止到某一时刻,目前桶中数目是否至少为n个,满足则返回true,同时从桶中消费n个token。反之返回不消费token,false。通常对应线上场景,如果请求速率过快,则直接丢掉某些请求。
- Reserve/ReserveN
func (lim *Limiter) Reserve() *Reservation
**func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation**
Reserve相当于ReserveN(time.Now(),1)。ReserveN()即当调用完成后,无论token是否充足,都会返回一个Reservation对象指针。可以调用该对象的Delay()方法,该方法返回了需要等待的事件,如果等待时间为0,则说明不用等待。必须等到等待时间后,才能进行接下来的工作。或者不等待,可以直接调用Cencel(),该方法会将token归还。
动态调整速率
Limiter支持调整速率和桶大小:
SetLimit(Limit) // 改变放入token的速率
SetBurst(int) //改变token桶大小
基于上述两个方法可以根据我们的虚需求进行动态改变token桶的大小和速率。
令牌桶限速器的实现
// BucketRateLimiter使用了标准的一个令牌桶实现了限速队列的API
type BucketRateLimiter struct {*rate.Limiter
}var _ RateLimiter = &BucketRateLimiter{}func (r *BucketRateLimiter) When(item interface{}) time.Duration {// 获取需要等待的事件(延迟),而且这个延迟是一个相对固定的周期return r.Limiter.Reserve().Delay()
}func (r *BucketRateLimiter) NumRequeues(item interface{}) int {// 固定频率,不需要重试return 0
}func (r *BucketRateLimiter) Forget(item interface{}) {// 不需要重试,因此也不需要忘记
}
令牌桶限速器直接包装一个令牌桶Limiter对象,直接通过Limiter.Reserve().Delay()就可以获取元素需要延迟的时间,再使用这个限速器的时候,默认初始化参数为:BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10)**,** 100)
通过rate.NewLimiter实例化,传入r和b两个参数,r表示每秒往桶中填充token的数量,b表示令牌桶的大小,默认的参数为速率为10,即每秒放入10个bucket,桶容量大小为100。比如在一个限速周期内插入了1000个元素,通过r.Limiter.Reserve.Delay()返回指定元素应该等待的时间,前100(b)个元素会立即被处理,而后面的元素的延迟时间分别为item100/100ms、item101/200ms、item102/300ms。
ItemExponentialFailureRateLimiter
ItemExponentialFailureRateLimiter是增长限速器,即元素错误次数指数递增限速器,它会根据元素错误次数逐渐累加等待时间。具体实现如下:
// 当对象处理失败的时候,其再次入队的等待时间x2,到maxDelay为止,直到超过最大失败次数
type ItemExponentialFailureRateLimiter struct {// 修改失败次数用的锁failuresLock sync.Mutex// 记录每个元素失败的次数failures map[interface{}]int// 元素延迟基数baseDelay time.Duration// 元素最大的延迟时间maxDelay time.Duration
}
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {r.failuresLock.Lock()defer r.failuresLock.Unlock()// 累加错误计数exp := r.failures[item]r.failures[item] = r.failures[item] + 1// 通过错误次数计算延迟时间,2^i*baseDelay,按照指数增长// The backoff is capped such that 'calculated' value never overflows.backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))if backoff > math.MaxInt64 {// 若计算延迟时间大于MaxInt64,直接返回最大延迟时间return r.maxDelay}// 若计算延迟时间大于maxDelay,直接返回最大延迟时间calculated := time.Duration(backoff)if calculated > r.maxDelay {return r.maxDelay}return calculated
}// 元素错误次数,直接从Failures中获取
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {r.failuresLock.Lock()defer r.failuresLock.Unlock()return r.failures[item]
}
// 直接从failures中删除指定元素
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {r.failuresLock.Lock()defer r.failuresLock.Unlock()delete(r.failures, item)
}
从上面代码分析可以知道该限速器是出现错误后不断尝试的过程,而且随着尝试次数的增加按照指数增加延迟时间。
ItemFastSlowRateLimiter
ItemFastSlowRateLimiter和ItemExponentialFailureRateLimiter都是用于错误尝试的的,但是ItemFastSlowRateLimiter的限速策略是尝试次数超过阈值用长延迟,否则用短延迟,该限速器很少用
type ItemFastSlowRateLimiter struct {failuresLock sync.Mutexfailures map[interface{}]int// 快慢限速器,先以fastDelay为周期进行尝试,超过maxFastAttempts次数后,按照slowDelay为周期尝试maxFastAttempts intfastDelay time.DurationslowDelay time.Duration
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {r.failuresLock.Lock()defer r.failuresLock.Unlock()r.failures[item] = r.failures[item] + 1if r.failures[item] <= r.maxFastAttempts {return r.fastDelay}return r.slowDelay
}func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {r.failuresLock.Lock()defer r.failuresLock.Unlock()return r.failures[item]
}func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {r.failuresLock.Lock()defer r.failuresLock.Unlock()delete(r.failures, item)
}
MaxOfRateLimiter
MaxOfRateLimiter混合限速器,它内部有多个限速器,选择所有限速器中速度最慢,即延迟最大的一种方案。比如内部有三个限速器,When()返回三个限速器里面延迟最大的。
type MaxOfRateLimiter struct {// 限速器数组limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {ret := time.Duration(0)// 获取所有限速器里面时间最大的延迟时间for _, limiter := range r.limiters {curr := limiter.When(item)if curr > ret {ret = curr}}return ret
}func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {ret := 0// Requests次数也是取最大值for _, limiter := range r.limiters {curr := limiter.NumRequeues(item)if curr > ret {ret = curr}}return ret
}func (r *MaxOfRateLimiter) Forget(item interface{}) {for _, limiter := range r.limiters {limiter.Forget(item)}
}
这篇关于client-go——WorkQueue源码分析(八)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!