本文主要是介绍Go 学习笔记(66)— Go 并发同步原语(sync.Mutex、sync.RWMutex、sync.Once)及并发模型(ping-pong 模式、fan-in 模式、fan-out 模式),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1. 竞态条件
一旦数据被多个线程共享,那么就很可能会产生争用和冲突的情况。这种情况也被称为竞态条件(race condition
),这往往会破坏共享数据的一致性。
举个例子,同时有多个线程连续向同一个缓冲区写入数据块,如果没有一个机制去协调这些线程的写入操作的话,那么被写入的数据块就很可能会出现错乱。
比如,在线程 A 还没有写完一个数据块的时候,线程 B 就开始写入另外一个数据块了。
显然,这两个数据块中的数据会被混在一起,并且已经很难分清了。因此,在这种情况下,我们就需要采取一些措施来协调它们对缓冲区的修改。这通常就会涉及同步。
2. 同步作用
概括来讲,同步的用途有两个,
- 一个是避免多个线程在同一时刻操作同一个数据块,
- 另一个是协调多个线程,以避免它们在同一时刻执行同一个代码块。
我们所说的同步其实就是在控制多个线程对共享资源的访问。
3. 临界区
一个线程在想要访问某一个共享资源的时候,需要先申请对该资源的访问权限,并且只有在申请成功之后,访问才能真正开始。而当线程对共享资源的访问结束时,它还必须归还对该资源的访问权限,若要再次访问仍需申请。
如果针对某个共享资源的访问令牌只有一块,那么在同一时刻,就最多只能有一个线程进入到那个区域,并访问到该资源。这时,我们可以说,多个并发运行的线程对这个共享资源的访问是完全串行的。
只要一个代码片段需要实现对共享资源的串行化访问,就可以被视为一个临界区(critical section
),也就是我刚刚说的,由于要访问到资源而必须进入的那个区域。
比如,在我前面举的那个例子中,实现了数据块写入操作的代码就共同组成了一个临界区。如果针对同一个共享资源,这样的代码片段有多个,那么它们就可以被称为相关临界区。它们可以是一个内含了共享数据的结构体及其方法,也可以是操作同一块共享数据的多个函数。
临界区总是需要受到保护的,否则就会产生竞态条件。施加保护的重要手段之一,就是使用实现了某种同步机制的工具,也称为同步工具。
4. Go 中的同步工具
4.1 sync.Mutex
在 Go
语言中,可供我们选择的同步工具并不少。其中,最重要且最常用的同步工具当属互斥量(mutual exclusion
,简称 mutex
)。sync
包中的 Mutex
就是与其对应的类型,该类型的值可以被称为互斥量或者互斥锁。
一个互斥锁可以被用来保护一个临界区或者一组相关临界区。我们可以通过它来保证,在同一时刻只有一个 goroutine
处于该临界区之内。
为了兑现这个保证,每当有 goroutine
想进入临界区时,都需要先对它进行锁定,并且,每个 goroutine
离开临界区时,都要及时地对它进行解锁。
锁定操作可以通过调用互斥锁的 Lock
方法实现,而解锁操作可以调用互斥锁的 Unlock
方法
使用互斥锁的注意事项如下:
- 不要重复锁定互斥锁;
- 不要忘记解锁互斥锁,必要时使用defer语句;
- 不要对尚未锁定或者已解锁的互斥锁解锁;
- 不要在多个函数之间直接传递互斥锁。
一旦,你把一个互斥锁同时用在了多个地方,就必然会有更多的 goroutine
争用这把锁。这不但会让你的程序变慢,还会大大增加死锁(deadlock
)的可能性。
所谓的死锁,指的就是当前程序中的主 goroutine
,以及我们启用的那些 goroutine
都已经被阻塞。这些 goroutine
可以被统称为用户级的 goroutine
。这就相当于整个程序都已经停滞不前了。
死锁程序是所有并发进程彼此等待的程序。在这种情况下,如果没有外界的干预,这个程序将永远无陆恢复。
Go
语言运行时系统是不允许这种情况出现的,只要它发现所有的用户级 goroutine
都处于等待状态,就会自行抛出一个带有如下信息的 panic
:
fatal error: all goroutines are asleep - deadlock!
注意,这种由 Go
语言运行时系统自行抛出的 panic
都属于致命错误,都是无法被恢复的,调用 recover
函数对它们起不到任何作用。也就是说,一旦产生死锁,程序必然崩溃。
忘记解锁导致的问题有时候是比较隐秘的,并不会那么快就暴露出来。这也是我们需要特别关注它的原因。相比之下,解锁未锁定的互斥锁会立即引发 panic
。并且,与死锁导致的 panic
一样,它们是无法被恢复的。
因此,我们总是应该保证,对于每一个锁定操作,都要有且只有一个对应的解锁操作。
Go
语言中的互斥锁是开箱即用的。换句话说,一旦我们声明了一个 sync.Mutex
类型的变量,就可以直接使用它了。不过要注意,该类型是一个结构体类型,属于值类型中的一种。把它传给一个函数、将它从函数中返回、把它赋给其他变量、让它进入某个通道都会导致它的副本的产生。并且,原值和它的副本,以及多个副本之间都是完全独立的,它们都是不同的互斥锁。
如果你把一个互斥锁作为参数值传给了一个函数,那么在这个函数中对传入的锁的所有操作,都不会对存在于该函数之外的那个原锁产生任何的影响。
package mainimport ("fmt""time"
)var sum int = 0func main() {for i := 0; i < 1000; i++ {go add(10)}time.Sleep(2 * time.Second)fmt.Println("sum is ", sum)}func add(i int) {sum += i
}
这段代码循环 1000 次,每次给 sum 加 10,理论上应该是 10000,但是执行结果为 8380、或者 9010 或者 9130 等。
原因是多个 go 语句并发地对 sum 进行加 10 操作,不能保证每次取的值就是上一次执行的结果。
使用互斥锁的代码示例:
package mainimport ("fmt""sync""time"
)var (sum int = 0mutex sync.Mutex
)func main() {for i := 0; i < 1000; i++ {go add(10)}time.Sleep(2 * time.Second)fmt.Println("sum is ", sum)}func add(i int) {mutex.Lock()sum += imutex.Unlock()
}
Mutex
的 Lock
和 Unlock
方法总是成对出现,而且要确保 Lock
获得锁后,一定执行 UnLock
释放锁,所以在函数或者方法中会采用 defer
语句释放锁,如下面的代码所示:
func add(i int) {mutex.Lock()defer mutex.Unlock()sum += i
}
4.2 sync.RWMutex
读写锁是读 / 写互斥锁的简称。在 Go
语言中,读写锁由 sync.RWMutex
类型的值代表。与 sync.Mutex
类型一样,这个类型也是开箱即用的。
顾名思义,读写锁是把对共享资源的“读操作”和“写操作”区别对待了。它可以对这两种操作施加不同程度的保护。换句话说,相比于互斥锁,读写锁可以实现更加细腻的访问控制。
一个读写锁中实际上包含了两个锁,即:读锁和写锁。sync.RWMutex
类型中的 Lock
方法和 Unlock
方法分别用于对写锁进行锁定和解锁,而它的RLock
方法和 RUnlock
方法则分别用于对读锁进行锁定和解锁。
另外,对于同一个读写锁来说有如下规则。
- 在写锁已被锁定的情况下再试图锁定写锁,会阻塞当前的
goroutine
。 - 在写锁已被锁定的情况下试图锁定读锁,会阻塞当前的
goroutine
。 - 在读锁已被锁定的情况下试图锁定写锁,会阻塞当前的
goroutine
。 - 在读锁已被锁定的情况下再试图锁定读锁,并不会阻塞当前的
goroutine
。
换一个角度来说,对于某个受到读写锁保护的共享资源,
- 多个写操作不能同时进行
- 写操作和读操作也不能同时进行
- 多个读操作却可以同时进行
再来看另一个方面。对写锁进行解锁,会唤醒“所有因试图锁定读锁,而被阻塞的 goroutine”,并且,这通常会使它们都成功完成对读锁的锁定。
然而,对读锁进行解锁,只会在没有其他读锁锁定的前提下,唤醒“因试图锁定写锁,而被阻塞的 goroutine”;并且,最终只会有一个被唤醒的 goroutine 能够成功完成对写锁的锁定,其他的 goroutine 还要在原处继续等待。至于是哪一个 goroutine,那就要看谁的等待时间最长了。
除此之外,读写锁对写操作之间的互斥,其实是通过它内含的一个互斥锁实现的。因此,也可以说,Go 语言的读写锁是互斥锁的一种扩展。
最后,需要强调的是,与互斥锁类似,解锁“读写锁中未被锁定的写锁”,会立即引发 panic
,对于其中的读锁也是如此,并且同样是不可恢复的。
不过,正因为如此,我们可以使用它对共享资源的操作,实行更加细腻的控制。另外,由于这里的读写锁是互斥锁的一种扩展,所以在有些方面它还是沿用了互斥锁的行为模式。比如,在解锁未锁定的写锁或读锁时的表现,又比如,对写操作之间互斥的实现方式。
最后,需要特别注意的是,无论是互斥锁还是读写锁,我们都不要试图去解锁未锁定的锁,因为这样会引发不可恢复的 panic
。
package mainimport ("fmt""sync""time"
)var (sum int = 0mutex sync.Mutex
)func main() {for i := 0; i < 1000; i++ {go add(10)}for i := 0; i < 10; i++ {go readSum()}time.Sleep(2 * time.Second)}func readSum() {ret := sumfmt.Println("ret is ", ret)
}func add(i int) {mutex.Lock()sum += imutex.Unlock()
}
这个示例开启了 10 个协程,它们同时读取 sum
的值。因为 readSum
函数并没有任何加锁控制,所以它不是并发安全的,即一个 goroutine
正在执行 sum += i
操作的时候,另一个 goroutine
可能正在执行 ret := sum
操作,这就会导致读取的 num
值是一个过期的值,结果不可预期。
如果要解决以上资源竞争的问题,可以使用互斥锁 sync.Mutex
,如下面的代码所示:
func readSum() {mutex.Lock()ret := summutex.Unlock()fmt.Println("ret is ", ret)
}
因为 add
和 readSum
函数使用的是同一个 sync.Mutex
,所以它们的操作是互斥的,也就是一个 goroutine
进行修改操作 sum+=i
的时候,另一个 gouroutine
读取 sum
的操作 ret := sum
会等待,直到修改操作执行完毕。
现在我们解决了多个 goroutine 同时读写的资源竞争问题,但是又遇到另外一个问题——性能。因为每次读写共享资源都要加锁,所以性能低下,这该怎么解决呢?
现在我们分析读写这个特殊场景,有以下几种情况:
-
写的时候不能同时读,因为这个时候读取的话可能读到脏数据(不正确的数据);
-
读的时候不能同时写,因为也可能产生不可预料的结果;
-
读的时候可以同时读,因为数据不会改变,所以不管多少个
goroutine
读都是并发安全的。
所以就可以通过读写锁 sync.RWMutex
来优化这段代码,提升性能。现在我将以上示例改为读写锁,来实现我们想要的结果,如下所示:
var rwmutex sync.RWMutexfunc readSum() {rwmutex.RLock()ret := sumrwmutex.RUnlock()fmt.Println("ret is ", ret)
}
对比互斥锁的示例,读写锁的改动有两处:
-
把锁的声明换成读写锁
sync.RWMutex
。 -
把函数 readSum 读取数据的代码换成读锁,也就是
RLock
和RUnlock
。
这样性能就会有很大的提升,因为多个 goroutine
可以同时读数据,不再相互等待。
4.3 sync.Once
在实际的工作中,你可能会有这样的需求:让代码只执行一次,哪怕是在高并发的情况下,比如创建一个单例。
sync.Once 则可以保证某一个操作只能执行一次,它在实践中的使用也非常广泛。例如,我们希望配置的加载、日志的初始化只在初始化时加载一次。又如在释放资源时,我们希望文件描述符与通道只关闭一次,这时候都可以用到 sync.Once。在下面这个例子中,使用 sync.Once 只允许 MysSQL 数据库打开一次。
var (once sync.Once
)func DbOnce() (*sql.DB, error) {once.Do(func() {fmt.Println("Am called")db, dbErr = sql.Open("mysql", "root:test@tcp(127.0.0.1:3306)/test")if dbErr != nil {return}dbErr = db.Ping()})return db, dbErr
}
针对这种情形,Go
语言为我们提供了 sync.Once
来保证代码只执行一次,如下所示:
package mainimport ("fmt""sync"
)func main() {var once sync.Oncedone := make(chan bool)for i := 0; i < 10; i++ {go func() {//把要执行的函数(方法)作为参数传给once.Do方法即可once.Do(greet)done <- true}()}for i := 0; i < 10; i++ {<-done}
}func greet() {fmt.Println("hello world")
}
输出结果:
hello world
这是 Go
语言自带的一个示例,虽然启动了 10 个协程来执行 greet
函数,但是因为用了 once.Do
方法,所以函数 greet
只会被执行一次。也就是说在高并发的情况下,sync.Once
也会保证 greet
函数只执行一次。
sync.Once
适用于创建某个对象的单例、只加载一次的资源等只执行一次的场景。
如果没有调用 once.Do
的方法,如下则会执行 10 次 greet
函数。
package mainimport ("fmt"
)func main() {// var once sync.Oncedone := make(chan bool)for i := 0; i < 10; i++ {go func() {//把要执行的函数(方法)作为参数传给once.Do方法即可// once.Do(greet)greet()done <- true}()}for i := 0; i < 10; i++ {<-done}
}func greet() {fmt.Println("hello world")
}
问题:sync.Once类型值的Do方法是怎么保证只执行参数函数一次的?
与 sync.WaitGroup
类型一样,sync.Once
类型(以下简称 Once
类型)也属于结构体类型,同样也是开箱即用和并发安全的。
由于这个类型中包含了一个 sync.Mutex
类型的字段,所以,复制该类型的值也会导致功能的失效。
Once
类型的 Do
方法只接受一个参数,这个参数的类型必须是 func()
,即:无参数声明和结果声明的函数。该方法的功能并不是对每一种参数函数都只执行一次,而是只执行“首次被调用时传入的”那个函数,并且之后不会再执行任何参数函数。
如下代码:
func main() {var count intincrement := func() { count++ }decrement := func() {fmt.Println("enter decrement")count--}var once sync.Onceonce.Do(increment)once.Do(decrement)fmt.Printf("count is %v", count)
}
打印结果为:
count is 1
可以看到 count
并不是 0,同时呢我们发现 decrement
函数中并没有进去,所以就验证了上面的说法。once
只执行一次 Do
方法。
所以,如果你有多个只需要执行一次的函数,那么就应该为它们中的每一个都分配一个 sync.Once
类型的值(以下简称 Once
值)。
func main() {var count intincrement := func() { count++ }decrement := func() {fmt.Println("enter decrement")count--}var onceIn sync.Once // 给 increment 函数声明一个 Once 类型值var onceDe sync.Once // 给 decrement 函数声明一个 Once 类型值onceIn.Do(increment)onceDe.Do(decrement)fmt.Printf("count is %v", count)
}
输出结果为:
enter decrement
count is 0
Once
类型中还有一个名叫 done
的 uint32
类型的字段。它的作用是记录其所属值的 Do
方法被调用的次数。不过,该字段的值只可能是 0 或者 1。一旦 Do
方法的首次调用完成,它的值就会从 0 变为 1。
你可能会问,既然 done
字段的值不是 0 就是 1,那为什么还要使用需要四个字节的 uint32
类型呢?原因很简单,因为对它的操作必须是“原子”的。Do
方法在一开始就会通过调用 atomic.LoadUint32
函数来获取该字段的值,并且一旦发现该值为 1,就会直接返回。这也初步保证了“ Do
方法,只会执行首次被调用时传入的函数”。
不过,单凭这样一个判断的保证是不够的。因为,如果有两个 goroutine
都调用了同一个新的 Once
值的 Do
方法,并且几乎同时执行到了其中的这个条件判断代码,那么它们就都会因判断结果为 false
,而继续执行 Do
方法中剩余的代码。在这个条件判断之后,Do
方法会立即锁定其所属值中的那个 sync.Mutex
类型的字段 m
。然后,它会在临界区中再次检查 done
字段的值,并且仅在条件满足时,才会去调用参数函数,以及用原子操作把 done
的值变为 1。
下面我再来说说这个 Do
方法在功能方面的两个特点。
第一个特点,由于 Do
方法只会在参数函数执行结束之后把 done
字段的值变为 1,因此,如果参数函数的执行需要很长时间或者根本就不会结束(比如执行一些守护任务),那么就有可能会导致相关 goroutine
的同时阻塞。例如,有多个 goroutine
并发地调用了同一个Once
值的 Do
方法,并且传入的函数都会一直执行而不结束。那么,这些 goroutine
就都会因调用了这个 Do
方法而阻塞。因为,除了那个抢先执行了参数函数的 goroutine
之外,其他的 goroutine
都会被阻塞在锁定该 Once
值的互斥锁 m
的那行代码上。
第二个特点,Do
方法在参数函数执行结束后,对 done
字段的赋值用的是原子操作,并且,这一操作是被挂在 defer
语句中的。因此,不论参数函数的执行会以怎样的方式结束,done
字段的值都会变为 1。也就是说,即使这个参数函数没有执行成功(比如引发了一个 panic),我们也无法使用同一个 Once
值重新执行它了。所以,如果你需要为参数函数的执行设定重试机制,那么就要考虑 Once
值的适时替换问题。在很多时候,我们需要依据 Do
方法的这两个特点来设计与之相关的流程,以避免不必要的程序阻塞和功能缺失。
Once
值的使用方式比 WaitGroup
值更加简单,它只有一个 Do
方法。同一个 Once
值的 Do
方法,永远只会执行第一次被调用时传入的参数函数,不论这个函数的执行会以怎样的方式结束。只要传入某个 Do
方法的参数函数没有结束执行,任何之后调用该方法的 goroutine
就都会被阻塞。只有在这个参数函数执行结束以后,那些 goroutine
才会逐一被唤醒。Once
类型使用互斥锁和原子操作实现了功能,而 WaitGroup
类型中只用到了原子操作。
5. 并发模型
之前我们讲了很多传统的同步模式,但是我们在实践中去协调协程时,使用得最多的还是通道。
通道的厉害之处在于,在通信的过程中完成了数据所有权的转移。数据只可能在某一个协程中执行,这就在无形中消除了并发访问数据的问题,数据争用问题仍然存在,例如通道内部仍然需要使用锁,但 Go 语言已经为我们屏蔽了底层锁实现的细节。借助通道,我们可以创造出许多有表现力的高并发模型。
5.1 ping-pong 模式
ping-pong 模式即乒乓球模式,它比较形象地呈现了数据之间一来一回的关系。收到数据的协程可以在不加锁的情况下对数据进行处理,而不必担心有并发冲突。
实例代码如下所示。两个协程 player 就相当于两个球员,而通道 table 则类似于球桌
func main(){var Ball inttable:= make(chan int)go player(table)go player(table)table<-Balltime.Sleep(1*time.Second)<-table}
func player(table chan int) {for{ball:=<-tableball++time.Sleep(100*time.Millisecond)table<-ball}
}
如果我们把两个 player 扩展为多个 player,是不是就有点像很多人在踢毽子了。当我们遇到类型的问题,可以用这一简单的模式来进行抽象。
5.2 fan-in 模式
fan-in 模式又叫扇入模式,意思是多个协程把数据写入到通道中,但只有一个协程等待读取通道数据。
这种模式在实践中有很多应用场景。举个例子,我们想查找某一个文件夹中有没有特殊的关键字。当文件数量很多时,我们可以用并发的方式去查找,找到结果后输出到相同的通道中打印出来。
func search(ch chan string, msg string) {var i intfor {// 模拟找到了关键字ch <- fmt.Sprintf("get %s %d", msg, i)i++time.Sleep(1000 * time.Millisecond)}
}func main() {ch := make(chan string)go search(ch, "jonson")go search(ch, "olaya")for i := range ch {fmt.Println(i)}
}
不过,fan-in 模式在读取数据时,并不总是只有一个通道。它也可以同时读取多个通道的内容,以多路复用的形式存在。让我们把上面的例子改造一下,现在 search 函数会返回一个新的通道,并新建协程把数据写入到这个通道中。在读取数据时,我们要监听 ch1、ch2 两个协程,并使用 select 来实现多路复用。
func search(msg string) chan string {var ch = make(chan string)go func() {var i intfor {ch <- fmt.Sprintf("get %s %d", msg, i)i++time.Sleep(100 * time.Millisecond)}}()return ch
}func main() {ch1 := search("jonson")ch2 := search("olaya")for {select {case msg := <-ch1:fmt.Println(msg)case msg := <-ch2:fmt.Println(msg)}}
}
fan-in 模式比较清晰,在实际中也是很常见的。例如我们之后在项目中会看到,通过 fan-in 模式来整合爬取到的数据,并存储起来。
5.3 fan-out 模式
fan-out 模式与 fan-in 模式相反,它描述的是一个协程完成数据的写入,但是多个协程抢夺同一个通道中的数据的场景。
Fan-out 模式通常会用在任务的分配中。比方说,程序消费 Kafka、NATS 等中间件的数据,多个协程就会监听同一个通道中的数据,读到数据后立即进行后续的处理,处理完毕后再继续读取,循环往复。
以下面的代码为例。多个 Worker 监听同一个协程,而 tasksCh <- i 会把任务分配到 Worker 中去。fan-out 模式使 Worker 得到了充分的利用,并且任务的分配也实现了负载均衡,哪一个 Worker 闲下来了就会自动去领取新的任务(注意,示例代码中的 sync.WaitGroup 只是为了防止 main 函数提前退出):
func worker(tasksCh <-chan int, wg *sync.WaitGroup) {defer wg.Done()for {task, ok := <-tasksChif !ok {return}d := time.Duration(task) * time.Millisecondtime.Sleep(d)fmt.Println("processing task", task)}
}func pool(wg *sync.WaitGroup, workers, tasks int) {tasksCh := make(chan int)for i := 0; i < workers; i++ {go worker(tasksCh, wg)}for i := 0; i < tasks; i++ {tasksCh <- i}close(tasksCh)
}func main() {var wg sync.WaitGroupwg.Add(36)go pool(&wg, 36, 50)wg.Wait()
}
在生产实践中,我们还可以在上面这个例子的基础上构建出更复杂的模型,例如每一个 Worker 中还可以分出多个 Subwoker。
我们就尝试在前例的基础上构建出具有 Subworker 的并发模式。
如下所示,Worker 也变成了类似调度的模式,Worker 创建出了多个 Subworker 的工作线程,并通过 subtasks <- task1 将任务分发到了 Subworker 中。
const (WORKERS = 5SUBWORKERS = 3TASKS = 20SUBTASKS = 10
)func subworker(subtasks chan int) {for {task, ok := <-subtasksif !ok {return}time.Sleep(time.Duration(task) * time.Millisecond)fmt.Println(task)}
}func worker(tasks <-chan int, wg *sync.WaitGroup) {defer wg.Done()for {task, ok := <-tasksif !ok {return}subtasks := make(chan int)for i := 0; i < SUBWORKERS; i++ {go subworker(subtasks)}for i := 0; i < SUBTASKS; i++ {task1 := task * isubtasks <- task1}close(subtasks)}
}func main() {var wg sync.WaitGroupwg.Add(WORKERS)tasks := make(chan int)for i := 0; i < WORKERS; i++ {go worker(tasks, &wg)}for i := 0; i < TASKS; i++ {tasks <- i}close(tasks)wg.Wait()
}
这篇关于Go 学习笔记(66)— Go 并发同步原语(sync.Mutex、sync.RWMutex、sync.Once)及并发模型(ping-pong 模式、fan-in 模式、fan-out 模式)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!