提供一个资源池,类似于数据库连接池的功能;资源池在 go 1.11.1 中有官方实现:sync/pool.go
一、资源池
package pool
import (
"sync"
"io"
"errors"
"log"
)
// 声明池类结构体
type Pool struct {
// 锁
lock sync.Mutex
// 池中存储的资源
resources chan io.Closer
// 资源创建工厂函数
factory func() (io.Closer, error)
// 池是否已经被关闭
closed bool
}
// 创建池类实例的工厂函数
// 工厂函数名通常使用 New 名字
func New(fn func() (io.Closer, error), size int) (*Pool, error) {
if size <= 0 {
return nil, errors.New("size too small");
}
return &Pool{
resources: make(chan io.Closer, size),
factory: fn,
}, nil
}
// 从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
// select - default 经典模式,将阻塞形式的 channel 改为了非阻塞,当 <-p.resources 不能立即返回时,执行 default
// 当然,如果没有 default,那么还是要阻塞在 <-p.resources 上的
select {
// 检查是否有空闲的资源
case r, ok := <-p.resources:
log.Println("Acquire:", "Shared Resource")
if !ok {
return nil, errors.New("pool already closed")
}
return r, nil
default:
log.Println("Acquire:", "New Resource")
// 调用资源创建函数创建资源
return p.factory()
}
}
// 将一个使用后的资源放回池里
func (p *Pool) Release(r io.Closer) {
// 注意:Release 和 Close 使用的是同一把锁,就是说二者同时只能执行一个,防止资源池已经关闭了,release 还向资源池放资源
// 向一个已经关闭的 channel 发送消息,会发生 panic: send on closed channel
p.lock.Lock()
defer p.lock.Unlock()
// 如果池已经被关闭,销毁这个资源
if p.closed {
r.Close()
return
}
select {
// 试图将这个资源放入队列
case p.resources <- r:
log.Println("Release:", "In Queue")
default:
log.Println("Release:", "Closing")
r.Close()
}
}
// 关闭资源池,并关闭所有现有的资源
func (p *Pool) Close() {
p.lock.Lock()
defer p.lock.Unlock()
if p.closed {
return
}
p.closed = true
// 在清空通道里的资源之前,将通道关闭
close(p.resources)
// 关闭资源
for r := range p.resources {
r.Close()
}
}
select – default 经典模式,将阻塞形式的 channel 改为了
非阻塞
,当 <-p.resources 不能立即返回时,执行 default;当然,如果没有 default,那么还是要阻塞在 <-p.resources 上的
二、具体的资源类
package db
import (
"log"
"io"
"sync/atomic"
)
// 给每个连接分配一个独一无二的id
var idCounter int32
// 资源 - 数据库连接
type DBConnection struct {
ID int32
}
// dbConnection 实现了 io.Closer 接口
// 关闭资源
func (conn *DBConnection) Close() error {
log.Println("conn closed")
return nil
}
// 创建一个资源 - dbConnection
func CreateConn() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create conn, id:", id)
return &DBConnection{
ID: id,
}, nil
}
三、使用资源池
package main
import (
"sync"
"github.com/zhaojigang/pool/pool"
"github.com/zhaojigang/pool/db"
"log"
"time"
"math/rand"
)
const (
maxGoroutines = 5 // 要使用的goroutine的数量
pooledResources = 2 // 池中的资源的数量
)
func performQuery(query int, p *pool.Pool) {
// 1. 获取连接
conn, err := p.Acquire()
if err != nil {
log.Println("acquire conn error, ", err)
return
}
// 使用结束后,释放链接
defer p.Release(conn)
// 该 log 模拟对连接的使用
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("QID[%d] CID[%d]\n", query, conn.(*db.DBConnection).ID)
}
func main() {
var waitGroup sync.WaitGroup
waitGroup.Add(maxGoroutines)
// 1. 创建一个 Pool
p, err := pool.New(db.CreateConn, pooledResources)
if err != nil {
log.Println("create Pool error")
}
// 2. 开启 goroutine 执行任务
for query := 0; query < maxGoroutines; query++ {
// 每个goroutine需要自己复制一份要、查询值的副本,
// 不然所有的查询会共享同一个查询变量,即所有的 goroutine 最后的 query 值都是3
go func(q int) {
performQuery(q, p)
waitGroup.Done()
}(query)
//time.Sleep(1000*time.Millisecond) // 用于测试从 resources channel 中获取资源
}
// 3. 关闭连接池
waitGroup.Wait()
p.Close()
log.Println("pool closed - main")
}
在高并发的创建 goroutine 的情况下,从 pool.go # Acquire 方法中可以看到,大家可能都还没有 Release 资源,此时都会创建资源,资源在一瞬间会大量增加,在实际系统中,需要根据需求,做一些措施,例如提前创建好资源放入池中,goroutine 都从池中取资源,资源不够就等待,使用完之后就放入池中,防止资源意外关闭,还可以启用后台线程监控等。