Go的Sync包

Sync Package

sync 包包含对低级别内存访问同步最有用的并发原语。主要是在诸如 struct 这样小范围内,由你决定何时进行内存同步。

WaitGroup

当你并不关心并发操作的结果,或者你有其他的方法来收集他们的结果时,WaitGroup 是等待一组并发操作完成的好方法,如果这两个条件都不满足的话,建议使用 channel 和 select,WaitGroup 非常有用。

example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var wg sync.WaitGroup
wg.Add(1) // 1
go func(){
defer wg.Done() // 2
fmt.Println("1st goroutine sleeping..")
time.Sleep(1)
}()
wg.Add(1) // 1
go func(){
defer wg.Done() // 2
fmt.Println("2nd goroutine sleeping...")
time.Sleep(2)
}()
wg.Wait() // 3
fmt.Println("all goroutine complete")
  • 在 1 处调用 Add 方法计数,参数为 1,表示一个 goroutine 开始
  • 使用 defer 关键字来确保在 goroutine 退出之前执行 Done 操作,告诉 WaitGroup 退出
  • 执行 wait 操作 这将会阻塞 main goroutine。直到所有的 goroutine 表明退出

可以将 WaitGroup 视为一个并发安全的计数器,调用通过传入的整数执行 add 方法 增加计数器的增量,并调用 Done 方法对计数器进行递减,wait 阻塞,直到计数器为零。更方便的写法 Add 方法直接传入 2。

A WaitGroup must not be copied after first use.

这句话很重要 WaitGroup 不是引用类型,在 go 中参数都是值拷贝,所以传递 waitGroup 时应该使用指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
done := make(chan struct{})
workerCount := 2
for i := 0; i < workerCount; i++ {
wg.Add(1)
go doit(i,done,wg)
}
close(done)
wg.Wait()
fmt.Println("all done!")
}
func doit(workerId int,done <-chan struct{},wg sync.WaitGroup) {
fmt.Printf("[%v] is running\n",workerId)
defer wg.Done()
<-done
fmt.Printf("[%v] is done\n",workerId)
}

我们期望的是会正常的输出一些东西,但是运行后发现程序出现了死锁。回想 coffman 条件。出现死锁的原因是 go 的函数参数都是值拷贝,即便是指针传的也是指针的值拷贝只是指针比较特殊它存的是指向那个值的地址,所以这里的各个 worker 得到的都是原始的 waitGroup 变量一个拷贝,并不是指向 main 里的 waitgourp 的内存地址,导致每次 done 之后并没有获取到 main 里的计数,所以死锁。

互斥锁和读写锁

Mutex 互斥锁,Mutex 是互斥的意思,这是保护临界区的一种方式,Mutex 通过同步访问共享内存,channel 通过通信共享内存。一个简单的例子,两个 goroutine 试图增加和减少一个共同的值,它们使用 Mutex 互斥锁来同步访问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
	var count int
var lock sync.Mutex
increment := func() {
lock.Lock() // 1
defer lock.Unlock() // 2
count++
fmt.Printf("Incrementing:%d\n", count)
}
decrement := func() {
lock.Lock() // 1
defer lock.Unlock() // 2
count--
fmt.Printf("Decrementing:%d\n", count)
}
// 增量
var arithmetic sync.WaitGroup
for i := 0; i <= 5; i++ {
arithmetic.Add(1)
go func() {
defer arithmetic.Done()
increment()
}()
}
// 减量
for i := 0; i <= 5; i++ {
arithmetic.Add(1)
go func() {
defer arithmetic.Done()
decrement()
}()
}
arithmetic.Wait()
fmt.Println("complete")
}
  • 在 1 处请求对临界区独占,使用互斥锁
  • 在 2 处完成了对临界区的保护

go 有 defer 真的是太好了有没有感觉到。你可以很好地使用它,确保不会发生死锁。进入和退出临界区是有消耗的,所以一般会尽量减少在临界区的时间,这样做的一个策略是减少临界区的范围,可能存在需要多个并发进程之间共享内存的情况,但可能这些进程不是都需要读写次内存,如果是这样,可以利用读写互斥锁 sync.RWMutex.
example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
producer := func(wg *sync.WaitGroup, l sync.Locker) {
defer wg.Done()
for i := 5; i > 0; i-- {
l.Lock()
l.Unlock()
time.Sleep(1)
}
}
observer := func(wg *sync.WaitGroup, l sync.Locker) {
defer wg.Done()
l.Lock()
defer l.Unlock()
}
test := func(count int, mutex, rwMutex sync.Locker) time.Duration {
var wg sync.WaitGroup
wg.Add(count + 1)
beginTestTime := time.Now()
go producer(&wg, mutex)
for i := count; i > 0; i-- {
go observer(&wg, rwMutex)
}
wg.Wait()
return time.Since(beginTestTime)
}
tw := tabwriter.NewWriter(os.Stdout, 0, 1, 2, ' ', 0)
defer tw.Flush()
var m sync.RWMutex
fmt.Fprintf(tw, "Readers\tRWMutex\tMutex\n")
for i := 0; i < 20; i++ {
count := int(math.Pow(2, float64(i)))
fmt.Fprintf(tw, "%d\t%v\t%v\n", count, test(count, &m, m.RLocker()), test(count, &m, &m))
}
  • producer 函数的第二个参数是 sync.Locker 类型,它是一个接口 interface,这个接口有 2 个方法 Lock 和 Unlock 分别对应 Mutex 和 RWMutex 类型。
  • 让 producer 等待 1s

cond

与互斥不同,条件变量的作用并不是保证在同一时刻仅有一个线程访问某一个共享数据,而是在对应的共享数据的状态发生变化时,通知其他因此而被阻塞的线程。条件变量总是与互斥量组合使用。互斥量为共享数据的访问提供互斥支持,而条件变量可以就共享数据的状态的变化向相关线程发出通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
c := sync.NewCond(&sync.Mutex{}) // 1
queue := make([]interface{}, 0, 10) // 2
removeFromQueue := func(delay time.Duration) {
time.Sleep(delay)
c.L.Lock() // 8
queue = queue[1:] // 9
fmt.Println("Remove from queue")
c.L.Unlock() // 10
c.Signal() // 11
}
for i := 0; i < 10; i++ {
c.L.Lock() // 3
for len(queue) == 2 { // 4
c.Wait() // 5
}
fmt.Println("adding to queue")
queue = append(queue, struct{}{})
go removeFromQueue(1 * time.Second) // 6
c.L.Unlock() // 7
}
  • 首先 是标准的 Mutex 作为锁
  • 接下来,创建一个长度为 0 容量为 10 的 slice
  • 在条件的锁寄存器上调用锁进入临界区
  • 检查一个循环中队列的长度
  • 调用 wait 这会暂停 main goroutine 直到一个信号的条件已经发送
  • 创建一个新的 goroutine 它在一秒钟后删除一个元素
  • 退出条件的临界区
  • 再次进入临界区,方便修改与条件有关的数据
  • 通过将切片的头部重新分配到第二个 slice 模拟对一个项目的排队
  • 退出条件的临界区
  • 让一个正在等待的 goroutine 知道发生了什么。

在这个例子中,有一个新的方法 signal,这是 cond 类型提供的两种方法中的一种,它提供通知 goroutine 阻塞的调用 wait,另一个方法是 Broadcast。运行时内部维护一个 fifo 列表等待接受信号。signal 发现等待最长时间的 goroutine 并通知他,而 Broadcast 向所有等待的 goroutine 发送信号。

once

example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var count int
increment := func() {
count++
}
var once sync.Once
var increments sync.WaitGroup
increments.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer increments.Done()
once.Do(increment)
}()
}
increments.Wait()
fmt.Printf("Count is %d\n", count)

上面这个代码会输出什么?通过运行得到的是Count is 1 sync.Once 变量在某种程度上通过 Do 方法把调用又增加了一次,顾名思义 sync.Once 是一种类型它在内部使用一些 sync 源于,却保在不同的 goroutine 上,也只会调用一次 Do 方法处理传递进来的函数,这确实是因为我们将调用 sync.Once 方式执行 Do 方法。这种智能调用一次的功能放入到标准库中看上去很奇怪?使用一个 grep 搜索检查下 go 的源码使用了多少次这个原语呢

grep -ir sync.Once $(go env GOROOT)/src |wc -l

输出为 102,结果结余 Go 1.12.7,不同的 go 版本也许会不同。使用 sync.Once 需要注意几点。

1
2
3
4
5
6
7
8
9
10
11
var count int
increment := func() {
count++
}
decrement := func() {
count--
}
var once sync.Once
once.Do(increment)
once.Do(decrement)
fmt.Printf("Count:%d\n", count)

输出为 Count:1.为什么不是 0 而是 1 呢?因为 sync.Once 只计算调用 Do 方法的次数而不是多少次调用 Do 方法,这样 sync.Once 的副本与所要调用的函数紧密耦合,建议通过将 sync.Once 包装在一个小的语法块中来形式化这种耦合,要么是一个小函数,要么是将两者包装在一个结构体中

1
2
3
4
5
var onceA ,onceB sync.Once
var initB func()
initA :=func(){onceB.Do(initB)}
initB :=func(){onceA.Do(initA)} //1
onceA.Do(initA) // 2

1 这里的调用在 2 返回执行不能进行,这个程序会 deadlock,因为在 1 调用的 Do 直到 2 调用 Do 并退出后才会继续,这是死锁的典型例子

Pool

池 pool 是 Pool 模式的并发安全实现,Pool 模式是一种创建和提供可供使用的固定数量实例或 Pool 实例的方法,它通常用于约束创建昂贵的场景(如数据库连接),以便只创建固定数量的实例,但不确定数量的操作任然可以请求访问这些场景,对于 Go 的 sync.Pool,这个数据类型可以被多个 goroutine 安全的使用

Pool 的主接口是它的 Get 方法,当调用时,Get 将首先检查池中是否有可用的实例返回给调用者,如果没有调用它的 new 方法来创建一个新实例,当完成时,调用者调用 Put 方法把工作中的实例归还到池中,以供其他进程使用。

1
2
3
4
5
6
7
8
9
10
myPool := &sync.Pool{
New: func() interface{} {
fmt.Println("creating new instance")
return struct{}{}
},
}
myPool.Get() // 1
instance := myPool.Get() // 1
myPool.Put(instance) // 1
myPool.Get() // 3
  • 在 1 处调用 Pool 的 get 方法,这些调用将执行 Pool 中定义的 new 函数,因为实例还没有实例化
  • 在 2 处将先前检索到的实例放到池内。
  • 在 3 处调用将重用之前分配的实例并将其放回池中,New 将不会被调用

那么为什么要使用 Pool 而不是在运行时实例化对象呢,Go 是有 Gc 的,因此实例化的对象将被自动清理,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var numCalcsCreated int
calcPool := &sync.Pool{
New: func() interface{} {
numCalcsCreated += 1
mem := make([]byte, 1024)
return &mem
},
}
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())

const numworkers = 1024 * 1024
var wg sync.WaitGroup
wg.Add(numworkers)
for i := numworkers; i > 0; i-- {
go func() {
defer wg.Done()
mem := calcPool.Get().(*[]byte)
defer calcPool.Put(mem)
}()
}
wg.Wait()
fmt.Printf("%d calculators were created.", numCalcsCreated)

如果没有 sync.Pool 结果是不确定的,最坏的情况可能分配了一个十亿字节的内存,但是我只分配了 4kb

另一种情况,用 Pool 来尽可能的将预先分配的对象缓存加载启动,通过提前加载获取引用到另一个对象所需的时间,来节省消费者的时间。在编写高吞吐量的网络服务器时十分常见。服务器试图快速响应请求。

首先创建一个模拟创建到服务的连接函数,让这次的连接花很长的时间

1
2
3
4
func connectToService()interface{}{
time.Sleep(1*time.Second)
return struct{}{}
}

接下来了解一下如果服务为每个请求都启动一个新的连接,那么网络服务的性能如何,编写一个网络处理程序,为每个请求都打开一个新的连接,为了使基础测试简单,我们只允许一次连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func connectToService() interface{} {
time.Sleep(1 * time.Second)
return struct{}{}
}

func init() {
ds := startNetWork()
ds.Wait()
}

func startNetWork() *sync.WaitGroup {
var wg sync.WaitGroup
wg.Add(1)
go func() {
server, err := net.Listen("tcp", "localhost:8080")
if err != nil {
log.Fatal(err)
}
defer server.Close()
wg.Done()
for {
conn, err := server.Accept()
if err != nil {
log.Printf("cannot accpet connection: %v", err)
continue
}
connectToService()
fmt.Fprintf(conn, "")
conn.Close()
}
}()
return &wg
}

func BenchmarkNetWork(b *testing.B) {
for i := 0; i < b.N; i++ {
conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
b.Fatalf("cannot dial host %v", err)
}
if _, err := ioutil.ReadAll(conn); err != nil {
b.Fatalf("cannot dial host %v", err)
}
conn.Close()
}
}

运行基准测试得到的结果

1
2
3
4
5
6
go test -bench=. -benchtime=10s goroutine11_test.go
goos: darwin
goarch: amd64
BenchmarkNetWork-8 10 1004388996 ns/op
PASS
ok command-line-arguments 11.061s

使用 sync.Pool 来提升一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func connectToService() interface{} {
time.Sleep(1 * time.Second)
return struct{}{}
}
func warmServiceConnCache() *sync.Pool {
p := &sync.Pool{
New: connectToService,
}
for i := 0; i < 10; i++ {
p.Put(p.New())
}
return p
}

func startNetworkDaemon() *sync.WaitGroup {
var wg sync.WaitGroup
wg.Add(1)
go func() {
connPool := warmServiceConnCache()

server, err := net.Listen("tcp", "localhost:8080")
if err != nil {
log.Fatalf("cannot listen: %v", err)
}
defer server.Close()

wg.Done()

for {
conn, err := server.Accept()
if err != nil {
log.Printf("cannot accept connection: %v", err)
continue
}
svcConn := connPool.Get()
fmt.Fprintln(conn, "")
connPool.Put(svcConn)
conn.Close()
}
}()
return &wg
}
func init() {
daemonStarted := startNetworkDaemon()
daemonStarted.Wait()
}

func BenchmarkNetworkRequest(b *testing.B) {
for i := 0; i < b.N; i++ {
conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
b.Fatalf("cannot dial host: %v", err)
}
if _, err := ioutil.ReadAll(conn); err != nil {
b.Fatalf("cannot read: %v", err)
}
conn.Close()
}
}

运行基准测试得到的结果

1
2
3
4
5
6
go test -bench=. -benchtime=10s fast_test.go
goos: darwin
goarch: amd64
BenchmarkNetworkRequest-8 1000 13685744 ns/op
PASS
ok command-line-arguments 43.147s

可以看到在处理代价昂贵的事务时使用这种模式可以极大的提高响应时间。你的并发进程需要请求一个对象,但是在实例化之后很快的处理他们时,或者在这些对象的构造可能会对内存产生负面影响,这时最好使用 Pool 设计模式。
然而有些情况下需要谨慎决定是否应该使用 Pool,如果你使用 Pool 代码所需要的东西不是大概同一性质的,那么从 Pool 中转化检索到所需要内容的时间可能会更多。使用 Pool 记住以下几点:

  • 当时实例化 sync.Pool 使用 new 方法创建一个成员变量,在调用时是线程安全的。
  • 当你收到来自一个 Get 的实例时,不要对锁接受的对象的状态做任何的假设
  • 当你用完了一个从 Pool 中取出来的对象时,一定要调用 Put,否则 Pool 无法复用这个实例了,通常使用 defer 完成。
  • Pool 内的分布必须大致均匀