Pool是对象池模式的并发安全实现。关于对象池模式的完整解释最好留给有关设计模式的文献(如 Head First Design Patterns)。不过既然Pool出现在了标准库中,就让我们简要讨论下为什么你可能有兴趣使用它。

在较高的层次上,池模式是一种创建和提供固定数量可用对象的方式。它通常用于约束创建资源昂贵的事物(例如数据库连接)。Go的sync.Pool可以被多个例程安全地使用。

Pool的主要接口是它的Get方法。 被调用时,Get将首先检查池中是否有可用实例返回给调用者,如果没有,则创建一个新成员变量。使用完成后,调用者调用Put将正在使用的实例放回池中供其他进程使用。 这里有一个简单的例子来演示:

myPool := &sync.Pool{
    New: func() interface{} {
        fmt.Println("Creating new instance.")
        return struct{}{}
    },
}

myPool.Get()             //1
instance := myPool.Get() //1
myPool.Put(instance)     //2
myPool.Get()             //3
  1. 这里我们调用Get方法,将调用在池中定义的New函数,因为实例尚未实例化。
  2. 在这里,我们将先前检索到的实例放回池中。 这时实例的可用数量为1个。
  3. 执行此调用时,我们将重新使用先前分配的实例。New函数不会被调用。

我们可以看到,这回调用2次New函数:

Creating new instance.
Creating new instance.

那么为什么要使用一个池,而不是实例化对象呢? Go有一个垃圾收集器,所以实例化的对象将被自动清理。 重点是什么? 考虑这个例子:

var numCalcsCreated int
calcPool := &sync.Pool{
    New: func() interface{} {
        numCalcsCreated += 1
        mem := make([]byte, 1024)
        return &mem // 1
    },
}

// 将池扩充到4KB
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
const numWorkers = 1024 * 1024
var wg sync.WaitGroup
wg.Add(numWorkers)

for i := numWorkers; i > 0; i-- {
    go func() {
        defer wg.Done()

        mem := calcPool.Get().(*[]byte) // 2
        defer calcPool.Put(mem)

    }()
}

// 假设内存中执行了一些快速的操作

wg.Wait()
fmt.Printf("%d calculators were created.", numCalcsCreated)
  1. 注意,我们存储了字节切片的指针。
  2. 这里我们断言此类型是一个指向字节切片的指针。

这会输出:

8 calculators were created.

如果我没有使用sync.Pool运行此示例,那么结果将是非确定性的,但在最坏的情况下,我可能需要分配千兆字节的内存。但正如你从输出中看到的那样,我只分配了4 KB 。

Pool有用的另一种常见情况是预热分配对象的缓存,用于必须尽快运行的操作。 在这种情况下,我们不是通过限制创建对象的数量来保护主机的内存,而是通过预先加载获取对另一个对象的引用来减少消费者的时间消耗。在编写高吞吐量网络服务器时,这是非常常见的。我们来看看这种情况。

首先,我们来创建一个模拟创建服务连接的函数。 我们会让这个连接花费很长时间:

func connectToService() interface{} {
    time.Sleep(1 * time.Second)
    return struct{}{}
}

接下来,让我们看看如果对于每个请求都开启一个新的服务连接,网络服务的性能如何。我们将编写一个网络处理程序,为了简化基准测试,我们一次只允许一个连接:


func startNetworkDaemon() *sync.WaitGroup {
    var wg sync.WaitGroup
    wg.Add(1)

    go func() {
        server, err := net.Listen("tcp", "localhost:8080")
        if err != nil {
            log.Fatalf("cannot listen: %v", err)
        }
        defer server.Close()
        wg.Done()
        for {
            conn, err := server.Accept()
            if err != nil {
                log.Printf("cannot accept connection: %v", err)
                continue
            }

            connectToService()
            fmt.Fprintln(conn, "")
            conn.Close()
        }
    }()

    return &wg
}

现在我们可以着手进行基准测试了:

func init() {
    daemonStarted := startNetworkDaemon()
    daemonStarted.Wait()
}

func BenchmarkNetworkRequest(b *testing.B) {
    for i := 0; i < b.N; i++ {
        conn, err := net.Dial("tcp", "localhost:8080")
        if err != nil {
            b.Fatalf("cannot dial host: %v", err)
        }
        if _, err := ioutil.ReadAll(conn); err != nil {
            b.Fatalf("cannot read: %v", err)
        }
        conn.Close()
    }
}

在命令行执行:

cd src/gos-concurrency-building-blocks/the-sync-package/pool/ && \
go test -benchtime=10s -bench=.

这会输出:

BenchmarkNetworkRequest-8 10 1000385643 ns/op
PASS ok command-line-arguments 11.008s

就性能而言这看起来挺合理。让我们加上sync.Pool再试试:

func warmServiceConnCache() *sync.Pool {
    p := &sync.Pool{
        New: connectToService,
    }
    for i := 0; i < 10; i++ {
        p.Put(p.New())
    }
    return p
}

func startNetworkDaemon() *sync.WaitGroup {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        connPool := warmServiceConnCache()

        server, err := net.Listen("tcp", "localhost:8080")
        if err != nil {
            log.Fatalf("cannot listen: %v", err)
        }
        defer server.Close()
        wg.Done()
        for {
            conn, err := server.Accept()
            if err != nil {
                log.Printf("cannot accept connection: %v", err)
                continue

            }
            svcConn := connPool.Get()
            fmt.Fprintln(conn, "")
            connPool.Put(svcConn)
            conn.Close()
        }
    }()

    return &wg
}

同样执行基准测试,

cd src/gos-concurrency-building-blocks/the-sync-package/pool && \
go test -benchtime=10s -bench=.

我们可以看到:

BenchmarkNetworkRequest-8 5000 2904307 ns/op
PASS ok command-line-arguments 32.647s

整整快了三个数量级!你可以看到我们是如何利用这种模式如何大大缩短响应时间的。

正如这个例子所展现的,池模式非常适合于这种需要并发进程,或者构建这些对象可能会对内存产生负面影响的应用程序。

但是,在确定是否应该使用池时有一点需要注意:如果使用池子里东西在内存上不是大致均匀的,则会花更多时间将从池中检索,这比首先实例化它要耗费更多的资源。例如,你的程序需要随机和可变长度的切片,在这种情况下Pool不会为你提供太多的帮助。

因此,在使用Pool时,请记住以下几点:

  • 实例化sync.Pool时,给它一个新元素,该元素应该是线程安全的。
  • 当你从Get获得一个实例时,不要假设你接收到的对象状态。
  • 当你从池中取得实例时,请务必不要忘记调用Put。否则池的优越性就体现不出来了。这通常用defer来执行延迟操作。
  • 池中的元素必须大致上是均匀的。
最后编辑: kuteng  文档更新时间: 2021-01-02 17:30   作者:kuteng