在某些情况下,你可能会发现自己想要使用一系列通道的值:

<-chan <-chan interface{}

这与将某个通道的数据切片合并到一个通道中稍有不同,这种调用方式意味着一系列通道有序的写入操作。这与管道的单个“阶段”类似,其生命周期是间歇性的。按“访问范围约束”章节所提到的,通道由写入它们的goroutine所拥有,每当在新的goroutine中启动一个管道的“阶段”时,就会创建一个新的通道——这意味着我们会得到一个通道队列。我们会在第五章“Goroutines异常行为修复”中详细讨论。

作为消费者,代码可能不关心其值来自于一系列通道的事实。在这种情况下,处理一系列通道中的单个通道可能很麻烦。如果我们定义一个函数,可以将一系列通道拆解为一个简单的通道——我们成为通道桥接(bridge-channle),这使得消费者更容易关注手头的问题:

bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {

    valStream := make(chan interface{}) // 1
    go func() {
        defer close(valStream)
        for { // 2
            var stream <-chan interface{}
            select {
            case maybeStream, ok := <-chanStream:
                if ok == false {
                    return
                }
                stream = maybeStream
            case <-done:
                return
            }
            for val := range orDone(done, stream) { // 3
                select {
                case valStream <- val:
                case <-done:
                }
            }
        }
    }()
    return valStream
}
  1. 这个通道会返回所有传入bridge的通道。
  2. 该循环负责从chanStream中提取通道并将其提供给嵌套循环以供使用。
  3. 该循环负责读取已经给出的通道的值,并将这些值重复到valStream中。当前正在循环的流关闭时,我们跳出执行从该通道读取的循环,并继续下一次循环来选择要读取的通道。 这为我们提供了一个不间断的流。

这段代码非常直白。接下来我们来使用它。下面这个例子创建了10个通道,每个通道都写入一个元素,并将这些通道传递给bridge:

genVals := func() <-chan <-chan interface{} {

    chanStream := make(chan (<-chan interface{}))

    go func() {
        defer close(chanStream)
        for i := 0; i < 10; i++ {
            stream := make(chan interface{}, 1)
            stream <- i
            close(stream)
            chanStream <- stream
        }
    }()
    return chanStream
}

for v := range bridge(nil, genVals()) {
    fmt.Printf("%v ", v)
}

这会输出:

0 1 2 3 4 5 6 7 8 9

通过使用bridge,我们可以专注于解构之外的逻辑,而无需去关心大量的通道处理问题。

最后编辑: kuteng  文档更新时间: 2021-01-02 17:30   作者:kuteng