Golang sync.WaitGroup源码详细分析

2024-01-05 17:08

本文主要是介绍Golang sync.WaitGroup源码详细分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、介绍

WaitGroup是多个goroutine之间协作的一种实现方式,主要功能就是阻塞等待一组goroutine执行完成。

 

常用的使用场景:主goroutine调用Add函数设置需要等待的goroutine的数量,当每个goroutine执行完成后调用Done函数(将counter减1),Wait函数用于阻塞等待直到该组中的所有goroutine都执行完成。

 

源码中主要设计到的三个概念:counter、waiter和semaphore

counter: 当前还未执行结束的goroutine计数器

waiter : 等待goroutine-group结束的goroutine数量,即有多少个等候者

semaphore: 信号量

信号量是Unix系统提供的一种保护共享资源的机制,用于防止多个线程同时访问某个资源。

可简单理解为信号量为一个数值:

当信号量>0时,表示资源可用,获取信号量时系统自动将信号量减1;

当信号量=0时,表示资源暂不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒。

 

二、源码分析

Golang源码版本 :1.10.3

1.结构体

  type WaitGroup struct {noCopy noCopy  //该WaitGroup对象不允许拷贝使用,只能用指针传递// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.// 64-bit atomic operations require 64-bit alignment, but 32-bit// compilers do not ensure it. So we allocate 12 bytes and then use// the aligned 8 bytes in them as state.//用于存储计数器(counter)和waiter的值// 只需要64位,即8个字节,其中高32位是counter值,低32位值是waiter值// 不直接使用uint64,是因为uint64的原子操作需要64位系统,而32位系统下,可能会出现崩溃// 所以这里用byte数组来实现,32位系统下4字节对齐,64位系统下8字节对齐,所以申请12个字节,其中必定有8个字节是符合8字节对齐的,下面的state()函数中有进行判断state1 [12]bytesema   uint32  //信号量
}

    从结构体中我们看到

     state1是一个12位长度的byte数组,用于存储counter和waiter的值

     sema就是传说中的信号量

2.state函数

state是一个内部函数,用于获取counter和 waiter的值

//获取counter  、 waiter的值  (counter是uint64的高32位,waiter是uint64的低32位)
func (wg *WaitGroup) state() *uint64 {// 根据state1的起始地址分析,若是8字节对齐的,则直接用前8个字节作为*uint64类型// 若不是,说明是4字节对齐,则后移4个字节后,这样必为8字节对齐,然后取后面8个字节作为*uint64类型if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {return (*uint64)(unsafe.Pointer(&wg.state1))} else {return (*uint64)(unsafe.Pointer(&wg.state1[4]))}
}

3.Add方法

//用于增加或减少计数器(counter)的值
//如果计数器为0,则释放调用Wait方法时的阻塞,如果计数器为负,则panic
//Add()方法应该在Wait()方法调用之前
func (wg *WaitGroup) Add(delta int) {//获取当前counter和 waiter的值statep := wg.state()if race.Enabled {_ = *statep // trigger nil deref earlyif delta < 0 {// Synchronize decrements with Wait.race.ReleaseMerge(unsafe.Pointer(wg))}race.Disable()defer race.Enable()}//将delta的值添加到counter上state := atomic.AddUint64(statep, uint64(delta)<<32)v := int32(state >> 32)  //counter值w := uint32(state)  //waiter值if race.Enabled && delta > 0 && v == int32(delta) {// The first increment must be synchronized with Wait.// Need to model this as a read, because there can be// several concurrent wg.counter transitions from 0.race.Read(unsafe.Pointer(&wg.sema))}//counter为负数,则触发panicif v < 0 {panic("sync: negative WaitGroup counter")}// waiter值不为0,累加后的counter值和delta相等,说明Wait()方法没有在Add()方法之后调用,触发panic,因为正确的做法是先Add()后Wait()if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}//Add()添加正常返回//1.counter > 0,说明还不需要释放信号量,可以直接返回//2. waiter  = 0 ,说明没有等待的goroutine,也不需要释放信号量,可以直接返回if v > 0 || w == 0 {return}// This goroutine has set counter to 0 when waiters > 0.// Now there can't be concurrent mutations of state:// - Adds must not happen concurrently with Wait,// - Wait does not increment waiters if it sees counter == 0.// Still do a cheap sanity check to detect WaitGroup misuse.//下面是 counter == 0 并且 waiter > 0的情况//现在若原state和新的state不等,则有以下两种可能//1. Add 和 Wait方法同时调用//2. counter已经为0,但waiter值有增加,这种情况永远不会触发信号量了// 以上两种情况都是错误的,所以触发异常//注:state := atomic.AddUint64(statep, uint64(delta)<<32)  这一步调用之后,state和*statep的值应该是相等的,除非有以上两种情况发生if *statep != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// Reset waiters count to 0.//将waiter 和 counter都置为0*statep = 0//原子递减信号量,并通知等待的goroutinefor ; w != 0; w-- {runtime_Semrelease(&wg.sema, false)}
}

  4.Done方法

// Done decrements the WaitGroup counter by one.
//将计数器(counter)的值减1
func (wg *WaitGroup) Done() {wg.Add(-1)
}

5.Wait方法

// Wait blocks until the WaitGroup counter is zero.
//调用Wait方法会阻塞当前调用的goroutine直到 counter的值为0
//也会增加waiter的值
func (wg *WaitGroup) Wait() {//获取当前counter和 waiter的值statep := wg.state()if race.Enabled {_ = *statep // trigger nil deref earlyrace.Disable()}//一直等待,直到无需等待或信号量触发,才返回for {state := atomic.LoadUint64(statep)v := int32(state >> 32)  //counter值w := uint32(state)  //waiter值//如果counter值为0,则说明所有goroutine都退出了,无需等待,直接退出if v == 0 {// Counter is 0, no need to wait.if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}// Increment waiters count.//原子增加waiter的值,CAS方法,外面for循环会一直尝试,保证多个goroutine同时调用Wait()也能正常累加waiterif atomic.CompareAndSwapUint64(statep, state, state+1) {if race.Enabled && w == 0 {// Wait must be synchronized with the first Add.// Need to model this is as a write to race with the read in Add.// As a consequence, can do the write only for the first waiter,// otherwise concurrent Waits will race with each other.race.Write(unsafe.Pointer(&wg.sema))}//一直等待信号量sema,直到信号量触发,runtime_Semacquire(&wg.sema)//从上面的Add()方法看到,触发信号量之前会将seatep置为0(即counter和waiter都置为0),所以此时应该也为0//如果不为0,说明WaitGroup此时又执行了Add()或者Wait()操作,所以会触发panicif *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}}
}

 

三、注意点

1.Add()必须在Wait()前调用

2.Add()设置的值必须与实际等待的goroutine个数一致,如果设置的值大于实际的goroutine数量,可能会一直阻塞。如果小于会触发panic

3. WaitGroup不可拷贝,可以通过指针传递,否则很容易造成BUG

 

以下为值拷贝引起的Bug示例

demo1:因为值拷贝引起的死锁

func main() {var wg sync.WaitGroupwg.Add(5)for i := 0 ; i < 5 ; i++ {test(wg)}wg.Wait()
}func test(wg sync.WaitGroup) {go func() {fmt.Println("hello")wg.Done()}()
}

 

demo2:因为值拷贝引起的不会阻塞等待现象

func main() {var wg sync.WaitGroupfor i := 0 ; i < 5 ; i++ {test(wg)}wg.Wait()
}func test(wg sync.WaitGroup) {go func() {wg.Add(1)fmt.Println("hello")time.Sleep(time.Second*5)wg.Done()}()
}

 

demo3:因为值拷贝引发的panic

type person struct {wg sync.WaitGroup
}func (t *person) say()  {go func() {fmt.Println("say Hello!")time.Sleep(time.Second*5)t.wg.Done()}()
}func main() {var wg sync.WaitGroupt := person{wg:wg}wg.Add(5)for  i := 0 ; i< 5 ;i++ {t.say()}wg.Wait()
}

 

感谢:https://blog.csdn.net/yzf279533105/article/details/97302666

这篇关于Golang sync.WaitGroup源码详细分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/573568

相关文章

golang内存对齐的项目实践

《golang内存对齐的项目实践》本文主要介绍了golang内存对齐的项目实践,内存对齐不仅有助于提高内存访问效率,还确保了与硬件接口的兼容性,是Go语言编程中不可忽视的重要优化手段,下面就来介绍一下... 目录一、结构体中的字段顺序与内存对齐二、内存对齐的原理与规则三、调整结构体字段顺序优化内存对齐四、内

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck

Golang的CSP模型简介(最新推荐)

《Golang的CSP模型简介(最新推荐)》Golang采用了CSP(CommunicatingSequentialProcesses,通信顺序进程)并发模型,通过goroutine和channe... 目录前言一、介绍1. 什么是 CSP 模型2. Goroutine3. Channel4. Channe

Golang使用minio替代文件系统的实战教程

《Golang使用minio替代文件系统的实战教程》本文讨论项目开发中直接文件系统的限制或不足,接着介绍Minio对象存储的优势,同时给出Golang的实际示例代码,包括初始化客户端、读取minio对... 目录文件系统 vs Minio文件系统不足:对象存储:miniogolang连接Minio配置Min

Golang使用etcd构建分布式锁的示例分享

《Golang使用etcd构建分布式锁的示例分享》在本教程中,我们将学习如何使用Go和etcd构建分布式锁系统,分布式锁系统对于管理对分布式系统中共享资源的并发访问至关重要,它有助于维护一致性,防止竞... 目录引言环境准备新建Go项目实现加锁和解锁功能测试分布式锁重构实现失败重试总结引言我们将使用Go作

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get