在流水线设计模式之外,主从模式(Boss-worker)也是一种重要的多线程设计模式。在主从模式中,存在一个主人线程(Boss),它负责将工作分成同样的几份,并分配给从线程(Worker),Worker各自分头完成工作,最后Boss负责将多个Worker线程的工作成果合并,下面用Go来演示一下这种设计模式。
演示用的例子实现的是类似于Linux中grep的功能,搜索文本中匹配的字符,并列出匹配的行。文本搜索的目标可能是多个文件,对每个文件的搜索是独立的,因此可以利用主从模式提高在多核CPU上提高多文件搜索的效率。
首先定义数据结构,Worker线程的数目取决于CPU核的数目,因为文件的搜索涉及文件I/O和大量内存计算,Worker线程的数目超过CPU核数时会带来线程切换的额外负担,而对提高搜索效率没有效果。
Result结构定义了搜索结果,文件名-匹配行数-匹配行内容。Job结构用于Boss线程和Worker线程的通信,搜索文件-搜索结果。
var workers = runtime.NumCPU() //number of workers
type Result struct {
filename string //file name
lino int //line number
line string //string content of line
}
type Job struct {
filename string //the name of the file on procesing
results chan<- Result //channel that any result to be sent
}
首先设置Go会使用的CPU核数,然后解析命令行参数,获得 搜索超时,匹配用的正则表达式和搜索目标文件。正则表达式在编译后获得一个表达式指针,这个指针会被Worker线程共享使用。通常,使用共享指针不是线程安全的,但是*regexp.Regexp在Go文档中已经被声明为线程安全了,因此可以放心使用。最后,调用grep()开始工作。
runtime.GOMAXPROCS(runtime.NumCPU()) // Use all the machine's cores
...
//Compile the input regex
// lineRx is a shared pointer to value, which shall be a cause of
// concern since it's not thread safe, but Go doc *regexp.Regexp is
// safe to be shared in as many routines
if lineRx, err := regexp.Compile(pattern); err != nil {
log.Fatalf("invalid regexp: %s\n", err)
} else {
var timeout int64 = 1e9 * 60 * 10 // 10 minutes!
if *timeoutOpt != 0 {
timeout = *timeoutOpt * 1e9
}
grep(timeout, lineRx, commandLineFiles(files))
}
在grep()中,创建三个双向的channel,jobs用于Boss线程分配工作给Worker, results用于Worker线程汇报搜索结果,done中是标志结束的channel。results channel设置了最长1000的缓冲区,当缓冲区满,而Worker线程需要向results中添加数据时, Worker线程会被阻塞直到results的数据被处理缓冲区有空余。
func grep(timeout int64, lineRx *regexp.Regexp, filenames []string) {
//create channels
jobs := make(chan Job, workers)
results := make(chan Result, minimum(1000, len(filenames)))
done := make(chan struct{}, workers)
go addJobs(jobs, filenames, results) //boss assign jobs
for i := 0; i < workers; i++ {
go doJobs(done, lineRx, jobs) //worker do jobs
}
//wait for work to submit work result
waitAndProcessResults(timeout, done, results)
}
addJobs()将文件名和result channel发送到job channel。doJobs()接受job channel和正则表达式,进行搜索。所有的工作完成后,通过done channel发送完成标志。
func addJobs(jobs chan<- Job, filenames []string, results chan<- Result) {
for _, filename := range filenames {
jobs <- Job{filename, results}
}
close(jobs)
}
func doJobs(done chan<- struct{}, lineRx *regexp.Regexp, jobs <-chan Job) {
for job := range jobs {
job.Do(lineRx)
}
done <- struct{}{}
}
搜索的实现按下不表, waitAndProcessResults()等待Worker线程完成并打印结果。select会阻塞Boss线程直到接收到result, finish 或者 done channel 数据。阻塞的Boss线程会睡眠,从而不会消耗CPU资源来死等。每次收到一个done, 表明一个Worker线程的工作完成,当所有的Worker线程都完成后,Boss线程不需要阻塞,可以顺畅地打印所有的结果。
func waitAndProcessResults(timeout int64, done <-chan struct{},
results <-chan Result) {
finish := time.After(time.Duration(timeout))
for working := workers; working > 0; {
select { // Blocking
case result := <-results:
fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
result.line)
case <-finish:
fmt.Println("timed out")
return // Time's up so finish with what results there were
case <-done:
working--
}
}
for {
select { // Nonblocking
case result := <-results:
fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
result.line)
case <-finish:
fmt.Println("timed out")
return // Time's up so finish with what results there were
default:
return
}
}
}
搜索的具体实现对于本文的主题没有太大的意义,因此做简要说明,将文件全部读入缓存后,按行检测匹配。
func (job Job) Do(lineRx *regexp.Regexp) {
file, err := os.Open(job.filename)
if err != nil {
log.Printf("error: %s\n", err)
return
}
defer file.Close()
reader := bufio.NewReader(file)
for lino := 1; ; lino++ {
line, err := reader.ReadBytes('\n')
line = bytes.TrimRight(line, "\n\r")
if lineRx.Match(line) {
job.results <- Result{job.filename, lino, string(line)}
}
if err != nil {
if err != io.EOF {
log.Printf("error:%d: %s\n", lino, err)
}
break
}
}
}
最后验证一下程序。
./cgrep runtime.GOOS cgrep.go
cgrep.go:90: if runtime.GOOS == "windows" {
代码清单:[https://github.com/KevinACoder/gobook/blob/master/src/cgrep3/cgrep.go]