go-resiliency源码解析之-batcher

2023-10-27 19:59

本文主要是介绍go-resiliency源码解析之-batcher,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

go-resiliency源码解析之-batcher

源代码地址 : https://github.com/eapache/go-resiliency/blob/master/batcher/batcher.go

1.batcher定义

创建一个batch对象需要2个参数:

Timeout:超时,这是一个batch对象收集输入参数的时间。

work函数变量:在timeout超时后,会调用一次work函数,来处理每一个输入参数。

整体处理流程如下图:

请添加图片描述

2.核心源码解析

核心结构定义

type work struct {//收集的一个参数param  interface{}//参数处理返回future chan error
}type Batcher struct {//收集参数的超时时间timeout   time.Duration//过滤器函数prefilter func(interface{}) error//互斥量,用于参数收集并发控制lock   sync.Mutex//存储收集到参数的chansubmit chan *work//批处理函数,超时后,调用该函数一次,处理全部参数//[]interface{}doWork func([]interface{}) errordone   chan bool
}

Run函数

//param是timeout内可收集参数,业务方调用Run函数传入参数
func (b *Batcher) Run(param interface{}) error {//先判断是否有过滤器函数。 prefilter相当于一个数据清洗函数,对无效param参数返回err,这样//在dowork里就不会处理这个输入参数if b.prefilter != nil {if err := b.prefilter(param); err != nil {return err}}//timeout==0表示无收集参数时间,需要立刻执行doWork函数if b.timeout == 0 {return b.doWork([]interface{}{param})}//当timeout > 0 ,就构造一个work对象放入到chan里w := &work{param:  param,future: make(chan error, 1),}b.submitWork(w)return <-w.future
}func (b *Batcher) Prefilter(filter func(interface{}) error) {b.prefilter = filter
}

submitWork函数:在Run函数里,当timeout > 0会调用submitWork函数

func (b *Batcher) submitWork(w *work) {//这里为什么要加一个互斥锁?//对,主要是防止下面if里的代码被并发执行b.lock.Lock()defer b.lock.Unlock()//创建submit的chan, 开启一个batch协程if b.submit == nil {b.done = make(chan bool)b.submit = make(chan *work, 4)go b.batch()}b.submit <- w
}func (b *Batcher) batch() {//params为收集参数集合var params []interface{}var futures []chan errorinput := b.submitgo b.timer()//for读取input这个chan,input在没有close前,这个for不会退出//所以这里就是在等待timeout时间,把输入的参数收集到params这个切片//?? 那input chan什么时候被close了?? 就是	go b.timer()这一句for work := range input {params = append(params, work.param)futures = append(futures, work.future)}//这里就是把收集到的参数传入到你设置的函数,执行业务逻辑ret := b.doWork(params)//把doWork执行结果写回到future,这样调用线程就可以读取到执行结果for _, future := range futures {future <- retclose(future)}close(b.done)
}func (b *Batcher) timer() {//阻塞协程timeout时间,然后调用flush函数time.Sleep(b.timeout)//主要就是关闭submit这个chan,让batch里收集参数for循环退出b.flush()
}func (b *Batcher) flush() {b.lock.Lock()defer b.lock.Unlock()if b.submit == nil {return}close(b.submit)b.submit = nil
}
3.测试用例

这个测试用例实现,在1s内收集传入的整形,然后求和

func TestBatcher(t *testing.T) {wg := &sync.WaitGroup{}b := New(time.Second, func(params []interface{}) error {sum := 0for _, p := range params {sum += p.(int)}t.Logf("sum %d", sum)return nil})b.Prefilter(func(param interface{}) error {// do some sort of sanity check on the parameter, and return an error if it failsreturn nil})for i := 1; i <= 10; i++ {wg.Add(1)go func(param interface{}) {go b.Run(i)wg.Done()}(i)}wg.Wait()time.Sleep(5 * time.Second)
}

这篇关于go-resiliency源码解析之-batcher的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/288030

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

Go Playground 在线编程环境

For all examples in this and the next chapter, we will use Go Playground. Go Playground represents a web service that can run programs written in Go. It can be opened in a web browser using the follow

OWASP十大安全漏洞解析

OWASP(开放式Web应用程序安全项目)发布的“十大安全漏洞”列表是Web应用程序安全领域的权威指南,它总结了Web应用程序中最常见、最危险的安全隐患。以下是对OWASP十大安全漏洞的详细解析: 1. 注入漏洞(Injection) 描述:攻击者通过在应用程序的输入数据中插入恶意代码,从而控制应用程序的行为。常见的注入类型包括SQL注入、OS命令注入、LDAP注入等。 影响:可能导致数据泄

从状态管理到性能优化:全面解析 Android Compose

文章目录 引言一、Android Compose基本概念1.1 什么是Android Compose?1.2 Compose的优势1.3 如何在项目中使用Compose 二、Compose中的状态管理2.1 状态管理的重要性2.2 Compose中的状态和数据流2.3 使用State和MutableState处理状态2.4 通过ViewModel进行状态管理 三、Compose中的列表和滚动

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。