通道用例大全一文中介绍了很多通过使用通道来实现并发同步的用例。 事实上,通道并不是Go支持的唯一的一种并发同步技术。而且对于一些特定的情形,通道并不是最有效和可读性最高的同步技术。 本文下面将介绍sync标准库包中提供的各种并发同步技术。相对于通道,这些技术对于某些情形更加适用。

sync标准库包提供了一些用于实现并发同步的类型。这些类型适用于各种不同的内存顺序需求。 对于这些特定的需求,这些类型使用起来比通道效率更高,代码实现更简洁。

(请注意:为了避免各种异常行为,最好不要复制sync标准库包中提供的类型的值。)

sync.WaitGroup(等待组)类型

每个sync.WaitGroup值在内部维护着一个计数,此计数的初始默认值为零。

*sync.WaitGroup类型有三个方法Add(delta int)Done()Wait()

对于一个可寻址的sync.WaitGroupwg

  • 我们可以使用方法调用wg.Add(delta)来改变值wg维护的计数。
  • 方法调用wg.Done()wg.Add(-1)是完全等价的。
  • 如果一个wg.Add(delta)或者wg.Done()调用将wg维护的计数更改成一个负数,一个恐慌将产生。
  • 当一个协程调用了wg.Wait()时,
    • 如果此时wg维护的计数为零,则此wg.Wait()此操作为一个空操作(no-op);
    • 否则(计数为一个正整数),此协程将进入阻塞状态。 当以后其它某个协程将此计数更改至0时(一般通过调用wg.Done()),此协程将重新进入运行状态(即wg.Wait()将返回)。

请注意wg.Add(delta)wg.Done()wg.Wait()分别是(&wg).Add(delta)(&wg).Done()(&wg).Wait()的简写形式。

一般,一个sync.WaitGroup值用来让某个协程等待其它若干协程都先完成它们各自的任务。
一个例子:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    const N = 5
    var values [N]int32

    var wg sync.WaitGroup
    wg.Add(N)
    for i := 0; i < N; i++ {
        i := i
        go func() {
            values[i] = 50 + rand.Int31n(50)
            fmt.Println("Done:", i)
            wg.Done() // <=> wg.Add(-1)
        }()
    }

    wg.Wait()
    // 所有的元素都保证被初始化了。
    fmt.Println("values:", values)
}

在此例中,主协程等待着直到其它5个协程已经将各自负责的元素初始化完毕此会打印出各个元素值。
这里是一个可能的程序执行输出结果:

Done: 4
Done: 1
Done: 3
Done: 0
Done: 2
values: [71 89 50 62 60]

我们可以将上例中的Add方法调用拆分成多次调用:

...
    var wg sync.WaitGroup
    for i := 0; i < N; i++ {
        wg.Add(1) // 将被执行5次
        i := i
        go func() {
            values[i] = 50 + rand.Int31n(50)
            wg.Done()
        }()
    }
...

一个*sync.WaitGroup值的Wait方法可以在多个协程中调用。
当对应的sync.WaitGroup值维护的计数降为0,这些协程都将得到一个(广播)通知而结束阻塞状态。

func main() {
    rand.Seed(time.Now().UnixNano())

    const N = 5
    var values [N]int32

    var wgA, wgB sync.WaitGroup
    wgA.Add(N)
    wgB.Add(1)

    for i := 0; i < N; i++ {
        i := i
        go func() {
            wgB.Wait() // 等待广播通知
            log.Printf("values[%v]=%v \n", i, values[i])
            wgA.Done()
        }()
    }

    // 下面这个循环保证将在上面的任何一个
    // wg.Wait调用结束之前执行。
    for i := 0; i < N; i++ {
        values[i] = 50 + rand.Int31n(50)
    }
    wgB.Done() // 发出一个广播通知
    wgA.Wait()
}

一个WaitGroup可以在它的一个Wait方法返回之后被重用。 但是请注意,当一个WaitGroup值维护的基数为零时,它的带有正整数实参的Add方法调用不能和它的Wait方法调用并发运行,否则将可能出现数据竞争。

sync.Once类型

每个*sync.Once值有一个Do(f func())方法。 此方法只有一个类型为func()的参数。

对一个可寻址的sync.Onceoo.Do()(即(&o).Do()的简写形式)方法调用可以在多个协程中被多次并发地执行, 这些方法调用的实参应该(但并不强制)为同一个函数值。 在这些方法调用中,有且只有一个调用的实参函数(值)将得到调用。 此被调用的实参函数保证在任何o.Do()方法调用返回之前退出。 换句话说,被调用的实参函数内的代码将在任何o.Do()方法返回调用之前被执行。

一般来说,一个sync.Once值被用来确保一段代码在一个并发程序中被执行且仅被执行一次。

一个例子:

package main

import (
    "log"
    "sync"
)

func main() {
    log.SetFlags(0)

    x := 0
    doSomething := func() {
        x++
        log.Println("Hello")
    }

    var wg sync.WaitGroup
    var once sync.Once
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            once.Do(doSomething)
            log.Println("world!")
        }()
    }

    wg.Wait()
    log.Println("x =", x) // x = 1
}

在此例中,Hello将仅被输出一次,而world!将被输出5次,并且Hello肯定在所有的5个world!之前输出。

sync.Mutex(互斥锁)和sync.RWMutex(读写锁)类型

*sync.Mutex*sync.RWMutex类型都实现了sync.Locker接口类型。 所以这两个类型都有两个方法:Lock()Unlock(),用来保护一份数据不会被多个使用者同时读取和修改。

除了Lock()Unlock()这两个方法,*sync.RWMutex类型还有两个另外的方法:RLock()RUnlock(),用来支持多个读取者并发读取一份数据但防止此份数据被某个数据写入者和其它数据访问者(包括读取者和写入者)同时使用。

(注意:这里的数据读取者数据写入者不应该从字面上理解。有时候某些数据读取者可能修改数据,而有些数据写入者可能只读取数据。)

一个Mutex值常称为一个互斥锁。 一个Mutex零值为一个尚未加锁的互斥锁。 一个(可寻址的)Mutexm只有在未加锁状态时才能通过m.Lock()方法调用被成功加锁。 换句话说,一旦m值被加了锁(亦即某个m.Lock()方法调用成功返回), 一个新的加锁试图将导致当前协程进入阻塞状态,直到此Mutex值被解锁为止(通过m.Unlock()方法调用)。

注意:m.Lock()m.Unlock()分别是(&m).Lock()(&m).Unlock()的简写形式。

一个使用sync.Mutex的例子:

package main

import (
    "fmt"
    "runtime"
    "sync"
)

type Counter struct {
    m sync.Mutex
    n uint64
}

func (c *Counter) Value() uint64 {
    c.m.Lock()
    defer c.m.Unlock()
    return c.n
}

func (c *Counter) Increase(delta uint64) {
    c.m.Lock()
    c.n += delta
    c.m.Unlock()
}

func main() {
    var c Counter
    for i := 0; i < 100; i++ {
        go func() {
            for k := 0; k < 100; k++ {
                c.Increase(1)
            }
        }()
    }

    // 此循环仅为演示目的。
    for c.Value() < 10000 {
        runtime.Gosched()
    }
    fmt.Println(c.Value()) // 10000
}

在上面这个例子中,一个Counter值使用了一个Mutex字段来确保它的字段n永远不会被多个协程同时使用。

一个RWMutex值常称为一个读写互斥锁。 对于一个可寻址的RWMutexrwm,数据写入者可以通过方法调用rwm.Lock()获取rwm的写锁,或者通过m.RLock()方法调用获取rwm的读锁。 方法调用rwm.Unlock()rwm.RUnlock()用来释放rwm的写锁和读锁。

注意rwm.Lock()rwm.Unlock()rwm.RLock()rwm.RUnlock()分别是(&rwm).Lock()(&rwm).Unlock()(&rwm).RLock()(&rwm).RUnlock()的简写形式。

对于一个可寻址的RWMutexrwm,下列规则存在:

  • 一个数据写入者只能在rwm的写锁和读锁都尚未被获取持有的情况下才能被成功获取。 换句话说,rwm的写锁在任何时刻最多只能被一个数据写入值成功获取持有,并且rwm的写锁和读锁不能同时被持有。
  • rwm的写锁被一个数据写入者所持有的时候,任何新的试图获取它的写锁或者读锁的操作都将导致当前协程进入阻塞状态,直到此写锁被释放,新的获取写锁或者读锁的试图才有机会成功。
  • rwm的读锁被某个数据读取者所获取持有,新的获取它的写锁的试图将导致当前协程进入阻塞状态。 但是,一个新的获取它的读锁的试图将成功,除非此试图操作发生在某个被阻塞的获取写锁的试图之后(见下一条规则)。 换句话说,一个读写互斥锁的读锁可以同时被多个数据读取者同时持有。
  • 假设rwm的读锁正在被某些数据读取者所持有,为了防止后续数据写入者没有机会成功获取写锁,后续发生在某个被阻塞的获取写锁的试图之后的所有获取读锁的试图将被阻塞。
  • 假设rwm的写锁正在被某个数据写入者所持有,(至少对于标准编译器来说,)为了防止后续数据读取者没有机会成功获取读锁,发生在此写锁下一次被释放之前的所有获取读锁的试图将在此写锁下一次被释放之后肯定取得成功,即使这些所有获取读锁的试图发生在一些仍被阻塞的获取写锁的试图之后。

后两条规则是为了确保数据读取者和写入者都有机会执行它们的操作。

请注意:一个锁并不会绑定到一个协程上;换句话说,一个锁的获取者和此锁的持有者(以及释放者)可能不是一个协程,尽管在实践中这种情况比较少见。

在上一个例子中,如果Value方法被十分频繁调用而Increase方法并不频繁被调用,则Counter类型的m字段的类型可以更改为sync.RWMutex,从而使得执行效率更高,如下面的代码所示。

...
type Counter struct {
    //m sync.Mutex
    m sync.RWMutex
    n uint64
}

func (c *Counter) Value() uint64 {
    //c.m.Lock()
    //defer c.m.Unlock()
    c.m.RLock()
    defer c.m.RUnlock()
    return c.n
}
...

sync.RWMutex值的另一个应用场景是将一个写任务分隔成若干小的写任务。下一节中展示了一个这样的例子。

根据上面列出的后两条规则,下面这个程序最有可能输出abdc

package main

import (
    "fmt"
    "time"
    "sync"
)

func main() {
    var m sync.RWMutex
    go func() {
        m.RLock()
        fmt.Print("a")
        time.Sleep(time.Second)
        m.RUnlock()
    }()
    go func() {
        time.Sleep(time.Second * 1 / 4)
        m.Lock()
        fmt.Print("b")
        time.Sleep(time.Second)
        m.Unlock()
    }()
    go func() {
        time.Sleep(time.Second * 2 / 4)
        m.Lock()
        fmt.Print("c")
        m.Unlock()
    }()
    go func () {
        time.Sleep(time.Second * 3 / 4)
        m.RLock()
        fmt.Print("d")
        m.RUnlock()
    }()
    time.Sleep(time.Second * 3)
    fmt.Println()
}

请注意,上例这个程序仅仅是为了解释和验证上面列出的读写锁的后两条加锁规则。 此程序使用了time.Sleep调用来做协程间的同步。这种所谓的同步方法不应该被使用在生产代码中

sync.Mutexsync.RWMutex值也可以用来实现通知,尽管这不是Go中最优雅的方法来实现通知。
下面是一个使用了Mutex值来实现通知的例子。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var m sync.Mutex
    m.Lock()
    go func() {
        time.Sleep(time.Second)
        fmt.Println("Hi")
        m.Unlock() // 发出一个通知
    }()
    m.Lock() // 等待通知
    fmt.Println("Bye")
}

在此例中,Hi将确保在Bye之前打印出来。 关于sync.Mutexsync.RWMutex值相关的内存顺序保证,请阅读Go中的内存顺序保证一文。

sync.Cond类型

sync.Cond类型提供了一种有效的方式来实现多个协程间的通知。

每个sync.Cond值拥有一个sync.Locker类型的名为L的字段。 此字段的具体值常常为一个*sync.Mutex值或者*sync.RWMutex值。

*sync.Cond类型有三个方法Wait()Signal()Broadcast()

每个Cond值维护着一个先进先出等待协程队列。
对于一个可寻址的Condc

  • c.Wait()必须在c.L字段值的锁被成功获取的时候调用;否则,c.Wait()调用将造成一个恐慌。 一个c.Wait()调用将
    1. 首先将当前协程推入到c所维护的等待协程队列;
    2. 然后调用c.L.Unlock()释放c.L的锁;
    3. 然后使当前协程进入阻塞状态;

          <p><i>
          (当前协程将被另一个协程通过<code>c.Signal()</code><code>c.Broadcast()</code>调用唤醒而重新进入运行状态。)
          </i></p>
      
          <p>
          </p>
          一旦当前协程重新进入运行状态,<code>c.L.Lock()</code>将被调用以试图重新获取<code>c.L</code>字段值的锁。
          此<code>c.Wait()</code>调用将在此试图成功之后退出。
      </li>
      </ol>
    4. 一个c.Signal()调用将唤醒并移除c所维护的等待协程队列中的第一个协程(如果此队列不为空的话)。
    5. 一个c.Broadcast()调用将唤醒并移除c所维护的等待协程队列中的所有协程(如果此队列不为空的话)。

请注意:c.Wait()c.Signal()c.Broadcast()分别为(&c).Wait()(&c).Signal()(&c).Broadcast()的简写形式。

c.Signal()c.Broadcast()调用常用来通知某个条件的状态发生了变化。 一般说来,c.Wait()应该在一个检查某个条件是否已经得到满足的循环中调用。

下面是一个典型的sync.Cond用例。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    const N = 10
    var values [N]string

    cond := sync.NewCond(&sync.Mutex{})

    for i := 0; i < N; i++ {
        d := time.Second * time.Duration(rand.Intn(10)) / 10
        go func(i int) {
            time.Sleep(d) // 模拟一个工作负载
            cond.L.Lock()
            // 下面的修改必须在cond.L被锁定的时候执行
            values[i] = string('a' + i)
            cond.Broadcast() // 可以在cond.L被解锁后发出通知
            cond.L.Unlock()
            // 上面的通知也可以在cond.L未锁定的时候发出。
            //cond.Broadcast() // 上面的调用也可以放在这里
        }(i)
    }

    // 此函数必须在cond.L被锁定的时候调用。
    checkCondition := func() bool {
        fmt.Println(values)
        for i := 0; i < N; i++ {
            if values[i] == "" {
                return false
            }
        }
        return true
    }

    cond.L.Lock()
    defer cond.L.Unlock()
    for !checkCondition() {
        cond.Wait() // 必须在cond.L被锁定的时候调用
    }
}

一个可能的输出:

[         ]
[     f    ]
[  c   f    ]
[  c   f  h  ]
[ b c   f  h  ]
[a b c   f  h  j]
[a b c   f g h i j]
[a b c  e f g h i j]
[a b c d e f g h i j]

因为上例中只有一个协程(主协程)在等待通知,所以其中的cond.Broadcast()调用也可以换为cond.Signal()。 如上例中的注释所示,cond.Broadcast()cond.Signal()不必在cond.L的锁被成功获取的时候调用。

为了防止数据竞争,对自定义条件的修改必须在cond.L的锁被成功获取的时候才能执行。 另外,checkCondition函数和cond.Wait方法也必须在cond.L的锁被成功获取的时候才可被调用。

事实上,对于上面这个特定的例子,cond.L字段的也可以为一个*sync.RWMutex值。 对自定义条件的十个部分的修改可以在RWMutex值的读锁被成功获取的时候执行。这十个修改可以并发进行,因为它们是互不干扰的。 如下面的代码所示:

...
    cond := sync.NewCond(&sync.RWMutex{})
    cond.L.Lock()

    for i := 0; i < N; i++ {
        d := time.Second * time.Duration(rand.Intn(10)) / 10
        go func(i int) {
            time.Sleep(d)
            cond.L.(*sync.RWMutex).RLock()
            values[i] = string('a' + i)
            cond.L.(*sync.RWMutex).RUnlock()
            cond.Signal()
        }(i)
    }
...

在上面的代码中,此sync.RWMutex值的用法有些不符常规。 它的读锁被一些修改数组元素的协程所获取并持有,而它的写锁被主协程获取持有用来读取并检查各个数组元素的值。

Cond值所表示的自定义条件可以是一个虚无。对于这种情况,此Cond值纯粹被用来实现通知。
比如,下面这个程序将打印出abc或者bac

package main

import (
    "fmt"
    "sync"
)

func main() {
    wg := sync.WaitGroup{}
    wg.Add(1)
    cond := sync.NewCond(&sync.Mutex{})
    cond.L.Lock()
    go func() {
        cond.L.Lock()
        go func() {
            cond.L.Lock()
            cond.Broadcast()
            cond.L.Unlock()
        }()
        cond.Wait()
        fmt.Print("a")
        cond.L.Unlock()
        wg.Done()
    }()
    cond.Wait()
    fmt.Print("b")
    cond.L.Unlock()
    wg.Wait()
    fmt.Println("c")
}

如果需要,多个sync.Cond值可以共享一个sync.Locker值。但是这种情形在实践中并不多见。

文档更新时间: 2020-12-19 15:40   作者:kuteng