揭秘 Go 并发利器 WaitGroup
在 Go 语言的并发编程中,WaitGroup 是一个至关重要的工具,它为开发者提供了一种简单且有效的方式来管理和同步多个协程的执行。本文将深入分析 WaitGroup 的实现原理、使用注意事项,并提供实用的使用示例。
什么是 WaitGroup
WaitGroup
是 Go 标准库 sync
包中的同步原语,用于等待一组并发操作完成。它的主要作用是确保主协程(调用 WaitGroup 相关方法的协程)可以等待其他协程完成任务后再继续执行。
package main
import (
"fmt"
"sync"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
for i := 0; i < 5; i++ {
fmt.Printf("Worker %d working... step %d\n", id, i)
}
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
wg.Add(3)
go worker(1, &wg)
go worker(2, &wg)
go worker(3, &wg)
wg.Wait()
fmt.Println("All workers are done.")
}
在上面的代码中,wg.Add(3)
表示我们有三个并发任务需要等待,而 wg.Wait()
则确保主协程等待所有工作者协程完成后再继续执行。
WaitGroup 的核心方法
Add 方法
Add
用于设置 WaitGroup 需要等待的操作数量。通常在启动协程前调用 Add
方法来指定需要等待的协程数量。
Done 方法
每当一个协程完成任务时,调用 Done
方法通知 WaitGroup 该协程已经完成任务。内部实现实际上是调用了 Add(-1)
,将等待的协程数量减一。
Wait 方法
Wait
方法会阻塞主协程,直到 WaitGroup 的计数值变为 0,确保所有并发任务完成后再继续执行主协程的逻辑。
WaitGroup 的实现原理
WaitGroup
结构体的核心字段为 state
和 sema
。state
使用了高 32 位记录子协程的计数,低 32 位记录等待协程的数量,而 sema
是用于协调并发的信号量。
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64
sema uint32
}
Add 方法源码解析
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}
Add
方法的核心步骤包括更新 state
,校验 state
的合法性,最后当所有子协程都完成时,执行 V
操作,释放阻塞的协程。
WaitGroup 的注意事项
- 正确使用
Add
和Done
:确保在启动协程前调用Add
,并在协程完成任务时调用Done
。避免调用超量的Done
导致 panic。 - 合理复用
WaitGroup
:协程全部完成后,WaitGroup
可以复用,但要确保Wait
方法已经返回。 - 不要复制
WaitGroup
:避免复制WaitGroup
,否则会引发协程泄漏或死锁问题。
示例
并行计算
假设我们需要并行计算一个数组中每个元素的平方值:
package main
import (
"fmt"
"sync"
)
func square(wg *sync.WaitGroup, slice []int, result chan<- int) {
defer wg.Done()
for _, v := range slice {
result <- v * v
}
}
func main() {
var wg sync.WaitGroup
slice := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result := make(chan int, len(slice))
wg.Add(3)
go square(&wg, slice[:3], result)
go square(&wg, slice[3:6], result)
go square(&wg, slice[6:], result)
wg.Wait()
close(result)
total := 0
for v := range result {
total += v
}
fmt.Println(total)
}
并发请求
当需要并发处理多个 RPC 请求时,可以使用 WaitGroup
来确保所有请求完成后返回响应。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
fmt.Println("call rpc1 ...")
}()
go func() {
defer wg.Done()
fmt.Println("call rpc2 ...")
}()
go func() {
defer wg.Done()
fmt.Println("call rpc3 ...")
}()
wg.Wait()
fmt.Println("所有请求处理完成")
}
总结
WaitGroup 是 Go 语言并发编程中的重要工具。它可以帮助开发者管理并发任务,确保协程正确完成并同步执行。通过深入理解它的原理和使用方法,可以编写高效、可靠的并发程序。