正如我们在“Goroutines”一节中介绍的那样,goroutines占用资源较少且易于创建。运行时将多个goroutine复用到任意数量的操作系统线程,以便我们不必担心抽象级别。但是他们会花费成本资源,并且goroutine不会被运行时垃圾收集,所以无论内存占用多少,我们都不想让他们对我们的进程撒谎。 那么我们如何去确保他们被清理干净?

让我们从头开始,一步一步思考:为什么会有一个goroutine? 在第二章中,我们确定,goroutines代表可能并行或不可以并行运行的工作单元。 该goroutine有几条路径终止:

  • 当它完成任务。
  • 当它遇到不可恢复的错误无法继续它的任务。
  • 当它被告知停止当前任务。

前两条我们已经知晓,可以通过算法实现。但如何取消当前任务?由于网络效应,这最重要的一点是:如果你已经开始了一个goroutine,那么它很可能以某种有组织的方式与其他几个goroutines合作。我们甚至可以把这种相互连接表现为一张图表,这时该goroutine能否停下来还取决于处在交互的其他goroutines。我们将在下一章中继续关注大规模并发产生的相互依赖关系,但现在让我们考虑如何确保保证单个goroutine得到清理。 让我们从一个简单的goroutine泄漏开始:

doWork := func(strings <-chan string) <-chan interface{} {
    completed := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(completed)
        for s := range strings {
            fmt.Println(s)
        }
    }()
    return completed
}

doWork(nil)
// 这里还有其他任务执行
fmt.Println("Done.")

我们看到doWork被传递了一个nil通道。所以strings通道永远无法读取到其承载的内容,而且包含doWork的goroutine将在这个过程的整个生命周期中保留在内存中(如果我们在doWork和主goutoutine中加入了goroutine,我们甚至会死锁)。

在这个例子中,整个进程的生命周期很短,但是在一个真正的程序中,goroutines可以很容易地在一个长期生命的程序开始时启动,导致内存利用率下降。

解决这种情况的方法是建立一个信号,按照惯例,这个信号通常是一个名为done的只读通道。父例程将该通道传递给子例程,然后在想要取消子例程时关闭该通道。 这是一个例子:

doWork := func(done <-chan interface{}, strings <-chan string) <-chan interface{} { //1
    terminated := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(terminated)
        for {
            select {
            case s := <-strings:
                // Do something interesting
                fmt.Println(s)
            case <-done: //2
                return
            }
        }
    }()
    return terminated
}

done := make(chan interface{})
terminated := doWork(done, nil)

go func() { //3
    // Cancel the operation after 1 second.
    time.Sleep(1 * time.Second)
    fmt.Println("Canceling doWork goroutine...")
    close(done)
}()

<-terminated //4
fmt.Println("Done.")
  1. 这里我们传递done通道给doWork函数。作为惯例,这个通道被作为首个参数。
  2. 这里我们看到使用了for-select的使用模式之一。我们的目的是检查done通道 有没有发出信号。如果有的话,我们退出当前goroutine。
  3. 在这里我们创建另一个goroutine,一秒后就会取消doWork中产生的goroutine。
  4. 这是我们在main goroutine中调用doWork函数返回结果的地方。

这会输出:

Canceling doWork goroutine... 
doWork exited.
Done.

你可以看到尽管向doWork传递了nil给strings通道,我们的goroutine依然正常运行至结束。与之前的例子不同,本例中我们把两个goroutine连接在一起之前,我们建立了第三个goroutine以取消doWork中的goroutine,并成功消除了泄漏问题。

前面的例子很好地处理了在通道上接收goroutine的情况,但是如果我们正在处理相反的情况:在尝试向通道写入值时阻塞goroutine会怎样?

newRandStream := func() <-chan int {
    randStream := make(chan int)
    go func() {
        defer fmt.Println("newRandStream closure exited.") // 1
        defer close(randStream)
        for {
            randStream <- rand.Int()
        }
    }()

    return randStream
}

randStream := newRandStream()
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
    fmt.Printf("%d: %d\n", i, <-randStream)
}
  1. 当goroutine成功执行时我们打印一行消息。

这会输出:

3 random ints:
1: 5577006791947779410
2: 8674665223082153551
3: 6129484611666145821

你可以看到注释1所在的打印语句并未执行。在循环的第三次迭代之后,我们的goroutine块试图将下一个随机整数发送到不再被读取的通道。我们无法告知它停下来,解决方案是为生产者提供一条通知它退出的通道:

newRandStream := func(done <-chan interface{}) <-chan int {
    randStream := make(chan int)
    go func() {
        defer fmt.Println("newRandStream closure exited.")
        defer close(randStream)

        for {
            select {
            case randStream <- rand.Int():
            case <-done:
                return
            }
        }

    }()

    return randStream
}

done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
    fmt.Printf("%d: %d\n", i, <-randStream)
}

close(done)
//模拟正在进行的工作
time.Sleep(1 * time.Second)

这会输出:

3 random ints:
1: 5577006791947779410
2: 8674665223082153551
3: 6129484611666145821
newRandStream closure exited.

我们现在看到该goroutine被妥善清理。
现在我们知道如何确保goroutine不泄漏,我们可以制定一个约定:如果goroutine负责创建goroutine,它也负责确保它可以停止goroutine。

这个约定有助于确保程序在组合和扩展时可用。我们将在“管道”和“context包”中重新讨论这种技术和规则。我们该如何确保goroutine能够被停止根据goroutine的类型和用途而有所不同,但是它们 所有这些都是建立在传递done通道基础上的。

go

最后编辑: kuteng  文档更新时间: 2021-01-02 17:30   作者:kuteng