go-resiliency源码解析之-batcher

2023-10-27 19:59

本文主要是介绍go-resiliency源码解析之-batcher,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

go-resiliency源码解析之-batcher

源代码地址 : https://github.com/eapache/go-resiliency/blob/master/batcher/batcher.go

1.batcher定义

创建一个batch对象需要2个参数:

Timeout:超时,这是一个batch对象收集输入参数的时间。

work函数变量:在timeout超时后,会调用一次work函数,来处理每一个输入参数。

整体处理流程如下图:

请添加图片描述

2.核心源码解析

核心结构定义

type work struct {//收集的一个参数param  interface{}//参数处理返回future chan error
}type Batcher struct {//收集参数的超时时间timeout   time.Duration//过滤器函数prefilter func(interface{}) error//互斥量,用于参数收集并发控制lock   sync.Mutex//存储收集到参数的chansubmit chan *work//批处理函数,超时后,调用该函数一次,处理全部参数//[]interface{}doWork func([]interface{}) errordone   chan bool
}

Run函数

//param是timeout内可收集参数,业务方调用Run函数传入参数
func (b *Batcher) Run(param interface{}) error {//先判断是否有过滤器函数。 prefilter相当于一个数据清洗函数,对无效param参数返回err,这样//在dowork里就不会处理这个输入参数if b.prefilter != nil {if err := b.prefilter(param); err != nil {return err}}//timeout==0表示无收集参数时间,需要立刻执行doWork函数if b.timeout == 0 {return b.doWork([]interface{}{param})}//当timeout > 0 ,就构造一个work对象放入到chan里w := &work{param:  param,future: make(chan error, 1),}b.submitWork(w)return <-w.future
}func (b *Batcher) Prefilter(filter func(interface{}) error) {b.prefilter = filter
}

submitWork函数:在Run函数里,当timeout > 0会调用submitWork函数

func (b *Batcher) submitWork(w *work) {//这里为什么要加一个互斥锁?//对,主要是防止下面if里的代码被并发执行b.lock.Lock()defer b.lock.Unlock()//创建submit的chan, 开启一个batch协程if b.submit == nil {b.done = make(chan bool)b.submit = make(chan *work, 4)go b.batch()}b.submit <- w
}func (b *Batcher) batch() {//params为收集参数集合var params []interface{}var futures []chan errorinput := b.submitgo b.timer()//for读取input这个chan,input在没有close前,这个for不会退出//所以这里就是在等待timeout时间,把输入的参数收集到params这个切片//?? 那input chan什么时候被close了?? 就是	go b.timer()这一句for work := range input {params = append(params, work.param)futures = append(futures, work.future)}//这里就是把收集到的参数传入到你设置的函数,执行业务逻辑ret := b.doWork(params)//把doWork执行结果写回到future,这样调用线程就可以读取到执行结果for _, future := range futures {future <- retclose(future)}close(b.done)
}func (b *Batcher) timer() {//阻塞协程timeout时间,然后调用flush函数time.Sleep(b.timeout)//主要就是关闭submit这个chan,让batch里收集参数for循环退出b.flush()
}func (b *Batcher) flush() {b.lock.Lock()defer b.lock.Unlock()if b.submit == nil {return}close(b.submit)b.submit = nil
}
3.测试用例

这个测试用例实现,在1s内收集传入的整形,然后求和

func TestBatcher(t *testing.T) {wg := &sync.WaitGroup{}b := New(time.Second, func(params []interface{}) error {sum := 0for _, p := range params {sum += p.(int)}t.Logf("sum %d", sum)return nil})b.Prefilter(func(param interface{}) error {// do some sort of sanity check on the parameter, and return an error if it failsreturn nil})for i := 1; i <= 10; i++ {wg.Add(1)go func(param interface{}) {go b.Run(i)wg.Done()}(i)}wg.Wait()time.Sleep(5 * time.Second)
}

这篇关于go-resiliency源码解析之-batcher的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/288030

相关文章

Go标准库常见错误分析和解决办法

《Go标准库常见错误分析和解决办法》Go语言的标准库为开发者提供了丰富且高效的工具,涵盖了从网络编程到文件操作等各个方面,然而,标准库虽好,使用不当却可能适得其反,正所谓工欲善其事,必先利其器,本文将... 目录1. 使用了错误的time.Duration2. time.After导致的内存泄漏3. jsO

使用Jackson进行JSON生成与解析的新手指南

《使用Jackson进行JSON生成与解析的新手指南》这篇文章主要为大家详细介绍了如何使用Jackson进行JSON生成与解析处理,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 核心依赖2. 基础用法2.1 对象转 jsON(序列化)2.2 JSON 转对象(反序列化)3.

Springboot @Autowired和@Resource的区别解析

《Springboot@Autowired和@Resource的区别解析》@Resource是JDK提供的注解,只是Spring在实现上提供了这个注解的功能支持,本文给大家介绍Springboot@... 目录【一】定义【1】@Autowired【2】@Resource【二】区别【1】包含的属性不同【2】@

SpringCloud动态配置注解@RefreshScope与@Component的深度解析

《SpringCloud动态配置注解@RefreshScope与@Component的深度解析》在现代微服务架构中,动态配置管理是一个关键需求,本文将为大家介绍SpringCloud中相关的注解@Re... 目录引言1. @RefreshScope 的作用与原理1.1 什么是 @RefreshScope1.

Java并发编程必备之Synchronized关键字深入解析

《Java并发编程必备之Synchronized关键字深入解析》本文我们深入探索了Java中的Synchronized关键字,包括其互斥性和可重入性的特性,文章详细介绍了Synchronized的三种... 目录一、前言二、Synchronized关键字2.1 Synchronized的特性1. 互斥2.

go中空接口的具体使用

《go中空接口的具体使用》空接口是一种特殊的接口类型,它不包含任何方法,本文主要介绍了go中空接口的具体使用,具有一定的参考价值,感兴趣的可以了解一下... 目录接口-空接口1. 什么是空接口?2. 如何使用空接口?第一,第二,第三,3. 空接口几个要注意的坑坑1:坑2:坑3:接口-空接口1. 什么是空接

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很

Java的IO模型、Netty原理解析

《Java的IO模型、Netty原理解析》Java的I/O是以流的方式进行数据输入输出的,Java的类库涉及很多领域的IO内容:标准的输入输出,文件的操作、网络上的数据传输流、字符串流、对象流等,这篇... 目录1.什么是IO2.同步与异步、阻塞与非阻塞3.三种IO模型BIO(blocking I/O)NI

Python 中的异步与同步深度解析(实践记录)

《Python中的异步与同步深度解析(实践记录)》在Python编程世界里,异步和同步的概念是理解程序执行流程和性能优化的关键,这篇文章将带你深入了解它们的差异,以及阻塞和非阻塞的特性,同时通过实际... 目录python中的异步与同步:深度解析与实践异步与同步的定义异步同步阻塞与非阻塞的概念阻塞非阻塞同步

利用Go语言开发文件操作工具轻松处理所有文件

《利用Go语言开发文件操作工具轻松处理所有文件》在后端开发中,文件操作是一个非常常见但又容易出错的场景,本文小编要向大家介绍一个强大的Go语言文件操作工具库,它能帮你轻松处理各种文件操作场景... 目录为什么需要这个工具?核心功能详解1. 文件/目录存javascript在性检查2. 批量创建目录3. 文件