常见的并发模式实现(三):通过无缓冲通道创建协程池
上篇教程学院君给大家演示了如何通过缓冲通道实现共享资源池,今天,我们来看另一个并发模式的 Go 语言实现 —— 通过无缓冲通道实现协程(goroutine)池。
这些协程池通常用于并发执行一组任务,最终组合起来完成某个功能。在这种情况下,使用无缓冲通道要比使用缓冲通道好,因为既不需要任务队列,也不需要一组协程配合执行,并且方便知道什么时候协程池正在执行任务,如果协程池中的所有协程都在忙,无法处理新的任务,也能及时通过通道通知调用者(分配给无缓冲通道的任务未处理会阻塞后续分配)。另外,使用无缓冲通道不会有任务在队列中丢失或卡住,所有任务都会被处理。
创建一个 worker
目录,并在其中新建一个 worker.go
文件,根据上述思路,编写一段无缓冲通道创建协程池的示例代码如下:
package worker
import "sync"
type Worker interface {
Task()
}
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
func New(maxGoroutines int) *Pool {
p := Pool{
work: make(chan Worker),
}
p.wg.Add(maxGoroutines)
for i := 0; i < maxGoroutines; i++ {
go func() {
for w := range p.work {
w.Task()
}
p.wg.Done()
}()
}
return &p
}
func (p *Pool) Run(w Worker) {
p.work <- w
}
func (p *Pool) Shutdown() {
close(p.work)
p.wg.Wait()
}
在上述代码中,我们定义了一个表示协程池的 Pool
结构体,其中 work
是一个无缓冲通道类型(该类型需要实现 Woker
接口),用于表示需要协程池执行的任务,wg
是一个 sync.WaitGroup
类型,用于控制协程池所有协程的执行和退出。
该 Pool
结构体定义了三个方法,分别是初始化协程池的 New
方法,分配任务给协程池的 Run
方法,以及关闭协程池的 Shutdown
方法,我们重点关注 New
方法。该方法接收一个 maxGoroutines
参数表示协程池中协程的最大数量,在初始化 Pool
的 work
属性时,没有指定缓冲值,表明其无缓冲通道类型:
p := Pool{
work: make(chan Worker),
}
我们会让每个协程执行对应的任务方法(如果协程池有分配任务的话,否则会阻塞):
p.wg.Add(maxGoroutines)
for i := 0; i < maxGoroutines; i++ {
go func() {
for w := range p.work {
w.Task()
}
p.wg.Done()
}()
}
注:我们可以通过协程池的
Run
方法将任务分配给协程池。
上述代码会异步执行,所以不影响 New
方法返回实例化后的 Pool
对象。
接下来,我们编写一段调用该协程池执行任务的入口程序 work.go
:
package main
import (
"log"
"sync"
"test/worker"
"time"
)
var langs = []string{
"Golang",
"PHP",
"JavaScript",
"Python",
"Java",
}
type langPrinter struct {
lang string
}
func (m *langPrinter) Task() {
log.Println(m.lang)
time.Sleep(time.Second)
}
func main() {
p := worker.New(2)
var wg sync.WaitGroup
wg.Add(5 * len(langs))
for i := 0; i < 5; i++ {
for _, lang := range langs {
lp := langPrinter{lang}
go func() {
p.Run(&lp)
wg.Done()
}()
}
}
wg.Wait()
p.Shutdown()
}
在这段代码中,我们定义了一个实现 Worker
接口的 langPrinter
类(实现 Task
方法),用于执行打印编程语言的任务(耗时1s)。然后我们在入口函数中,初始化协程池,指定其大小为 2,然后遍历 langs
切片,依次将基于 lang
值初始化的 langPrinter
对象作为任务实例分配给协程池去执行(从初始化时堵塞的地方开始执行),当然这个分配工作也是通过协程异步执行的,尽管每组可以分配 5 个任务,但是由于协程池中定义的是无缓冲通道,并且协程池的大小是 2,所以一次只能并发执行两个任务。所有分配给协程池的任务执行完成后,会关闭协程池,释放资源。
调用上述入口程序 work.go
,输出结果如下:
通过上述输出,我们也可以验证每次协程池只能并发执行两个任务(每秒钟打印两个结果)。和缓冲通道调整缓冲值来调节并发能力不同,这里只能通过调整协程池大小来调节程序并发能力。
附:本篇教程示例代码目录结构如下:
--go (项目根目录 ~/Development/go)
|--src
|--test
|--worker
|--worker.go
|--work.go
|--go.mod
No Comments