Go 利用chan嵌套chan 实现函数异步执行 顺序返回值

遇到的问题

异步对于绝大多数的开发而言并不陌生,在go语言中异步的实现变得异常方便。只要在执行的方法前加一个go关键字就可以实现异步操作。但是如果需求是,按照调用的先后顺序(FIFO)来返回值我们应该怎么办。大家都知道,一系列的方法调用如果使用了异步执行那么就并不能保证返回的先后顺序,返回的先后顺序取决于每个函数耗时的长短,耗时短的则会先返回。当然解决这个问题的办法有很多,在最近看的一本书中发现了chan嵌套chan可以很巧妙的实现这个需求。

没解决之前

先看一下没有使用嵌套chan的情况。
代码很简单,方法operation1 内部sleep 1秒 方法operation2 内部sleep 2秒。5次调用都在goroutine中执行,结果可以看到 5个方法大约耗时2秒。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {

    resultCh := make(chan string)
    //开一个gotoutine 接受所有返回值并打印
    go replay(resultCh)
    //使用waitgroup 等待一下所有gorountie执行完毕,记录时间
    wg := sync.WaitGroup{}

    startTime := time.Now()

    //operation1内部sleep 1秒
    //operation2内部sleep 2秒
    //如果是同步执行下列调用需要 8秒左右
    //目前用异步调用 理论上只需要2秒
    //但于丹的问题是 不能实现先进先出的需求
    operation2(resultCh, "aaa", &wg)
    operation2(resultCh, "bbb", &wg)
    operation1(resultCh, "ccc", &wg)
    operation1(resultCh, "ddd", &wg)
    operation2(resultCh, "eee", &wg)
    wg.Wait()
    endTime := time.Now()
    fmt.Printf("Process time %s", endTime.Sub(startTime))
}

func replay(resultCh chan string)(){
    for{
        fmt.Println(<-resultCh)
    }
}

func operation1(resultCh chan string, str string, wg *sync.WaitGroup)(){
    wg.Add(1)
    go func(str string,wg *sync.WaitGroup){
        time.Sleep(time.Second*1)
        resultCh <- "operation1:"+str
        wg.Done()
    }(str,wg)
}

func operation2(resultCh chan string, str string, wg *sync.WaitGroup)(){
    wg.Add(1)
    go func(str string,wg *sync.WaitGroup){
        time.Sleep(time.Second*2)
        resultCh <- "operation2:"+str
        wg.Done()
    }(str,wg)
}

结果:
执行结果虽然是很理想,执行5个方法只用了2秒。但是违背了需求的先进先出(FIFO)的规则。返回顺序完全是根据函数耗时长短来决定。

operation1:ddd 
operation1:ccc 
operation2:aaa 
operation2:eee 
operation2:bbb
Process time 2.002555639s

如何解决

创建一个嵌套chan,chan中的值也是一个chan,在执行的时候按照先后顺序添加。在replay就会按照先进先出的顺序读取,利用chan阻塞等待第一个完成再执行下一个chan的值。
那么这样执行的时间是否会更长? 答案是并不会有太大的影响。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    resultCh := make(chan chan string, 5000)
    wg := sync.WaitGroup{}
    go replay(resultCh)
    startTime := time.Now()
    operation2(resultCh, "aaa", &wg)
    operation2(resultCh, "bbb", &wg)
    operation1(resultCh, "ccc", &wg)
    operation1(resultCh, "ddd", &wg)
    operation2(resultCh, "eee", &wg)
    wg.Wait()
    endTime := time.Now()
    fmt.Printf("Process time %s", endTime.Sub(startTime))
}

func replay(resultCh chan chan string)(){
    for{
        //拿到一个chan 读取值 这个时候拿到的是先进先出 因为所有方法是按顺序加入chan的
        c := <- resultCh
        //读取嵌套chan中的值,这个时候等待3秒 因为是operation2中执行了3秒 在这3绵中 其实其余的4个方法也已经执行完毕。之后的方法则不需要等待sleep的时间
        r := <-c
        fmt.Println(r)
    }
}

func operation1(ch chan chan string, str string, wg *sync.WaitGroup)(){
    //先创建一个chan 兵给到嵌套chan 占据一个通道 这个通道是阻塞的
    c := make(chan string)
    ch <- c
    wg.Add(1)
    go func(str string){
        time.Sleep(time.Second*1)
        c <- "operation1:"+str
        wg.Done()
    }(str)
}

func operation2(ch chan chan string, str string, wg *sync.WaitGroup)(){
    c := make(chan string)
    ch <- c
    wg.Add(1)
    go func(str string){
        time.Sleep(time.Second*2)
        c <- "operation2:"+str
        wg.Done()
    }(str)
}

结果:
运行的结果还是2秒,但是结果却不同,完全是按照我们调用的顺序返回的。严格按照先进先出的规则。这样整体运行的时间其实取决于执行函数中耗时最长的那个函数。如果第一个函数耗时5秒 其余4个耗时1秒。那么整个main函数耗时也就是5秒

operation2:aaa
operation2:bbb
operation1:ccc
operation1:ddd
operation2:eee
Process time 2.002714923s

总结

其实解决此类问题的方法不止一个,比如在请求喝返回中添加标识等等。
但我认为这个方法巧妙的运用了chan中嵌套chan和go语言 chan阻塞的特性来实现这个功能代码简洁。性能也兵没有消耗太多,总体执行时间也并没有加长。也是一个参考的方法,这种嵌套实现也可以用到其他需要的特殊需求中。

    原文作者:大二小的宝
    原文地址: https://segmentfault.com/a/1190000018475209
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞