常见的并发模式实现(三):通过无缓冲通道创建协程池


上篇教程学院君给大家演示了如何通过缓冲通道实现共享资源池,今天,我们来看另一个并发模式的 Go 语言实现 —— 通过无缓冲通道实现协程(goroutine)池。

这些协程池通常用于并发执行一组任务,最终组合起来完成某个功能。在这种情况下,使用无缓冲通道要比使用缓冲通道好,因为既不需要任务队列,也不需要一组协程配合执行,并且方便知道什么时候协程池正在执行任务,如果协程池中的所有协程都在忙,无法处理新的任务,也能及时通过通道通知调用者(分配给无缓冲通道的任务未处理会阻塞后续分配)。另外,使用无缓冲通道不会有任务在队列中丢失或卡住,所有任务都会被处理。

创建一个 worker 目录,并在其中新建一个 worker.go 文件,根据上述思路,编写一段无缓冲通道创建协程池的示例代码如下:

package worker

import "sync"

type Worker interface {
    Task()
}

type Pool struct {
    work chan Worker
    wg sync.WaitGroup
}

func New(maxGoroutines int) *Pool {
    p := Pool{
        work: make(chan Worker),
    }

    p.wg.Add(maxGoroutines)
    for i := 0; i < maxGoroutines; i++ {
        go func() {
            for w := range p.work {
                w.Task()
            }
            p.wg.Done()
        }()
    }

    return &p
}

func (p *Pool) Run(w Worker)  {
    p.work <- w
}

func (p *Pool) Shutdown() {
    close(p.work)
    p.wg.Wait()
}

在上述代码中,我们定义了一个表示协程池的 Pool 结构体,其中 work 是一个无缓冲通道类型(该类型需要实现 Woker 接口),用于表示需要协程池执行的任务,wg 是一个 sync.WaitGroup 类型,用于控制协程池所有协程的执行和退出。

Pool 结构体定义了三个方法,分别是初始化协程池的 New 方法,分配任务给协程池的 Run 方法,以及关闭协程池的 Shutdown 方法,我们重点关注 New 方法。该方法接收一个 maxGoroutines 参数表示协程池中协程的最大数量,在初始化 Poolwork 属性时,没有指定缓冲值,表明其无缓冲通道类型:

p := Pool{
    work: make(chan Worker),
}

我们会让每个协程执行对应的任务方法(如果协程池有分配任务的话,否则会阻塞):

p.wg.Add(maxGoroutines)
for i := 0; i < maxGoroutines; i++ {
    go func() {
        for w := range p.work {
            w.Task()
        }
        p.wg.Done()
    }()
}

注:我们可以通过协程池的 Run 方法将任务分配给协程池。

上述代码会异步执行,所以不影响 New 方法返回实例化后的 Pool 对象。

接下来,我们编写一段调用该协程池执行任务的入口程序 work.go

package main

import (
    "log"
    "sync"
    "test/worker"
    "time"
)

var langs = []string{
    "Golang",
    "PHP",
    "JavaScript",
    "Python",
    "Java",
}

type langPrinter struct {
    lang string
}

func (m *langPrinter) Task() {
    log.Println(m.lang)
    time.Sleep(time.Second)
}

func main() {
    p := worker.New(2)

    var wg sync.WaitGroup
    wg.Add(5 * len(langs))

    for i := 0; i < 5; i++ {
        for _, lang := range langs {
            lp := langPrinter{lang}
            go func() {
                p.Run(&lp)
                wg.Done()
            }()
        }
    }

    wg.Wait()
    p.Shutdown()
}

在这段代码中,我们定义了一个实现 Worker 接口的 langPrinter 类(实现 Task 方法),用于执行打印编程语言的任务(耗时1s)。然后我们在入口函数中,初始化协程池,指定其大小为 2,然后遍历 langs 切片,依次将基于 lang 值初始化的 langPrinter 对象作为任务实例分配给协程池去执行(从初始化时堵塞的地方开始执行),当然这个分配工作也是通过协程异步执行的,尽管每组可以分配 5 个任务,但是由于协程池中定义的是无缓冲通道,并且协程池的大小是 2,所以一次只能并发执行两个任务。所有分配给协程池的任务执行完成后,会关闭协程池,释放资源。

调用上述入口程序 work.go,输出结果如下:

-w723

通过上述输出,我们也可以验证每次协程池只能并发执行两个任务(每秒钟打印两个结果)。和缓冲通道调整缓冲值来调节并发能力不同,这里只能通过调整协程池大小来调节程序并发能力。

附:本篇教程示例代码目录结构如下:

--go (项目根目录 ~/Development/go)
  |--src
      |--test
          |--worker
              |--worker.go
          |--work.go
          |--go.mod

Vote Vote Cancel Collect Collect Cancel

<< 上一篇: 常见的并发模式实现(二):通过缓冲通道实现共享资源池

>> 下一篇: 基于 Go 协程实现图片马赛克应用(上):同步版本