我早些时候承诺会演示一些可能广泛使用的有趣的生成器。我们来看看一个名为repeat的生成器:
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
这个函数会重复你传给它的值,直到你告诉它停止。 让我们来看看另一个函数take,它在与repeat结合使用时很有用:
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
这个函数会从其传入的valueStream中取出第一个元素然后退出。二者组合起来会怎么样呢?
done := make(chan interface{})
defer close(done)
for num := range take(done, repeat(done, 1), 10) {
fmt.Printf("%v ", num)
}
这会输出:
1 1 1 1 1 1 1 1 1 1
在这个基本的例子中,我们创建了一个repeat生成器来生成无限数量的重复生成器,但是只取前10个。repeat生成器由take接收。虽然我们可以生成无线数量的流,但只会生成n+1个实例,其中n是我们传入take的数量。
我们可以扩展这一点。让我们创建另一个生成器,但是这次我们创建一个重复调用函数的生成器repeatFn:
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
我们用它来生成10个随机数:
done := make(chan interface{})
defer close(done)
rand := func() interface{} {
return rand.Int()
}
for num := range take(done, repeatFn(done, rand), 10) {
fmt.Println(num)
}
这会输出:
5577006791947779410
8674665223082153551
6129484611666145821
4037200794235010051
3916589616287113937
6334824724549167320
605394647632969758
1443635317331776148
894385949183117216
2775422040480279449
您可能想知道为什么所有这些发生器通道类型都是interface{}。
Go中的空接口有点争议,但我认为处理interface的通道方便使用标准的管道模式。 正如我们前面所讨论的,管道的强大来自可重用的阶段。当阶段以适合自身的特异性水平进行操作时,这是最好的。在repeat和repeatFn生成器中,我们需要关注的是通过在列表或运算符上循环来生成数据流。这些操作都不需要关于处理的类型,而只需要知道参数的类型。
当需要处理特定的类型时,可以放置一个执行类型断言的阶段。有一个额外的管道阶段和类型断言的性能开销可以忽略不计,正如我们稍后会看到的。 以下是一个介绍toString管道阶段的小例子:
toString := func(done <-chan interface{}, valueStream <-chan interface{}, ) <-chan string {
stringStream := make(chan string)
go func() {
defer close(stringStream)
for v := range valueStream {
select {
case <-done:
return
case stringStream <- v.(string):
}
}
}()
return stringStream
}
可以这样使用它:
done := make(chan interface{})
defer close(done)
var message string
for token := range toString(done, take(done, repeat(done, "I", "am."), 5)) {
message += token
}
fmt.Printf("message: %s...", message)
这会输出:
message: Iam.Iam.I...
现在让我们证明刚才提到的性能问题。我们将编写两个基准测试函数:一个测试通用阶段,一个测试类型特定阶段:
func BenchmarkGeneric(b *testing.B) {
done := make(chan interface{})
defer close(done)
b.ResetTimer()
for range toString(done, take(done, repeat(done, "a"), b.N)) {
}
}
func BenchmarkTyped(b *testing.B) {
repeat := func(done <-chan interface{}, values ...string) <-chan string {
valueStream := make(chan string)
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan string, num int, ) <-chan string {
takeStream := make(chan string)
go func() {
defer close(takeStream)
for i := num; i > 0 || i == -1; {
if i != -1 {
i--
}
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
done := make(chan interface{})
defer close(done)
b.ResetTimer()
for range take(done, repeat(done, "a"), b.N) {
}
}
这会输出:
BenchmarkGeneric-4 | 1000000 | 2266 ns/op |
---|---|---|
BenchmarkTyped-4 | 1000000 | 1181 ns/op |
1181 ns/op | command-line-arguments | 3.486s |
可以看到,特定类型的速度是接口类型的2倍。一般来说,管道上的限制因素将是生成器,或者是密集计算的某个阶段。如果生成器不像repeat和repeatFn生成器那样从内存中创建流,则可能会受I/O限制。从磁盘或网络读取数据可能会超出此处显示的性能开销。
那么,如果真是在计算上存在性能瓶颈,我们该怎么办?基于这种情况,让我们来讨论扇出扇入技术。