【Go】用ErrorGroup替代你的WaitGroup
简介
sync.WaitGroup在并发编程里面使用频率非常高,通常用于协同等待的场景,学习基础的Go并发编程的同学应该没有不懂它的使用方法。
WaitGroup简单介绍
看看下面这个代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19func main() { cost := time.Now() for i := 0; i < 5; i++ { go func(i int) { fmt.Println(i) }(i) } // time.Sleep 具体什么作用呢? time.Sleep(time.Second * 1) fmt.Printf("cost:%v ms", time.Since(cost).Milliseconds()) } 0 4 1 2 3 cost:1004 ms我相信大家应该很清楚,
time.Sleep()在这边的作用。同样的sync.WaitGroup也是起到等待完成的作用。一个
goroutine在检查点(Check Point)等待一组执行任务的 workergoroutine全部完成,如果在执行任务的这些workergoroutine还没全部完成,等待的goroutine就会阻塞在检查点,直到所有wokergoroutine都完成后才能继续执行。改改代码使用
sync.WaitGroup的方式1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22func main() { cost := time.Now() wg := sync.WaitGroup{} for i := 0; i < 5; i++ { wg.Add(1) go func(i int) { defer wg.Done() fmt.Println(i) }(i) } wg.Wait() fmt.Printf("cost:%v ms", time.Since(cost).Milliseconds()) } // 输出 4 1 0 2 3 cost:0 ms
为什么两个程序时间差距这么大,你应该明白了,原因在于怎么实现等的操作。怎么保证可靠性的等待。
WaitGroup存在的问题
如果在woker goroutine的执行过程中遇到错误想要通知在检查点等待的协程处理该怎么办呢?WaitGroup并没有提供传播错误的功能。
ErrorGroup介绍
Go语言在扩展库提供的ErrorGroup并发原语正好适合在这种场景下使用,它在WaitGroup的功能基础上还提供了,错误传播以及上下文取消的功能。
Go扩展库通过errorgroup.Group提供ErrorGroup原语的功能,它有三个方法可调用:
| |
- 调用
errorgroup包的WithContext方法会返回一个Group实例,同时还会返回一个使用context.WithCancel生成的新Context。一旦有一个子任务返回错误,或者是Wait调用返回,这个新Context就会被cancel。 Go方法,接收类型为func() error的函数作为子任务函数,如果任务执行成功,就返回nil,否则就返回error,并且会cancel那个新的Context。Wait方法,类似WaitGroup的Wait方法,调用后会阻塞地等待所有的子任务都完成,它才会返回。如果有多个子任务返回错误,它只会返回第一个出现的错误,如果所有的子任务都执行成功,就返回nil。
使用ErrorGroup
go get -u
接下来我们让主goroutine使用ErrorGroup代替WaitGroup等待所有子任务的完成,ErrorGroup有一个特点是会返回所有执行任务的goroutine遇到的第一个错误。我们试着执行一下下面的程序,注意观察程序的输出。
| |
上面程序,遇到i大于90的都会产生错误结束执行,但是只有第一个执行时产生的错误被ErrorGroup返回,程序的输出大概如下:
| |
最早执行遇到错误的goroutine输出了Error: 98但是所有未执行完的其他任务并没有停止执行。那么想让程序遇到错误就终止其他子任务该怎么办呢?我们可以用errgroup.Group提供的WithContext方法创建一个带可取消上下文功能的ErrorGroup。
| |
Go方法单独开启的goroutine在执行参数传递进来的函数时,如果函数返回了错误,会对ErrorGroup持有的err字段进行赋值并及时调用cancel函数,通过上下文通知其他子任务取消执行任务。所以上面更新后的程序会有如下类似的输出。
| |
了解ErrorGroup的使用方法后,我们再来看看这个并发同步原语的实现原理。
ErrorGroup的实现原理
ErrorGroup原语的结构体类型errorgroup.Group定义如下:
| |
cancel— 创建context.Context时返回的取消函数,用于在多个goroutine之间同步取消信号;wg— 用于等待一组goroutine完成子任务的同步原语;errOnce— 用于保证只接收一个子任务返回的错误的同步原语;
通过 errgroup.WithContext构造器创建errgroup.Group 结构体:
| |
运行新的并行子任务需要使用errgroup.Group.Go方法,这个方法的执行过程如下:
- 调用
sync.WaitGroup.Add增加待处理的任务数; - 创建一个新的
goroutine并在goroutine内部运行子任务; - 返回错误时及时调用
cancel并对err赋值,只有最早返回的错误才会被上游感知到,后续的错误都会被舍弃:
| |
用于等待的errgroup.Group.Wait方法只是调用了 sync.WaitGroup.Wait方法,阻塞地等待所有子任务完成。在子任务全部完成时会通过调用在errorgroup.WithContext创建Group和Context对象时存放在Group.cancel字段里的函数,取消Context对象并返回可能出现的错误。
| |
总结
Go语言通过errorgroup.Group结构提供的ErrorGroup原语通过封装WaitGroup、Once基本原语结合上下文对象,提供了除同步等待外更加复杂的错误传播和执行任务取消的功能。在使用时,我们也需要注意它的两个特点:
errgroup.Group在出现错误或者等待结束后都会调用Context对象 的cancel方法同步取消信号。- 只有第一个出现的错误才会被返回,剩余的错误都会被直接抛弃。