提供一个 goroutine 池,每个 goroutine 循环阻塞等待从任务池中执行任务;外界使用者不断的往任务池里丢任务,则 goroutine 池中的多个 goroutine 会并发的处理这些任务
一、worker/workPool.go
import "sync"
type Worker interface {
Task()
}
type Pool struct {
wg sync.WaitGroup
// 工作池
taskPool chan Worker
}
func New(maxGoroutineNum int) *Pool {
// 1. 初始化一个 Pool
p := Pool{
taskPool: make(chan Worker),
}
p.wg.Add(maxGoroutineNum)
// 2. 创建 maxGoroutineNum 个 goroutine,并发的从 taskPool 中获取任务
for i := 0; i < maxGoroutineNum; i++ {
go func() {
for task := range p.taskPool { // 阻塞获取,一旦没有任务,阻塞在这里 - 无缓冲 channel
// 3. 执行任务
task.Task()
}
p.wg.Done()
}()
}
return &p
}
// 提交任务到worker池中
func (p *Pool) Run(worker Worker) {
p.taskPool <- worker
}
func (p *Pool) Shutdown() {
// 关闭通道
close(p.taskPool)
p.wg.Wait()
}
二、namePrinter/namePrinter.go
import (
"fmt"
"time"
)
type NamePrinter struct {
Name string
}
func (np *NamePrinter) Task() {
fmt.Println(np.Name)
time.Sleep(time.Second)
}
三、main.go
import (
"github.com/zhaojigang/worker/worker"
"sync"
"github.com/zhaojigang/worker/namePrinter"
)
var names = []string{
"steve",
"bob",
}
func main() {
// 1. 启动两个 goroutine,等待执行任务
p := worker.New(2)
var wg sync.WaitGroup
wg.Add(3 * len(names))
// 2. 创建 worker,扔到 goroutine 池中
for i := 0; i < 3; i++ {
for _, namex := range names {
worker := namePrinter.NamePrinter{
Name:namex,
}
go func() {
p.Run(&worker)
wg.Done()
}()
}
}
// 3. 等待添加任务完毕
wg.Wait()
// 4. 关闭 goroutine 池
p.Shutdown()
}