Go与并发(二)

Go 并发安全

通过 go 创建一个 goroutine 是很容易的,但是当你在写并发的程序时你要思考可能遇到的各种各样的问题,它不受语言的局限。

Go 语言 runtime 完成了大部分的繁重工作,Go 有低延迟的 GC,在一中语言中使用 GC 是否是正确的?关于这个问题你会看到不同的答案,有些人认为,在需要实时性或确定性的领域中,GC 妨碍了程序的运行,暂停程序的所有活动来运行 GC 是不可接受的,但是 Go 的 GC 相较 Go 1.8 之前有了很大的提升,GC 暂停一般在 10~100μs 之间。

内存管理本身就是一件很困难的事情,当与并发结合使用时,编写正确的并发就更困难,Go 语言并不需要用户去关心内存分配,到底是堆上还是栈上,如果不需要担心只有 10μs 的暂停时间,不通过强迫用户管理内存,Go 语言可以很容易的创建的并发程序。

并发与并行

这 2 个概念经常会被混淆,如何区分?简单的一个陈述是这样的

并发属于代码,并行属于一个运行中的程序。

针对一个核心的 cpu,两个程序看上去是并行的,事实上,它们在用不可被注意到的速度进行顺序执行,CPU 的上下文在一个时间颗粒度之内一直在不同的程序之间进行切换分享 cpu 时间使得任务好像是并行的,如果在 2 个核心的 cpu 上执行,它们可能就是真的在并行执行了。

我们写的是可以并发执行的代码,并行是我们程序运行时的属性。并行是一个时间或上下文的函数,这个上下文定义为两个或者以上的操作被认为是并行的界限。例如我们的上下文是一段 5s 的时长,我们执行了两个分别小号 1s 执行的操作,我们应该认为这些操作是并行执行的,如果我们的上下文是 1s,我们应该认为操作是分别运行的。

CSP 模型

CSP 是什么?完整是Communicating Sequential Processes 通信进程模型,它来自 Charles Antony Richard Hoare 的论文,简单的进行描述,CSP 描述这样的一种并发模型,多个 Process 使用一个 Channel 进行通信,这个 Channel 连结的 Process 通常是匿名的,消息传递通常是同步的(ActorModel),严格来说 CSP 是一门形式语言,用于描述并发系统的互动模型,衍生的语言有/Occam/Limbo/Golang…

Golang 其实只用到了 CSP 的很小一部分,即理论中的 Process/Channel 对应 Go 中的 goroutine/channel。这 2 个并发原语之间没有从属关系,Process 可以订阅任意个 Channel。Channel 也不关心是哪个 Process 再利用它通信,Process 再利用它进行通信,Process 围绕 Channel 进行读写,形成一套有序阻塞和可预测的并发模型

使用通信来共享内存,而不是通过共享内存来通信。

RobPike 在 Google I/O 2012 上给出的描述:

goroutine 可以视为开销很小的线程(但是 goroutine 不是物理线程也不是协程,与用户态协程也有一些区别,但它有自己的调用栈,并且这个栈的大小可伸缩的。)

决策树准则
image

fork-join 模型

fork-join 并发模型,fork 指的是在程序中的任意一点,它可以将执行的子分支与其父节点同时运行,join 指的是在将来的某个时候,这些并发的执行分支将会合并在一起
image

example:

1
2
3
4
sayhello :=func(){
fmt.Println("hello")
}
go sayhello()

注意:go 现在默认使用多核,这段程序是有可能输出的,但是几率比较小。通过runtime设置单核这个程序是不会有输出的,main goroutine 直接退出了。我们可以通过让 main goroutine 休眠一段时间来等待 sayhello 这个协程执行完time.Sleep,但是我们并没有创建一个 join 点,只有一个竞争条件。

为了创建一个 join 点,必须同步 main goroutine 和 sayhello goroutine。可以用很多方式实现,这里使用sync包提供的 waitgroup。

1
2
3
4
5
6
7
var wg sync.WaitGroup
sayhello :=func(){
defer wg.Done()
fmt.Println("hello")
}
wg.Add(1)
wg.Wait() // join point

这个wg.Wait()就是一个连接点,这里顺便可以讲一下关于闭包的一些事情,同样可以创建一个 goroutine 的闭包。

1
2
3
4
5
6
7
8
9
var wg sync.WaitGroup
s := "hello"
wg.Add(1)
go func(){
defer wg.Done()
s = "welcome"
}()
wg.Wait()
fmt.Println(s)

会输出什么?答案是会是welcome,当使用闭包的时候,在闭包内的外部变量,是引用的,这个变量并且逃逸到了堆上,在 go 中一般非引用类型会分配在栈上,引用类型一般分配在堆上,这个 s 是 string 类型会被分配在栈上,但是被闭包引用后这个变量 s 逃逸到了堆上。可以通过 go 提供的gcflags -m 来查看变量逃逸。
example2:

1
2
3
4
5
6
7
8
9
var wg sync.WaitGroup
for _,value := range []string{"hello","greetings","good day"}{
wg.Add(1)
go func(){
defer wg.Done()
fmt.Println(value)
}()
}
wg.Wait()

上面这个例子会输出什么?也许你认为会是将 slice 里的值都打印了出来,实际运行你会发现输出的都是good day 同样的这是一个闭包,匿名函数引用了外部的 value,这 value 是一个临时变量,会被反复的使用,但是它本身的地址并没有发生任何的变化,当 goroutine 访问到这个 value 时,range 迭代已经结束了,保存在这个地址上的最后的值是good day,所以输出的答案是三次good day

想要输出这个 slice 的全部内容只需要将这个 value 的副本传递到闭包中,这样当 goroutine 运行时,它将从循环的迭代中操作数据

1
2
3
4
5
6
7
8
9
var wg sync.WaitGroup
for _,value := range []string{"hello","greetings","good day"}{
wg.Add(1)
go func(value){
defer wg.Done()
fmt.Println(value)
}(value)
}
wg.Wait()
  • 匿名函数声明了一个参数,把原来的变量 value 显式的映射到了闭包中。
  • 将当前迭代的变量传递给闭包,创建了一个字符串结构的副本,从而确保 goroutine 运行时,可以引用到适当的字符串。
  • 以上 2 个知识点并不是只有应用在 goroutine 中会出现请注意。

goroutine 的另一个好处是它们非常的轻,GC 并没有回收被丢弃的 goroutine。下面是来 Go FAQ 的摘录

一个新创建的 goroutine 被赋予了几千字节,这在大部分情况下都是足够的,当它不运行时,Go 语言 runtime 就会自动增长(缩小)存储堆栈的内存,允许许多 goroutine 存在适当的内存中,每个函数调用 cpu 的开销平均为 3 个廉价指令,在同一个地址空间创建成千上万的 goroutine 是可行的。

下面的例子汇总,将 goroutine 不被 GC 的事实与运行时的自省能力结合起来,并测算在 goroutine 创建之前和之后分配的内存数量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
memConsumed :=func() uint64{
runtime.GC()
var s runtime.MemStats
runtime.ReadMemStats(&s)
return s.Sys
}

var c <- chan interface{}
var wg sync.WaitGroup
noop := func(){ wg.Done();<-c} // 1

const numGoroutines = 1e4 // 2
wg.Add(numGoroutines)
before := memConsumed() // 3
for i := numGoroutines;i>0;i--{
go noop()
}
wg.Wait()
after := memConsumed()
fmt.Printf("%.3fkb",float64(after-before)/numGoroutines/1000)

你可以看到 goroutine 确实很小且可伸缩的。下面这个例子通过基准测试查看上下文的切换时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
var wg sync.WaitGroup
begin := make(chan struct{})
c := make(chan struct{})
var token struct{}
sender := func() {
defer wg.Done()
<-begin // 1
for i := 0; i < b.N; i++ {
c <- token // 2
}
}
receiver := func() {
defer wg.Done()
<-begin // 1
for i := 0; i < b.N; i++ {
<-c // 3
}
}
wg.Add(2)
go sender()
go receiver()
b.StartTimer() // 4
close(begin) // 5
wg.Wait()
  • 在 1 处等待直到被告知开始执行。
  • 在 2 处将消息发送到接收器 goroutine,一个 struct{}{} 被称为一个空的结构体,它没有内存占用。
  • 在 3 处收到一条信息,但是什么都不做。
  • 在 4 处开始计时
  • 告诉两个 goroutine 开始运行。

运行基准测试 基于 mac 的结果

1
2
3
4
5
6
 go test -bench=. -cpu=1 goroutine5_test.go
goos: darwin
goarch: amd64
BenchmarkContextSwitch 10000000 168 ns/op
PASS
ok command-line-arguments 1.902s

每个上下文的切换需要 168ns,goroutine 会导致上下文切换过于频繁,但是可以很轻松的说上限很可能不会成为使用 goroutine 的任何障碍