前面的例子显示当一个任务时没有必要时使用 goroutine。但使用 Go 语言的原因之一是该语言提供的并发功能。实际上,很多情况下你希望利用硬件中可用的并行性。为此,你必须使用 goroutines

这个简单的应用程序在两个不同的端口上提供 http 服务,端口 8080 用于应用程序服务,端口 8001 用于访问 /debug/pprof 终端。

package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
)

func main() {
    mux := http.NewServeMux()
    mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
        fmt.Fprintln(resp, "Hello, QCon!")
    })
    go http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux) // debug
    http.ListenAndServe("0.0.0.0:8080", mux)                       // app traffic
}

虽然这个程序不是很复杂,但它代表了真实应用程序的基础。

该应用程序存在一些问题,因为它随着应用程序的增长而显露出来,所以我们现在来解决其中的一些问题。

func serveApp() {
    mux := http.NewServeMux()
    mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
        fmt.Fprintln(resp, "Hello, QCon!")
    })
    http.ListenAndServe("0.0.0.0:8080", mux)
}

func serveDebug() {
    http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux)
}

func main() {
    go serveDebug()
    serveApp()
}

通过将 serveAppserveDebug 处理程序分解成为它们自己的函数,我们将它们与 main.main 分离。 也遵循了上面的建议,并确保 serveAppserveDebug 将它们的并发性留给调用者。

但是这个程序存在一些可操作性问题。 如果 serveApp 返回,那么 main.main 将返回,导致程序关闭并由你使用的进程管理器来重新启动。

贴士:
正如 Go 语言中的函数将并发性留给调用者一样,应用程序应该将监视其状态和检测是否重启的工作留给另外的程序来做。 不要让你的应用程序负责重新启动自己,最好从应用程序外部处理该过程。

然而,serveDebug 是在一个单独的 goroutine 中运行的,返回后该 goroutine 将退出,而程序的其余部分继续。 由于 /debug 处理程序已停止工作很久,因此操作人员不会很高兴发现他们无法在你的应用程序中获取统计信息。

我们想要确保的是,如果任何负责提供此应用程序的 goroutine 停止,我们将关闭该应用程序。

func serveApp() {
    mux := http.NewServeMux()
    mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
        fmt.Fprintln(resp, "Hello, QCon!")
    })
    if err := http.ListenAndServe("0.0.0.0:8080", mux); err != nil {
        log.Fatal(err)
    }
}

func serveDebug() {
    if err := http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux); err != nil {
        log.Fatal(err)
    }
}

func main() {
    go serveDebug()
    go serveApp()
    select {}
}

现在 serverAppserveDebug 检查从 ListenAndServe 返回的错误,并在需要时调用 log.Fatal。因为两个处理程序都在 goroutine 中运行,所以我们将 main goroutine 停在 select{} 中。

这种方法存在许多问题:

  1. 如果 ListenAndServer 返回 nil 错误,则不会调用 log.Fatal,并且该端口上的 HTTP 服务将在不停止应用程序的情况下关闭。
  2. log.Fatal 调用 os.Exit,它将无条件地退出程序; defer 不会被调用,其他 goroutines 也不会被通知关闭,程序就停止了。 这使得编写这些函数的测试变得困难。

贴士:
只在 main.maininit 函数中的使用 log.Fatal

我们真正想要的是任何错误发送回 goroutine 的调用者,以便它可以知道 goroutine 停止的原因,可以干净地关闭程序进程。

func serveApp() error {
    mux := http.NewServeMux()
    mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
        fmt.Fprintln(resp, "Hello, QCon!")
    })
    return http.ListenAndServe("0.0.0.0:8080", mux)
}

func serveDebug() error {
    return http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux)
}

func main() {
    done := make(chan error, 2)
    go func() {
        done <- serveDebug()
    }()
    go func() {
        done <- serveApp()
    }()

    for i := 0; i < cap(done); i++ {
        if err := <-done; err != nil {
            fmt.Println("error: %v", err)
        }
    }
}

我们可以使用通道来收集 goroutine 的返回状态。通道的大小等于我们想要管理的 goroutine 的数量,这样发送到 done 通道就不会阻塞,因为这会阻止 goroutine 的关闭,导致它泄漏。

由于没有办法安全地关闭 done 通道,我们不能使用 for range 来循环通道直到获取所有 goroutine 发来的报告,而是循环我们开启的多个 goroutine,即通道的容量。

现在我们有办法等待每个 goroutine 干净地退出并记录他们遇到的错误。所需要的只是一种从第一个 goroutine 转发关闭信号到其他 goroutine 的方法。

事实证明,要求 http.Server 关闭是有点牵扯的,所以我将这个逻辑转给辅助函数。serve 助手使用一个地址和 http.Handler,类似于 http.ListenAndServe,还有一个 stop 通道,我们用它来触发 Shutdown 方法。

func serve(addr string, handler http.Handler, stop <-chan struct{}) error {
    s := http.Server{
        Addr:    addr,
        Handler: handler,
    }

    go func() {
        <-stop // wait for stop signal
        s.Shutdown(context.Background())
    }()

    return s.ListenAndServe()
}

func serveApp(stop <-chan struct{}) error {
    mux := http.NewServeMux()
    mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
        fmt.Fprintln(resp, "Hello, QCon!")
    })
    return serve("0.0.0.0:8080", mux, stop)
}

func serveDebug(stop <-chan struct{}) error {
    return serve("127.0.0.1:8001", http.DefaultServeMux, stop)
}

func main() {
    done := make(chan error, 2)
    stop := make(chan struct{})
    go func() {
        done <- serveDebug(stop)
    }()
    go func() {
        done <- serveApp(stop)
    }()

    var stopped bool
    for i := 0; i < cap(done); i++ {
        if err := <-done; err != nil {
            fmt.Println("error: %v", err)
        }
        if !stopped {
            stopped = true
            close(stop)
        }
    }
}

现在,每次我们在 done 通道上收到一个值时,我们关闭 stop 通道,这会导致在该通道上等待的所有 goroutine 关闭其 http.Server。 这反过来将导致其余所有的 ListenAndServe goroutines 返回。 一旦我们开启的所有 goroutine 都停止了,main.main 就会返回并且进程会干净地停止。

贴士:
自己编写这种逻辑是重复而微妙的。 参考下这个包: https://github.com/heptio/workgroup,它会为你完成大部分工作。

最后编辑: kuteng  文档更新时间: 2021-01-09 21:54   作者:kuteng