在某些情况下,你可能会发现自己想要使用一系列通道的值:
<-chan <-chan interface{}
这与将某个通道的数据切片合并到一个通道中稍有不同,这种调用方式意味着一系列通道有序的写入操作。这与管道的单个“阶段”类似,其生命周期是间歇性的。按“访问范围约束”章节所提到的,通道由写入它们的goroutine所拥有,每当在新的goroutine中启动一个管道的“阶段”时,就会创建一个新的通道——这意味着我们会得到一个通道队列。我们会在第五章“Goroutines异常行为修复”中详细讨论。
作为消费者,代码可能不关心其值来自于一系列通道的事实。在这种情况下,处理一系列通道中的单个通道可能很麻烦。如果我们定义一个函数,可以将一系列通道拆解为一个简单的通道——我们成为通道桥接(bridge-channle),这使得消费者更容易关注手头的问题:
bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{}) // 1
go func() {
defer close(valStream)
for { // 2
var stream <-chan interface{}
select {
case maybeStream, ok := <-chanStream:
if ok == false {
return
}
stream = maybeStream
case <-done:
return
}
for val := range orDone(done, stream) { // 3
select {
case valStream <- val:
case <-done:
}
}
}
}()
return valStream
}
- 这个通道会返回所有传入bridge的通道。
- 该循环负责从chanStream中提取通道并将其提供给嵌套循环以供使用。
- 该循环负责读取已经给出的通道的值,并将这些值重复到valStream中。当前正在循环的流关闭时,我们跳出执行从该通道读取的循环,并继续下一次循环来选择要读取的通道。 这为我们提供了一个不间断的流。
这段代码非常直白。接下来我们来使用它。下面这个例子创建了10个通道,每个通道都写入一个元素,并将这些通道传递给bridge:
genVals := func() <-chan <-chan interface{} {
chanStream := make(chan (<-chan interface{}))
go func() {
defer close(chanStream)
for i := 0; i < 10; i++ {
stream := make(chan interface{}, 1)
stream <- i
close(stream)
chanStream <- stream
}
}()
return chanStream
}
for v := range bridge(nil, genVals()) {
fmt.Printf("%v ", v)
}
这会输出:
0 1 2 3 4 5 6 7 8 9
通过使用bridge,我们可以专注于解构之外的逻辑,而无需去关心大量的通道处理问题。
最后编辑: kuteng 文档更新时间: 2021-01-02 17:30 作者:kuteng