常见的并发模式实现(二):通过缓冲通道实现共享资源池
今天这篇教程我们继续演示常见并发模式的 Go 语言实现 —— 通过缓冲通道(channel)实现共享资源池。
注:如果你不了解什么是通道和缓冲通道,参考这篇教程。
该资源池可用于管理任意数量的协程(goroutine)之间共享的资源(比如数据库连接),如果某个协程需要从资源池获取资源(比如从数据库连接池获取数据库连接),可以从共享资源池申请(如果没有的话需要新建),并且在使用完成后将其归还到共享资源池。
遵循这个思路,我们来编写对应的实现代码。
创建一个 pool
包,在其中新建一个 pool.go
文件,基于 Go 语言编写共享资源池实现代码如下:
package pool
import (
"errors"
"io"
"log"
"sync"
)
// 定义资源池结构体
type Pool struct {
// 通过锁机制确保资源池的并发安全
m sync.Mutex
// 通过缓冲通道管理资源池,资源池大小即缓冲值
resources chan io.Closer
// 在资源池中注册新的资源
factory func() (io.Closer, error)
// 标识资源池是否关闭
closed bool
}
var ErrPoolClosed = errors.New("资源池已关闭")
// 初始化资源池
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("资源池容量需要大于0")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
// 从资源池申请资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
case r, ok := <-p.resources:
// 资源池不为空则从中获取资源
log.Println("Acquire:", "共享资源")
if !ok {
return nil, ErrPoolClosed
}
return r, nil
default:
// 资源池为空则调用 p.factory() 方法注册新资源
log.Println("Acquire:", "新增资源")
return p.factory()
}
}
// 资源使用完成后释放
func (p *Pool) Release(r io.Closer) {
p.m.Lock()
defer p.m.Unlock()
// 资源池已关闭则支持释放资源
if p.closed {
r.Close()
return
}
// 否则将资源归还到资源池
select {
case p.resources <- r:
log.Println("Release:", "In Queue")
default:
log.Println("Release:", "Closing")
r.Close()
}
}
// 关闭资源池
func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
// 释放资源池和资源池中所有资源
p.closed = true
close(p.resources)
for r := range p.resources {
r.Close()
}
}
上述代码定义了一个 Pool
结构体,用来创建共享资源池,每个资源池都可以管理任意类型的资源,只要对应的资源类型实现了 io.Closer
接口即可。其中 factory
属性是一个函数类型,需要调用方定义并传入,用于定义如何注册新资源到资源池。另外,资源池通常有容量(资源池可容纳的资源数量),这个容量也需要调用方初始化资源池时传入(我们可以通过 New
方法看到这一点),由于资源池 resources
是通道类型,因此通道的缓冲值大小即资源池容量。最后,我们还在 Pool
中定义了一个 sync.Mutex 锁,用于在对资源池进行操作时保证并发安全(同时可能有多个协程对资源池进行操作)。
接下来,在 Pool
中定义了初始化资源池、从资源池获取资源、释放资源以及关闭资源池四个方法,具体细节已经通过注释进行说明了,这里不再一一阐释。
至此,我们已经完成了通过缓冲通道实现共享资源池的代码编写,可以编写一段业务代码 db_pool.go
对其进行调用:
package main
import (
"io"
"log"
"math/rand"
"sync"
"sync/atomic"
"test/pool"
"time"
)
const (
maxGoroutines = 5
pooledResources = 2
)
type dbConnection struct {
ID int32
}
func (dbConn *dbConnection) Close() error {
log.Println("Close: Connection", dbConn.ID)
return nil
}
var idCounter int32
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create: New Connection", id)
return &dbConnection{id}, nil
}
func main() {
var wg sync.WaitGroup
wg.Add(maxGoroutines)
p, err := pool.New(createConnection, pooledResources)
if err != nil {
log.Println(err)
}
for query := 0; query < maxGoroutines; query++ {
go func(q int) {
performQueries(q, p)
wg.Done()
}(query)
}
wg.Wait()
log.Println("Shutdown Program.")
p.Close()
}
func performQueries(query int, p *pool.Pool) {
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
defer p.Release(conn)
// 模拟 SQL 查询
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}
在这段调用代码中(主要关注 main
方法),我们演示的是一个数据库连接池,通过 sync.WaitGroup 将最大协程数设置为 5,在初始化共享资源池时,将资源池的大小设置为 2,Pool.factory
属性对应的是这里的创建数据库连接方法 createConnection
,该方法会返回一个数据库连接对象作为资源注册到资源池(数据库连接对象 dbConnection
是一个模拟的伪数据库连接,通过定义 Close
方法实现了 io.Closer
接口)。
接下来,我们通过多个协程(goroutine)并发调用 performQueries
方法执行数据库查询(依然是伪实现),在这个方法中,包含了从资源池申请资源,以及查询完成后将对应资源归还
给资源池的操作。这样一来,我们就可以模拟这篇教程开头设想的场景:多个协程共享资源池中的资源。
执行这段代码,输出结果如下:
由于这里协程数量较少,尚未等到资源被释放回资源池,就已经完成所有资源获取工作,所以所有资源都是通过调用 pool.factory()
对应方法新建的,如果调大协程数量(maxGoroutines
常量值),降低 SQL 查询时间数量级(10ms-100ms),则输出日志里就会出现通过资源池获取共享资源的情况(Acquire: Shared Resource
),感兴趣的同学可以去试试看。
附:本篇教程示例代码结构和上篇教程类似:
--go (项目根目录 ~/Development/go)
|--src
|--test
|--pool
|--pool.go
|--db_pool.go
|--go.mod
3 Comments
有个问题不明白想请教学院君:资源池-比如这里模拟的数据库连接池,一般应该是为了解决如MySQL之类的数据库出现
too many connections
的问题而出现的吧;那在这篇教程的例子中,我改了maxGoroutines
为1000
,SQL查询时间为time.Duration(rand.Intn(100)) * time.Millisecond
,执行下来的打印结果显示Create: New Connection 998
,而共享资源的打印次数非常之少,那是不是就意味着建立了998次数据库连接(尽管有可能部分连接已经释放掉了)?那是不是没有起到连接池的功效啊,连接池不应该是一直维护pooledResources
个连接不断开,而后各个协程需要使用的时候来连接池获取,用完再放回连接池,避免出现too many connetctions
的问题吗?是的 你可以通过 pooledResources 适当调大资源池的数量来提高命中率
去协程池取资源的取不到的时候,不去创建新的资源,阻塞等待着应该就可以?