go 协程池 ants库分析

2024-03-06 09:32
文章标签 分析 go ants 协程池

本文主要是介绍go 协程池 ants库分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

相比于创建多个线程,goroutine 更轻量、资源占用更少、切换速度更快、无线程上下文切换开销更少。但是受限于资源总量,系统中能够创建的 goroutine 数量也是受限的。默认每个 goroutine 占用 8KB 内存,一台 8GB 内存的机器满打满算也只能创建 8GB/8KB = 1000000 个 goroutine,更何况系统还需要保留一部分内存运行日常管理任务,go 运行时需要内存运行 gc、处理 goroutine 切换等。使用的内存超过机器内存容量,系统会使用交换区(swap),导致性能急速下降,甚至直接error

另一方面,goroutine 的管理也是一个问题。goroutine 只能自己运行结束,外部没有任何手段可以强制j结束一个 goroutine。如果一个 goroutine 因为某种原因没有自行结束,就会出现 goroutine 泄露。此外,频繁创建 goroutine 也是一个开销。

所以,我们就会需要一个goroutine池,自动管理goroutine生命周期,可以按需创建,动态缩容,自动安排任务的执行

以上内容摘自Go 每日一库之 ants

使用

使用上可以参考 Go 每日一库之 ants 里面给的例子,是一个计算大量整数和的程序。如下


// 包装任务需要执行的内容
// ants支持将一个不接受任何参数的函数作为任务提交给 goroutine 运行。
// 由于不接受参数,我们提交的函数要么不需要外部数据,只需要处理自身逻辑,
// 否则就必须用某种方式将需要的数据传递进去,例如闭包。
type taskFunc func()
func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc {return func() {for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] {*sum += num}fmt.Printf("task:%d sum:%d\n", i+1, *sum)wg.Done()}
}// 建立协程池
p, _ := ants.NewPool(10)
defer p.Release()// 生成随机数
nums := make([]int, DataSize, DataSize)
for i := range nums {nums[i] = rand.Intn(1000)
}// 提交任务,并通过waitGroup来等待所有任务的结束
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg))
}
wg.Wait()// 
var sum int
for _, partSum := range partSums {sum += partSum
}var expect int
for _, num := range nums {expect += num
}
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)

设计

执行流程图

 

类图如下 

整体设计比较清楚,

  1. Pool是对外提供的协程池对象,通过Options配置生成(NewPool函数)
  2. Pool里面有一个workerArray表示worker池,是一个抽象的接口,其主要就是管理goWorker
  3. goWorker就是实际运行我们任务的载体,通过调用run来执行
  4. ants提供了两种Pool,一个就是Pool,另一个是PoolWithFunc;前者接受一个不接受任何参数的函数作为任务提交给 goroutine 运行。由于不接受参数,我们提交的函数要么不需要外部数据,只需要处理自身逻辑,否则就必须用某种方式将需要的数据传递进去,例如闭包。(我们初始化的时候不需要提供执行函数,在需要执行的时候传入Sumit就可以了);后者在初始化的时候就要提供执行的函数体,然后在后续执行的时候,传入参数给函数体就可以了。这两种方式其实是等价的,使用前者的话,我们利用闭包传递参数就可以了;使用后者的话,我们可以把需要的参数都封装成一个结构体再传入;

优秀的设计

Options的思想

这个思想在go里面还是比较普遍的,比如 GitHub - libp2p/go-libp2p: libp2p implementation in Go 也有这种设计。这种设计的目的其实就是为了可以灵活配置我们的目标对象(在ants就是Pool),我们通过设置一个配置类,通过配置类生成我们的目标对象;那么我们如何配置这个配置类呢?一方面我们可以直接生成配置类,另一种类似堆积木的方式,我们可以传递一个函数组,这个函数组来操作我们要生成的配置类。也就是我们要提供给开发者一个传递函数的手段,如下

// WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {return func(opts *Options) {opts.MaxBlockingTasks = maxBlockingTasks}
}

开发者直接调用这个函数传入Pool的初始化函数就可以了。这样的好处就在于把配置的复杂性留给了自己,使用者只需要调用意思明确的WithMaxBlockingTasks就可以了,而且我们如果添加了新的配置的话,对应使用者来说,也只是在需要使用的时候多堆叠一个函数

使用方式如下

func wrapper(i int, wg *sync.WaitGroup) func() {return func() {fmt.Printf("hello from task:%d\n", i)time.Sleep(1 * time.Second)wg.Done()}
}func main() {p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))defer p.Release()var wg sync.WaitGroupwg.Add(8)for i := 1; i <= 8; i++ {go func(i int) {err := p.Submit(wrapper(i, &wg))if err != nil {fmt.Printf("task:%d err:%v\n", i, err)wg.Done()}}(i)}wg.Wait()
}

我们可以看到NewPool的实现如下:


// Option represents the optional function.
type Option func(opts *Options)// 根据传入的配置函数来生成最后的配置类
func loadOptions(options ...Option) *Options {opts := new(Options)for _, option := range options {option(opts)}return opts
}// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {opts := loadOptions(options...)// 如果没有传入配置,就使用默认配置if expiry := opts.ExpiryDuration; expiry < 0 {return nil, ErrInvalidPoolExpiry} else if expiry == 0 {opts.ExpiryDuration = DefaultCleanIntervalTime}if opts.Logger == nil {opts.Logger = defaultLogger}p := &Pool{capacity: int32(size),lock:     internal.NewSpinLock(),options:  opts,}p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}if size <= 0 {p.infinite = true}if p.options.PreAlloc {p.workers = newWorkerArray(loopQueueType, size)} else {p.workers = newWorkerArray(stackType, 0)}p.cond = sync.NewCond(p.lock)// Start a goroutine to clean up expired workers periodically.go p.periodicallyPurge()return p, nil
}

锁的设计

type spinLock uint32func (sl *spinLock) Lock() {for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {runtime.Gosched()}
}func (sl *spinLock) Unlock() {atomic.StoreUint32((*uint32)(sl), 0)
}// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {return new(spinLock)
}

我们可以看到作者再使用lock的时候,没有直接使用sync.Mutex,而是直接使用了自己实现的自旋锁,会一直等待直到获取锁,这样做可以减少协程上下文切换的开销,因为其实再协程池里面,每个任务都是等价的,谁前数后其实没多大区别,都是为了一个目的,就是完成分配的任务。

这篇关于go 协程池 ants库分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/779593

相关文章

Android kotlin中 Channel 和 Flow 的区别和选择使用场景分析

《Androidkotlin中Channel和Flow的区别和选择使用场景分析》Kotlin协程中,Flow是冷数据流,按需触发,适合响应式数据处理;Channel是热数据流,持续发送,支持... 目录一、基本概念界定FlowChannel二、核心特性对比数据生产触发条件生产与消费的关系背压处理机制生命周期

go中的时间处理过程

《go中的时间处理过程》:本文主要介绍go中的时间处理过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1 获取当前时间2 获取当前时间戳3 获取当前时间的字符串格式4 相互转化4.1 时间戳转时间字符串 (int64 > string)4.2 时间字符串转时间

Go语言中make和new的区别及说明

《Go语言中make和new的区别及说明》:本文主要介绍Go语言中make和new的区别及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1 概述2 new 函数2.1 功能2.2 语法2.3 初始化案例3 make 函数3.1 功能3.2 语法3.3 初始化

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Go语言中nil判断的注意事项(最新推荐)

《Go语言中nil判断的注意事项(最新推荐)》本文给大家介绍Go语言中nil判断的注意事项,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1.接口变量的特殊行为2.nil的合法类型3.nil值的实用行为4.自定义类型与nil5.反射判断nil6.函数返回的

Go语言数据库编程GORM 的基本使用详解

《Go语言数据库编程GORM的基本使用详解》GORM是Go语言流行的ORM框架,封装database/sql,支持自动迁移、关联、事务等,提供CRUD、条件查询、钩子函数、日志等功能,简化数据库操作... 目录一、安装与初始化1. 安装 GORM 及数据库驱动2. 建立数据库连接二、定义模型结构体三、自动迁

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景

python中Hash使用场景分析

《python中Hash使用场景分析》Python的hash()函数用于获取对象哈希值,常用于字典和集合,不可变类型可哈希,可变类型不可,常见算法包括除法、乘法、平方取中和随机数哈希,各有优缺点,需根... 目录python中的 Hash除法哈希算法乘法哈希算法平方取中法随机数哈希算法小结在Python中,

Java Stream的distinct去重原理分析

《JavaStream的distinct去重原理分析》Javastream中的distinct方法用于去除流中的重复元素,它返回一个包含过滤后唯一元素的新流,该方法会根据元素的hashcode和eq... 目录一、distinct 的基础用法与核心特性二、distinct 的底层实现原理1. 顺序流中的去重

Go语言代码格式化的技巧分享

《Go语言代码格式化的技巧分享》在Go语言的开发过程中,代码格式化是一个看似细微却至关重要的环节,良好的代码格式化不仅能提升代码的可读性,还能促进团队协作,减少因代码风格差异引发的问题,Go在代码格式... 目录一、Go 语言代码格式化的重要性二、Go 语言代码格式化工具:gofmt 与 go fmt(一)