本文主要是介绍go-resiliency源码解析之-batcher,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
go-resiliency源码解析之-batcher
源代码地址 : https://github.com/eapache/go-resiliency/blob/master/batcher/batcher.go
1.batcher定义
创建一个batch对象需要2个参数:
Timeout:超时,这是一个batch对象收集输入参数的时间。
work函数变量:在timeout超时后,会调用一次work函数,来处理每一个输入参数。
整体处理流程如下图:
2.核心源码解析
核心结构定义
type work struct {//收集的一个参数param interface{}//参数处理返回future chan error
}type Batcher struct {//收集参数的超时时间timeout time.Duration//过滤器函数prefilter func(interface{}) error//互斥量,用于参数收集并发控制lock sync.Mutex//存储收集到参数的chansubmit chan *work//批处理函数,超时后,调用该函数一次,处理全部参数//[]interface{}doWork func([]interface{}) errordone chan bool
}
Run函数
//param是timeout内可收集参数,业务方调用Run函数传入参数
func (b *Batcher) Run(param interface{}) error {//先判断是否有过滤器函数。 prefilter相当于一个数据清洗函数,对无效param参数返回err,这样//在dowork里就不会处理这个输入参数if b.prefilter != nil {if err := b.prefilter(param); err != nil {return err}}//timeout==0表示无收集参数时间,需要立刻执行doWork函数if b.timeout == 0 {return b.doWork([]interface{}{param})}//当timeout > 0 ,就构造一个work对象放入到chan里w := &work{param: param,future: make(chan error, 1),}b.submitWork(w)return <-w.future
}func (b *Batcher) Prefilter(filter func(interface{}) error) {b.prefilter = filter
}
submitWork函数:在Run函数里,当timeout > 0会调用submitWork函数
func (b *Batcher) submitWork(w *work) {//这里为什么要加一个互斥锁?//对,主要是防止下面if里的代码被并发执行b.lock.Lock()defer b.lock.Unlock()//创建submit的chan, 开启一个batch协程if b.submit == nil {b.done = make(chan bool)b.submit = make(chan *work, 4)go b.batch()}b.submit <- w
}func (b *Batcher) batch() {//params为收集参数集合var params []interface{}var futures []chan errorinput := b.submitgo b.timer()//for读取input这个chan,input在没有close前,这个for不会退出//所以这里就是在等待timeout时间,把输入的参数收集到params这个切片//?? 那input chan什么时候被close了?? 就是 go b.timer()这一句for work := range input {params = append(params, work.param)futures = append(futures, work.future)}//这里就是把收集到的参数传入到你设置的函数,执行业务逻辑ret := b.doWork(params)//把doWork执行结果写回到future,这样调用线程就可以读取到执行结果for _, future := range futures {future <- retclose(future)}close(b.done)
}func (b *Batcher) timer() {//阻塞协程timeout时间,然后调用flush函数time.Sleep(b.timeout)//主要就是关闭submit这个chan,让batch里收集参数for循环退出b.flush()
}func (b *Batcher) flush() {b.lock.Lock()defer b.lock.Unlock()if b.submit == nil {return}close(b.submit)b.submit = nil
}
3.测试用例
这个测试用例实现,在1s内收集传入的整形,然后求和
func TestBatcher(t *testing.T) {wg := &sync.WaitGroup{}b := New(time.Second, func(params []interface{}) error {sum := 0for _, p := range params {sum += p.(int)}t.Logf("sum %d", sum)return nil})b.Prefilter(func(param interface{}) error {// do some sort of sanity check on the parameter, and return an error if it failsreturn nil})for i := 1; i <= 10; i++ {wg.Add(1)go func(param interface{}) {go b.Run(i)wg.Done()}(i)}wg.Wait()time.Sleep(5 * time.Second)
}
这篇关于go-resiliency源码解析之-batcher的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!