主从设计模式的Go实现

在流水线设计模式之外,主从模式(Boss-worker)也是一种重要的多线程设计模式。在主从模式中,存在一个主人线程(Boss),它负责将工作分成同样的几份,并分配给从线程(Worker),Worker各自分头完成工作,最后Boss负责将多个Worker线程的工作成果合并,下面用Go来演示一下这种设计模式。

演示用的例子实现的是类似于Linux中grep的功能,搜索文本中匹配的字符,并列出匹配的行。文本搜索的目标可能是多个文件,对每个文件的搜索是独立的,因此可以利用主从模式提高在多核CPU上提高多文件搜索的效率。

首先定义数据结构,Worker线程的数目取决于CPU核的数目,因为文件的搜索涉及文件I/O和大量内存计算,Worker线程的数目超过CPU核数时会带来线程切换的额外负担,而对提高搜索效率没有效果。

Result结构定义了搜索结果,文件名-匹配行数-匹配行内容。Job结构用于Boss线程和Worker线程的通信,搜索文件-搜索结果。

var workers = runtime.NumCPU() //number of workers

type Result struct {
    filename string //file name
    lino     int    //line number
    line     string //string content of line
}

type Job struct {
    filename string        //the name of the file on procesing
    results  chan<- Result //channel that any result to be sent
}

首先设置Go会使用的CPU核数,然后解析命令行参数,获得 搜索超时,匹配用的正则表达式和搜索目标文件。正则表达式在编译后获得一个表达式指针,这个指针会被Worker线程共享使用。通常,使用共享指针不是线程安全的,但是*regexp.Regexp在Go文档中已经被声明为线程安全了,因此可以放心使用。最后,调用grep()开始工作。

    runtime.GOMAXPROCS(runtime.NumCPU()) // Use all the machine's cores
    ...
    //Compile the input regex
    // lineRx is a shared pointer to value, which shall be a cause of
    //  concern since it's not thread safe, but Go doc *regexp.Regexp is
    //  safe to be shared in as many routines
    if lineRx, err := regexp.Compile(pattern); err != nil {
        log.Fatalf("invalid regexp: %s\n", err)
    } else {
        var timeout int64 = 1e9 * 60 * 10 // 10 minutes!
        if *timeoutOpt != 0 {
            timeout = *timeoutOpt * 1e9
        }
        grep(timeout, lineRx, commandLineFiles(files))
    }

在grep()中,创建三个双向的channel,jobs用于Boss线程分配工作给Worker, results用于Worker线程汇报搜索结果,done中是标志结束的channel。results channel设置了最长1000的缓冲区,当缓冲区满,而Worker线程需要向results中添加数据时, Worker线程会被阻塞直到results的数据被处理缓冲区有空余。

func grep(timeout int64, lineRx *regexp.Regexp, filenames []string) {
    //create channels
    jobs := make(chan Job, workers)
    results := make(chan Result, minimum(1000, len(filenames)))
    done := make(chan struct{}, workers)

    go addJobs(jobs, filenames, results) //boss assign jobs
    for i := 0; i < workers; i++ {
        go doJobs(done, lineRx, jobs) //worker do jobs
    }
    //wait for work to submit work result
    waitAndProcessResults(timeout, done, results)
}

addJobs()将文件名和result channel发送到job channel。doJobs()接受job channel和正则表达式,进行搜索。所有的工作完成后,通过done channel发送完成标志。

func addJobs(jobs chan<- Job, filenames []string, results chan<- Result) {
    for _, filename := range filenames {
        jobs <- Job{filename, results}
    }
    close(jobs)
}

func doJobs(done chan<- struct{}, lineRx *regexp.Regexp, jobs <-chan Job) {
    for job := range jobs {
        job.Do(lineRx)
    }
    done <- struct{}{}
}

搜索的实现按下不表, waitAndProcessResults()等待Worker线程完成并打印结果。select会阻塞Boss线程直到接收到result, finish 或者 done channel 数据。阻塞的Boss线程会睡眠,从而不会消耗CPU资源来死等。每次收到一个done, 表明一个Worker线程的工作完成,当所有的Worker线程都完成后,Boss线程不需要阻塞,可以顺畅地打印所有的结果。

func waitAndProcessResults(timeout int64, done <-chan struct{},
    results <-chan Result) {
    finish := time.After(time.Duration(timeout))
    for working := workers; working > 0; {
        select { // Blocking
        case result := <-results:
            fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
                result.line)
        case <-finish:
            fmt.Println("timed out")
            return // Time's up so finish with what results there were
        case <-done:
            working--
        }
    }
    for {
        select { // Nonblocking
        case result := <-results:
            fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
                result.line)
        case <-finish:
            fmt.Println("timed out")
            return // Time's up so finish with what results there were
        default:
            return
        }
    }
}

搜索的具体实现对于本文的主题没有太大的意义,因此做简要说明,将文件全部读入缓存后,按行检测匹配。

func (job Job) Do(lineRx *regexp.Regexp) {
    file, err := os.Open(job.filename)
    if err != nil {
        log.Printf("error: %s\n", err)
        return
    }
    defer file.Close()
    reader := bufio.NewReader(file)
    for lino := 1; ; lino++ {
        line, err := reader.ReadBytes('\n')
        line = bytes.TrimRight(line, "\n\r")
        if lineRx.Match(line) {
            job.results <- Result{job.filename, lino, string(line)}
        }
        if err != nil {
            if err != io.EOF {
                log.Printf("error:%d: %s\n", lino, err)
            }
            break
        }
    }
}

最后验证一下程序。

./cgrep runtime.GOOS cgrep.go
cgrep.go:90:    if runtime.GOOS == "windows" {

代码清单:[https://github.com/KevinACoder/gobook/blob/master/src/cgrep3/cgrep.go]

    原文作者:山中散人的博客
    原文地址: https://www.jianshu.com/p/07ebfb6c3358
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞