我有几个频道c1,c2,c3,c4 ……,如何将这些频道的所有数据收集到一个频道?
我的代码:
package main
import (
"fmt"
"sync"
)
func putToChannel(c chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i < 6; i++ {
c <- i
}
}
func main() {
c := make(chan int, 15)
c1 := make(chan int, 5)
c2 := make(chan int, 5)
c3 := make(chan int, 5)
go func(){c <- <-c1}()
go func(){c <- <-c2}()
go func(){c <- <-c3}()
wg := new(sync.WaitGroup)
wg.Add(3)
go putToChannel(c1, wg)
go putToChannel(c2, wg)
go putToChannel(c3, wg)
wg.Wait()
close(c)
for i := range c {
fmt.Println("Receive:", i)
}
fmt.Println("Finish")
}
我想从c1,c2 …到c组成所有数据,但它不起作用
最佳答案 这篇文章很好地描述了如何对频道进行“扇入”,包括停止短片.
Link
这些线路有问题:
go func(){c <- <-c1}()
go func(){c <- <-c2}()
go func(){c <- <-c3}()
其中每个都将从cx通道接收一个值,并将该值发送到c.
你需要一个看起来像这样的方法;
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
此方法依赖于以下事实:当没有更多值要发送时,正在传递给合并的通道cs …将被关闭.
这意味着您还需要更新putToChannel方法
func putToChannel(c chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
defer close(c)
for i := 1; i < 6; i++ {
c <- i
}
}
值得注意的最后一件事是,总的来说;尝试并封装创建的功能&发送到通道和关闭通道的功能是相同的功能.这意味着您永远不会尝试发送封闭的频道.
代替:
c1 := make(chan int, 5)
go putToChannel(c1, wg)
你可以做;
func generator() (<-chan int) {
c := make(chan int, 5)
go func() {
for i := 1; i < 6; i++ {
c <- i
}
close(c)
}()
return c
}
您的主要方法将类似于:
func main() {
var cs []<-chan int
cs = append(cs, generator())
cs = append(cs, generator())
cs = append(cs, generator())
c := merge(cs...)
for v := range c {
fmt.Println(v)
}
}