常见的并发模式实现(二):通过缓冲通道实现共享资源池


今天这篇教程我们继续演示常见并发模式的 Go 语言实现 —— 通过缓冲通道(channel)实现共享资源池。

注:如果你不了解什么是通道和缓冲通道,参考这篇教程

该资源池可用于管理任意数量的协程(goroutine)之间共享的资源(比如数据库连接),如果某个协程需要从资源池获取资源(比如从数据库连接池获取数据库连接),可以从共享资源池申请(如果没有的话需要新建),并且在使用完成后将其归还到共享资源池。

遵循这个思路,我们来编写对应的实现代码。

创建一个 pool 包,在其中新建一个 pool.go 文件,基于 Go 语言编写共享资源池实现代码如下:

package pool

import (
    "errors"
    "io"
    "log"
    "sync"
)

// 定义资源池结构体
type Pool struct {
    // 通过锁机制确保资源池的并发安全
    m sync.Mutex
    // 通过缓冲通道管理资源池,资源池大小即缓冲值
    resources chan io.Closer
    // 在资源池中注册新的资源
    factory func() (io.Closer, error)
    // 标识资源池是否关闭
    closed bool
}

var ErrPoolClosed = errors.New("资源池已关闭")

// 初始化资源池
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("资源池容量需要大于0")
    }

    return &Pool{
        factory:   fn,
        resources: make(chan io.Closer, size),
    }, nil
}

// 从资源池申请资源
func (p *Pool) Acquire() (io.Closer, error) {
    select {
    case r, ok := <-p.resources:
        // 资源池不为空则从中获取资源
        log.Println("Acquire:", "共享资源")
        if !ok {
            return nil, ErrPoolClosed
        }
        return r, nil
    default:
        // 资源池为空则调用 p.factory() 方法注册新资源
        log.Println("Acquire:", "新增资源")
        return p.factory()
    }
}

// 资源使用完成后释放
func (p *Pool) Release(r io.Closer) {
    p.m.Lock()
    defer p.m.Unlock()

    // 资源池已关闭则支持释放资源
    if p.closed {
        r.Close()
        return
    }

    // 否则将资源归还到资源池
    select {
    case p.resources <- r:
        log.Println("Release:", "In Queue")
    default:
        log.Println("Release:", "Closing")
        r.Close()
    }
}

// 关闭资源池
func (p *Pool) Close() {
    p.m.Lock()
    defer p.m.Unlock()

    if p.closed {
        return
    }

    // 释放资源池和资源池中所有资源
    p.closed = true
    close(p.resources)
    for r := range p.resources {
        r.Close()
    }
}

上述代码定义了一个 Pool 结构体,用来创建共享资源池,每个资源池都可以管理任意类型的资源,只要对应的资源类型实现了 io.Closer 接口即可。其中 factory 属性是一个函数类型,需要调用方定义并传入,用于定义如何注册新资源到资源池。另外,资源池通常有容量(资源池可容纳的资源数量),这个容量也需要调用方初始化资源池时传入(我们可以通过 New 方法看到这一点),由于资源池 resources 是通道类型,因此通道的缓冲值大小即资源池容量。最后,我们还在 Pool 中定义了一个 sync.Mutex 锁,用于在对资源池进行操作时保证并发安全(同时可能有多个协程对资源池进行操作)。

接下来,在 Pool 中定义了初始化资源池、从资源池获取资源、释放资源以及关闭资源池四个方法,具体细节已经通过注释进行说明了,这里不再一一阐释。

至此,我们已经完成了通过缓冲通道实现共享资源池的代码编写,可以编写一段业务代码 db_pool.go 对其进行调用:

package main

import (
    "io"
    "log"
    "math/rand"
    "sync"
    "sync/atomic"
    "test/pool"
    "time"
)

const (
    maxGoroutines = 5
    pooledResources = 2
)

type dbConnection struct {
    ID int32
}

func (dbConn *dbConnection) Close() error {
    log.Println("Close: Connection", dbConn.ID)
    return nil
}

var idCounter int32

func createConnection() (io.Closer, error)  {
    id := atomic.AddInt32(&idCounter, 1)
    log.Println("Create: New Connection", id)

    return &dbConnection{id}, nil
}

func main()  {
    var wg sync.WaitGroup
    wg.Add(maxGoroutines)

    p, err := pool.New(createConnection, pooledResources)
    if err != nil {
        log.Println(err)
    }

    for query := 0; query < maxGoroutines; query++ {
        go func(q int) {
            performQueries(q, p)
            wg.Done()
        }(query)
    }

    wg.Wait()

    log.Println("Shutdown Program.")
    p.Close()
}

func performQueries(query int, p *pool.Pool) {
    conn, err := p.Acquire()
    if err != nil {
        log.Println(err)
        return
    }
    defer p.Release(conn)

    // 模拟 SQL 查询
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}

在这段调用代码中(主要关注 main 方法),我们演示的是一个数据库连接池,通过 sync.WaitGroup 将最大协程数设置为 5,在初始化共享资源池时,将资源池的大小设置为 2,Pool.factory 属性对应的是这里的创建数据库连接方法 createConnection,该方法会返回一个数据库连接对象作为资源注册到资源池(数据库连接对象 dbConnection 是一个模拟的伪数据库连接,通过定义 Close 方法实现了 io.Closer 接口)。

接下来,我们通过多个协程(goroutine)并发调用 performQueries 方法执行数据库查询(依然是伪实现),在这个方法中,包含了从资源池申请资源,以及查询完成后将对应资源归还 给资源池的操作。这样一来,我们就可以模拟这篇教程开头设想的场景:多个协程共享资源池中的资源。

执行这段代码,输出结果如下:

-w720

由于这里协程数量较少,尚未等到资源被释放回资源池,就已经完成所有资源获取工作,所以所有资源都是通过调用 pool.factory() 对应方法新建的,如果调大协程数量(maxGoroutines 常量值),降低 SQL 查询时间数量级(10ms-100ms),则输出日志里就会出现通过资源池获取共享资源的情况(Acquire: Shared Resource),感兴趣的同学可以去试试看。

附:本篇教程示例代码结构和上篇教程类似:

--go (项目根目录 ~/Development/go)
  |--src
      |--test
          |--pool
              |--pool.go
          |--db_pool.go
          |--go.mod

Vote Vote Cancel Collect Collect Cancel

<< 上一篇: 常见的并发模式实现(一):调度后台处理任务的作业程序

>> 下一篇: 常见的并发模式实现(三):通过无缓冲通道创建协程池