在诸如守护进程这样的长期进程中,拥有一组长生命周期的goroutines非常普遍。这些goroutines通常被阻塞,等待被某种方式唤醒以继续工作。有时候,这些例程依赖于你没有很好控制的资源。也许一个goroutine会接收到Web服务中希望获取数据的请求,或者它正在监视一个临时文件。 如果程序处理不够健壮,goroutine会很容易陷入一个糟糕的状态。在长期运行的过程中,如果能创建一种机制来确保goroutine的健康状况良好,并在健康状况不佳时重新启动,那么我们的项目想必能活得久一点。 我们将在本节讨论对goroutines异常行为进行修复的话题。

我们将使用心跳来检查正在监测的goroutine的活跃程度。心跳的类型将取决于你想要监控的内容,但是如果你的goroutine可能会产生活锁,请确保心跳包含某种信息,以表明该goroutine不仅没死掉,而且还可以正常执行任务。在本节中,为了简单起见,我们只会考虑goroutines是活的还是死的。

下面这段代码建立一个管理者监视一个goroutine的健康状况,以及它的子例程。如果例程变得不健康,管理者将重新启动子例程。为此,它需要引用一个可以启动goroutine的函数。让我们看看管理程序是什么样子的:

type startGoroutineFn func(done <-chan interface{},
        pulseInterval time.Duration) (heartbeat <-chan interface{}) //1

    newSteward := func(timeout time.Duration, startGoroutine startGoroutineFn) startGoroutineFn { //2

        return func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {

            heartbeat := make(chan interface{})

            go func() {
                defer close(heartbeat)

                var wardDone chan interface{}
                var wardHeartbeat <-chan interface{}
                startWard := func() { //3

                    wardDone = make(chan interface{}) //4
                    wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2) //5
                }
                startWard()
                pulse := time.Tick(pulseInterval)

            monitorLoop:

                for { //6
                    timeoutSignal := time.After(timeout)

                    for {

                        select {
                        case <-pulse:

                            select {
                            case heartbeat <- struct{}{}:

                            default:

                            }
                        case <-wardHeartbeat: //7
                            continue monitorLoop

                        case <-timeoutSignal: //8

                            log.Println("steward: ward unhealthy; restarting")
                            close(wardDone)
                            startWard()
                            continue monitorLoop
                        case <-done:

                            return
                        }

                    }

                }
            }()

            return heartbeat

        }

    }
  1. 这里我们定义一个可以监控和重新启动的goroutine的函数签名。 我们看到熟悉的done通道,以及熟悉的心跳模式写法。
  2. 在这里我们设置了超时时间,并使用函数startGoroutine来启动它正在监控的goroutine。有趣的是,监控器本身返回一个startGoroutineFn,表示监控器自身也是可监控的。
  3. 在这里我们定义一个闭包,它以同样的的方式来启动我们正在监视的goroutine。
  4. 这是我们创建一个新通道,我们会将其传递给监控通道,以响应发出的停止信号。
  5. 在这里,我们开启对目标goroutine的监控。如果监控器停止工作,或者监控器想要停止被监控区域,我们希望监控者也停止,因此我们将两个done通道都包含在逻辑中。我们传入的心跳间隔是超时时间的一半,但正如我们在“心跳”中讨论的那样,这可以调整。
  6. 这是我们的内部循环,它确保监控者可以发出自己的心跳。
  7. 在这里我们如果接收到监控者的心跳,就会知道它还处于正常工作状态,程序会继续监测循环。
  8. 这里如果我们发现监控者超时,我们要求监控者停下来,并开始一个新的goroutine。然后开始新的监测。

我们的for循环有点杂乱,但如果你阅读过前面的章节,熟悉其中的模式,那么理解起来会相对简单。 接下来让我们试试看如果监控一个行为异常的goroutine,会发生什么:

log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)

doWork := func(done <-chan interface{}, _ time.Duration) <-chan interface{} {

    log.Println("ward: Hello, I'm irresponsible!")

    go func() {
        <-done // 1
        log.Println("ward: I am halting.")
    }()
    return nil
}

doWorkWithSteward := newSteward(4*time.Second, doWork) // 2

done := make(chan interface{})
time.AfterFunc(9*time.Second, func() { // 3
    log.Println("main: halting steward and ward.")
    close(done)
})

for range doWorkWithSteward(done, 4*time.Second) { // 4
}

log.Println("Done")
  1. 可以看到这个goroutine什么都没干,持续阻塞等待被取消,它同样不会发出任何表明自己正常信号。
  2. 这里开始建立被监控的例程,其4秒后会超时。
  3. 这里我们9秒后向done通道发出信号停止整个程序。
  4. 最后,我们启动监控器并在其心跳范围内防止示例停止。

这会输出:

18:28:07 ward: Hello, I'm irresponsible!
18:28:11 steward: ward unhealthy; restarting 18:28:11 ward: Hello, I'm irresponsible!
18:28:11 ward: I am halting.
18:28:15 steward: ward unhealthy; restarting 18:28:15 ward: Hello, I'm irresponsible!
18:28:15 ward: I am halting.
18:28:16 main: halting steward and ward.
18:28:16 ward: I am halting.
18:28:16 Done

看起来工作正常。我们的监控器比较简单,除了取消操作和心跳所需信息之外不接收也不返回任何参数。我们可以用闭包强化一下:

doWorkFn := func(done <-chan interface{}, intList ...int) (startGoroutineFn, <-chan interface{}) {//1

    intChanStream := make(chan (<-chan interface{}))//2
    intStream := bridge(done, intChanStream)

    doWork := func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {//3

        intStream := make(chan interface{})//4
        heartbeat := make(chan interface{})

        go func() {
            defer close(intStream)
            select {
            case intChanStream <- intStream://5
            case <-done:
                return
            }

            pulse := time.Tick(pulseInterval)

            for {
            valueLoop:
                for _, intVal := range intList {
                    if intVal < 0 {
                        log.Printf("negative value: %v\n", intVal)//6
                        return
                    }

                    for {
                        select {
                        case <-pulse:
                            select {
                            case heartbeat <- struct{}{}: default:
                            }
                        case intStream <- intVal:
                            continue valueLoop
                        case <-done:
                            return
                        }
                    }
                }
            }
        }()
        return heartbeat
    }
    return doWork, intStream
}
  1. 我们将监控器关闭的内容放入返回值,并返回所有监控器用来交流数据的通道。
  2. 我们建立通道的通道,这是我们在前面章节中”bridge”模式的应用。
  3. 这里我们建立闭包控制监控器的启动和关闭。
  4. 这是各通道与监控器交互数据的实例。
  5. 这里我们向起数据交互作用的通道传入数据。
  6. 这里我们返回负数并从goroutine返回以模拟不正常的工作状态。

由于我们可能会启动监控器的多个副本,因此我们使用”bridge”模式来帮助向doWorkFn的调用者呈现单个不间断的通道。通过这样的方式,我们的监控器可以简单地通过组成模式而变得任意复杂。让我们看看如何调用:

log.SetFlags(log.Ltime | log.LUTC)
log.SetOutput(os.Stdout)

done := make(chan interface{})
defer close(done)

doWork, intStream := doWorkFn(done, 1, 2, -1, 3, 4, 5) //1
doWorkWithSteward := newSteward(1*time.Millisecond, doWork) //2
doWorkWithSteward(done, 1*time.Hour) //3

for intVal := range take(done, intStream, 6) { //4
    fmt.Printf("Received: %v\n", intVal)
}
  1. 这里我们调用该函数,它会将传入的不定长整数参数转换为可通信的流。
  2. 在这里,我们创建了一个检查doWork关闭的监视器。我们预计这里会极快的进入失败流程,所以将监控时间设置为一毫秒。
  3. 我们通知 steward 开启监测。
  4. 最后,我们使用该管道,并从intStream中取出前六个值。

这会输出:

Received: 1
23:25:33 negative value: -1
Received: 2
23:25:33 steward: ward unhealthy; restarting Received: 1
23:25:33 negative value: -1
Received: 2
23:25:33 steward: ward unhealthy; restarting Received: 1
23:25:33 negative value: -1
Received: 2

我们可以看到监控器发现错误并重启。你可能还会注意到我们只接收到了1和2,这证明了重启功能正常。如果你的系统对重复值很敏感,一定要考虑对其进行处理。你也可以考虑在一定次数的失败后退出。比如在这样的位置:

valueLoop:
for _, intVal := range intList {
// ...
}

稍作修改:

valueLoop:
for {
    intVal := intList[0] 
    intList = intList[1:]
    // ...
}

尽管我们依然停留在返回的无效负数上,尽管我们的监控器将继续失败,但这会记录在重新启动前的位置,你可以在这个思路上扩展。

使用这样的方式可以确保你的系统保持健康,此外,相信系统崩溃的减少也能大幅度降低开发过程中猝死的几率。

最后编辑: kuteng  文档更新时间: 2021-01-02 17:30   作者:kuteng