我早些时候承诺会演示一些可能广泛使用的有趣的生成器。我们来看看一个名为repeat的生成器:

repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {

    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            for _, v := range values {
                select {
                case <-done:
                    return
                case valueStream <- v:
                }
            }
        }
    }()
    return valueStream
}

这个函数会重复你传给它的值,直到你告诉它停止。 让我们来看看另一个函数take,它在与repeat结合使用时很有用:

take := func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} {

    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ {
            select {
            case <-done:
                return
            case takeStream <- <-valueStream:
            }
        }
    }()
    return takeStream
}

这个函数会从其传入的valueStream中取出第一个元素然后退出。二者组合起来会怎么样呢?

done := make(chan interface{})
defer close(done)

for num := range take(done, repeat(done, 1), 10) {
    fmt.Printf("%v ", num)
}

这会输出:

1 1 1 1 1 1 1 1 1 1

在这个基本的例子中,我们创建了一个repeat生成器来生成无限数量的重复生成器,但是只取前10个。repeat生成器由take接收。虽然我们可以生成无线数量的流,但只会生成n+1个实例,其中n是我们传入take的数量。

我们可以扩展这一点。让我们创建另一个生成器,但是这次我们创建一个重复调用函数的生成器repeatFn:

repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {

    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            select {
            case <-done:
                return
            case valueStream <- fn():
            }
        }
    }()
    return valueStream
}

我们用它来生成10个随机数:

done := make(chan interface{})
defer close(done)

rand := func() interface{} {
    return rand.Int()
}

for num := range take(done, repeatFn(done, rand), 10) {
    fmt.Println(num)
}

这会输出:

5577006791947779410
8674665223082153551
6129484611666145821
4037200794235010051
3916589616287113937
6334824724549167320
605394647632969758
1443635317331776148
894385949183117216
2775422040480279449

您可能想知道为什么所有这些发生器通道类型都是interface{}。

Go中的空接口有点争议,但我认为处理interface的通道方便使用标准的管道模式。 正如我们前面所讨论的,管道的强大来自可重用的阶段。当阶段以适合自身的特异性水平进行操作时,这是最好的。在repeat和repeatFn生成器中,我们需要关注的是通过在列表或运算符上循环来生成数据流。这些操作都不需要关于处理的类型,而只需要知道参数的类型。

当需要处理特定的类型时,可以放置一个执行类型断言的阶段。有一个额外的管道阶段和类型断言的性能开销可以忽略不计,正如我们稍后会看到的。 以下是一个介绍toString管道阶段的小例子:

toString := func(done <-chan interface{}, valueStream <-chan interface{}, ) <-chan string {
    stringStream := make(chan string)
    go func() {
        defer close(stringStream)
        for v := range valueStream {
            select {
            case <-done:
                return
            case stringStream <- v.(string):
            }
        }
    }()
    return stringStream
}

可以这样使用它:

done := make(chan interface{})
defer close(done)

var message string
for token := range toString(done, take(done, repeat(done, "I", "am."), 5)) {
    message += token
}

fmt.Printf("message: %s...", message)

这会输出:

message: Iam.Iam.I...

现在让我们证明刚才提到的性能问题。我们将编写两个基准测试函数:一个测试通用阶段,一个测试类型特定阶段:

func BenchmarkGeneric(b *testing.B) {
    done := make(chan interface{})
    defer close(done)

    b.ResetTimer()
    for range toString(done, take(done, repeat(done, "a"), b.N)) {
    }
}

func BenchmarkTyped(b *testing.B) {

    repeat := func(done <-chan interface{}, values ...string) <-chan string {

        valueStream := make(chan string)
        go func() {

            defer close(valueStream)
            for {
                for _, v := range values {
                    select {
                    case <-done:
                        return
                    case valueStream <- v:
                    }
                }
            }
        }()
        return valueStream
    }

    take := func(done <-chan interface{}, valueStream <-chan string, num int, ) <-chan string {
        takeStream := make(chan string)
        go func() {
            defer close(takeStream)
            for i := num; i > 0 || i == -1; {
                if i != -1 {
                    i--
                }
                select {
                case <-done:
                    return
                case takeStream <- <-valueStream:
                }
            }
        }()

        return takeStream
    }

    done := make(chan interface{})
    defer close(done)

    b.ResetTimer()
    for range take(done, repeat(done, "a"), b.N) {
    }
}

这会输出:

BenchmarkGeneric-4 1000000 2266 ns/op
BenchmarkTyped-4 1000000 1181 ns/op
1181 ns/op command-line-arguments 3.486s

可以看到,特定类型的速度是接口类型的2倍。一般来说,管道上的限制因素将是生成器,或者是密集计算的某个阶段。如果生成器不像repeat和repeatFn生成器那样从内存中创建流,则可能会受I/O限制。从磁盘或网络读取数据可能会超出此处显示的性能开销。

那么,如果真是在计算上存在性能瓶颈,我们该怎么办?基于这种情况,让我们来讨论扇出扇入技术。

最后编辑: kuteng  文档更新时间: 2021-01-02 17:30   作者:kuteng