本文主要是介绍go 协程池 ants库分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
简介
相比于创建多个线程,goroutine 更轻量、资源占用更少、切换速度更快、无线程上下文切换开销更少。但是受限于资源总量,系统中能够创建的 goroutine 数量也是受限的。默认每个 goroutine 占用 8KB 内存,一台 8GB 内存的机器满打满算也只能创建 8GB/8KB = 1000000 个 goroutine,更何况系统还需要保留一部分内存运行日常管理任务,go 运行时需要内存运行 gc、处理 goroutine 切换等。使用的内存超过机器内存容量,系统会使用交换区(swap),导致性能急速下降,甚至直接error
另一方面,goroutine 的管理也是一个问题。goroutine 只能自己运行结束,外部没有任何手段可以强制j结束一个 goroutine。如果一个 goroutine 因为某种原因没有自行结束,就会出现 goroutine 泄露。此外,频繁创建 goroutine 也是一个开销。
所以,我们就会需要一个goroutine池,自动管理goroutine生命周期,可以按需创建,动态缩容,自动安排任务的执行
以上内容摘自Go 每日一库之 ants
使用
使用上可以参考 Go 每日一库之 ants 里面给的例子,是一个计算大量整数和的程序。如下
// 包装任务需要执行的内容
// ants支持将一个不接受任何参数的函数作为任务提交给 goroutine 运行。
// 由于不接受参数,我们提交的函数要么不需要外部数据,只需要处理自身逻辑,
// 否则就必须用某种方式将需要的数据传递进去,例如闭包。
type taskFunc func()
func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc {return func() {for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] {*sum += num}fmt.Printf("task:%d sum:%d\n", i+1, *sum)wg.Done()}
}// 建立协程池
p, _ := ants.NewPool(10)
defer p.Release()// 生成随机数
nums := make([]int, DataSize, DataSize)
for i := range nums {nums[i] = rand.Intn(1000)
}// 提交任务,并通过waitGroup来等待所有任务的结束
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg))
}
wg.Wait()//
var sum int
for _, partSum := range partSums {sum += partSum
}var expect int
for _, num := range nums {expect += num
}
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)
设计
执行流程图
类图如下
整体设计比较清楚,
- Pool是对外提供的协程池对象,通过Options配置生成(NewPool函数)
- Pool里面有一个workerArray表示worker池,是一个抽象的接口,其主要就是管理goWorker
- goWorker就是实际运行我们任务的载体,通过调用run来执行
- ants提供了两种Pool,一个就是Pool,另一个是PoolWithFunc;前者接受一个不接受任何参数的函数作为任务提交给 goroutine 运行。由于不接受参数,我们提交的函数要么不需要外部数据,只需要处理自身逻辑,否则就必须用某种方式将需要的数据传递进去,例如闭包。(我们初始化的时候不需要提供执行函数,在需要执行的时候传入Sumit就可以了);后者在初始化的时候就要提供执行的函数体,然后在后续执行的时候,传入参数给函数体就可以了。这两种方式其实是等价的,使用前者的话,我们利用闭包传递参数就可以了;使用后者的话,我们可以把需要的参数都封装成一个结构体再传入;
优秀的设计
Options的思想
这个思想在go里面还是比较普遍的,比如 GitHub - libp2p/go-libp2p: libp2p implementation in Go 也有这种设计。这种设计的目的其实就是为了可以灵活配置我们的目标对象(在ants就是Pool),我们通过设置一个配置类,通过配置类生成我们的目标对象;那么我们如何配置这个配置类呢?一方面我们可以直接生成配置类,另一种类似堆积木的方式,我们可以传递一个函数组,这个函数组来操作我们要生成的配置类。也就是我们要提供给开发者一个传递函数的手段,如下
// WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {return func(opts *Options) {opts.MaxBlockingTasks = maxBlockingTasks}
}
开发者直接调用这个函数传入Pool的初始化函数就可以了。这样的好处就在于把配置的复杂性留给了自己,使用者只需要调用意思明确的WithMaxBlockingTasks就可以了,而且我们如果添加了新的配置的话,对应使用者来说,也只是在需要使用的时候多堆叠一个函数
使用方式如下
func wrapper(i int, wg *sync.WaitGroup) func() {return func() {fmt.Printf("hello from task:%d\n", i)time.Sleep(1 * time.Second)wg.Done()}
}func main() {p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))defer p.Release()var wg sync.WaitGroupwg.Add(8)for i := 1; i <= 8; i++ {go func(i int) {err := p.Submit(wrapper(i, &wg))if err != nil {fmt.Printf("task:%d err:%v\n", i, err)wg.Done()}}(i)}wg.Wait()
}
我们可以看到NewPool的实现如下:
// Option represents the optional function.
type Option func(opts *Options)// 根据传入的配置函数来生成最后的配置类
func loadOptions(options ...Option) *Options {opts := new(Options)for _, option := range options {option(opts)}return opts
}// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {opts := loadOptions(options...)// 如果没有传入配置,就使用默认配置if expiry := opts.ExpiryDuration; expiry < 0 {return nil, ErrInvalidPoolExpiry} else if expiry == 0 {opts.ExpiryDuration = DefaultCleanIntervalTime}if opts.Logger == nil {opts.Logger = defaultLogger}p := &Pool{capacity: int32(size),lock: internal.NewSpinLock(),options: opts,}p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}if size <= 0 {p.infinite = true}if p.options.PreAlloc {p.workers = newWorkerArray(loopQueueType, size)} else {p.workers = newWorkerArray(stackType, 0)}p.cond = sync.NewCond(p.lock)// Start a goroutine to clean up expired workers periodically.go p.periodicallyPurge()return p, nil
}
锁的设计
type spinLock uint32func (sl *spinLock) Lock() {for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {runtime.Gosched()}
}func (sl *spinLock) Unlock() {atomic.StoreUint32((*uint32)(sl), 0)
}// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {return new(spinLock)
}
我们可以看到作者再使用lock的时候,没有直接使用sync.Mutex,而是直接使用了自己实现的自旋锁,会一直等待直到获取锁,这样做可以减少协程上下文切换的开销,因为其实再协程池里面,每个任务都是等价的,谁前数后其实没多大区别,都是为了一个目的,就是完成分配的任务。
这篇关于go 协程池 ants库分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!