Go 语言极速入门10 - 并发模式之资源池

提供一个资源池,类似于数据库连接池的功能;资源池在 go 1.11.1 中有官方实现:sync/pool.go

一、资源池

package pool

import (
    "sync"
    "io"
    "errors"
    "log"
)

// 声明池类结构体
type Pool struct {
    // 锁
    lock sync.Mutex
    // 池中存储的资源
    resources chan io.Closer
    // 资源创建工厂函数
    factory func() (io.Closer, error)
    // 池是否已经被关闭
    closed bool
}

// 创建池类实例的工厂函数
// 工厂函数名通常使用 New 名字
func New(fn func() (io.Closer, error), size int) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("size too small");
    }

    return &Pool{
        resources: make(chan io.Closer, size),
        factory:   fn,
    }, nil
}

// 从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
    // select - default 经典模式,将阻塞形式的 channel 改为了非阻塞,当 <-p.resources 不能立即返回时,执行 default
    // 当然,如果没有 default,那么还是要阻塞在 <-p.resources 上的
    select {
    // 检查是否有空闲的资源
    case r, ok := <-p.resources:
        log.Println("Acquire:", "Shared Resource")
        if !ok {
            return nil, errors.New("pool already closed")
        }
        return r, nil
    default:
        log.Println("Acquire:", "New Resource")
        // 调用资源创建函数创建资源
        return p.factory()
    }
}

// 将一个使用后的资源放回池里
func (p *Pool) Release(r io.Closer) {
    // 注意:Release 和 Close 使用的是同一把锁,就是说二者同时只能执行一个,防止资源池已经关闭了,release 还向资源池放资源
    // 向一个已经关闭的 channel 发送消息,会发生 panic: send on closed channel
    p.lock.Lock()
    defer p.lock.Unlock()

    // 如果池已经被关闭,销毁这个资源
    if p.closed {
        r.Close()
        return
    }

    select {
    // 试图将这个资源放入队列
    case p.resources <- r:
        log.Println("Release:", "In Queue")
    default:
        log.Println("Release:", "Closing")
        r.Close()
    }
}

// 关闭资源池,并关闭所有现有的资源
func (p *Pool) Close() {
    p.lock.Lock()
    defer p.lock.Unlock()

    if p.closed {
        return
    }

    p.closed = true

    // 在清空通道里的资源之前,将通道关闭
    close(p.resources)

    // 关闭资源
    for r := range p.resources {
        r.Close()
    }
}

select – default 经典模式,将阻塞形式的 channel 改为了非阻塞,当 <-p.resources 不能立即返回时,执行 default;当然,如果没有 default,那么还是要阻塞在 <-p.resources 上的

二、具体的资源类

package db

import (
    "log"
    "io"
    "sync/atomic"
)

// 给每个连接分配一个独一无二的id
var idCounter int32

// 资源 - 数据库连接
type DBConnection struct {
    ID int32
}

// dbConnection 实现了 io.Closer 接口
// 关闭资源
func (conn *DBConnection) Close() error {
    log.Println("conn closed")
    return nil
}

// 创建一个资源 - dbConnection
func CreateConn() (io.Closer, error) {
    id := atomic.AddInt32(&idCounter, 1)
    log.Println("Create conn, id:", id)
    return &DBConnection{
        ID: id,
    }, nil
}

三、使用资源池

package main

import (
    "sync"
    "github.com/zhaojigang/pool/pool"
    "github.com/zhaojigang/pool/db"
    "log"
    "time"
    "math/rand"
)

const (
    maxGoroutines   = 5 // 要使用的goroutine的数量
    pooledResources = 2 // 池中的资源的数量
)

func performQuery(query int, p *pool.Pool) {
    // 1. 获取连接
    conn, err := p.Acquire()
    if err != nil {
        log.Println("acquire conn error, ", err)
        return
    }

    // 使用结束后,释放链接
    defer p.Release(conn)

    // 该 log 模拟对连接的使用
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("QID[%d] CID[%d]\n", query, conn.(*db.DBConnection).ID)
}

func main() {
    var waitGroup sync.WaitGroup
    waitGroup.Add(maxGoroutines)
    // 1. 创建一个 Pool
    p, err := pool.New(db.CreateConn, pooledResources)
    if err != nil {
        log.Println("create Pool error")
    }

    // 2. 开启 goroutine 执行任务
    for query := 0; query < maxGoroutines; query++ {
        // 每个goroutine需要自己复制一份要、查询值的副本,
        // 不然所有的查询会共享同一个查询变量,即所有的 goroutine 最后的 query 值都是3
        go func(q int) {
            performQuery(q, p)
            waitGroup.Done()
        }(query)
        //time.Sleep(1000*time.Millisecond) // 用于测试从 resources channel 中获取资源
    }

    // 3. 关闭连接池
    waitGroup.Wait()
    p.Close()
    log.Println("pool closed - main")
}

在高并发的创建 goroutine 的情况下,从 pool.go # Acquire 方法中可以看到,大家可能都还没有 Release 资源,此时都会创建资源,资源在一瞬间会大量增加,在实际系统中,需要根据需求,做一些措施,例如提前创建好资源放入池中,goroutine 都从池中取资源,资源不够就等待,使用完之后就放入池中,防止资源意外关闭,还可以启用后台线程监控等。

    原文作者:原水寒
    原文地址: https://www.jianshu.com/p/cee24568bbea
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞