Go Channel

Channel

channel 用于 goroutine 之间的通信,来自 CSP 派生的同步原语之一。

声明

1
2
var dataStream chan interface{}
dataStream = make(chan interface{})

这样就声明了一个双向 channel,类型是空接口。使用内置函数 make 实例化,不实例化的 channel 值都是 nil。双向意味着这个 channel 是可以写入和读取任何值的,因为我们使用了空接口。当然也可以声明单向的数据流,也就是说创建一个 channel 只支持发送或接受信息。声明一个单向的 channel 也很简单。

1
2
var dataStream <-chan interface{}
dataStream =make(<-chan interface{})

这样就声明了一个只能读取的 channel,将<-符号放在左边就是定义一个只能读取的 channel,放到右边就是定义了一个只能写入的 channel

1
2
var dataStream chan<- interface{}
dataStream = make(chan<-interface{})

一般情况很少会看到单向 channel 的实例化,经常用作函数参数和返回类型。因为当需要时,Go 语言会隐式的将双向 channel 转换为单向的 channel

1
2
3
4
5
var receiveChan <-chan interface{}
var sendchan chan<- interface{}
// 有效的语法
receiveChan = dataStream
sendchan = dataStream

使用 channel

简单的一个例子:

1
2
3
4
5
stringStream := make(chan string)
go func() {
stringStream <- "hello test chan" // 1
}()
fmt.Println(<-stringStream) // 2
  • 在 1 处将字符串文本写入到 stringStream channel
  • 在 2 处读取 stringStream channel 并通过 fmt 打印到 stdout

输出:hello test chan ,只读或只写的 channel 你不能将数据写入或读取。这是错误的。

channel 是如何工作的,因为 Go 语言中的 channel 是阻塞的,这意味着只有 channel 内的数据被消费后,新的数据才能被写入,而任何试图从空 channel 读取数据的 goroutine 将等待至少一条数据被写入 channel 后才能读到,在这个例子中,fmt 会从 stringStream 这个 channel 中消费一条数据,它会等待 channel 中有数据后才开始消费,匿名的 goroutine 试图往这个 channel 里写入一条数据,所以在成功写入之前 goroutine 不会退出,所以 main goroutine 和匿名 goroutine 一定是阻塞的

如果不正确的构造程序,可能会出现 deadlock

1
2
3
4
5
6
7
8
stringStream := make(chan string)
go func() {
if 0 != 1 {
return
}
stringStream <- "hello test chan"
}()
fmt.Println(<-stringStream)

上面的程序运行后会是死锁,因为并没有等到往 channel 里写数据匿名 goroutine 就退出了,而 maingoroutine 中等待写入数据却没有检测到其他运行的 goroutine 并报了一个死锁。

在 go 中判断值是否存在 map 有 ok 的写法,接口断言也有个 ok,那么同样的在 channel 中也存在 ok。这个 ok 代表什么呢?这个 ok 代表该 channel 上有新数据写入或者是关闭的 channel(closed channel)

1
2
3
4
5
6
stringStream := make(chan string)
go func() {
stringStream <- "hello channel"
}()
salutation, ok := <-stringStream
fmt.Printf("(%v):%v", ok, salutation)

使用 close 关键字关闭一个 channel 例如:

1
2
valueStream := make(chan interface{})
close(valueStream)

有趣的是,虽然我们关闭了这个 channel,但是仍然可以从已关闭的 channel 中读取到数据,不过是这个 channel 的 zerovalue

1
2
3
4
intstream := make(chan int)
close(intstream)
value, ok := <-intstream
fmt.Printf("(%v):%v", ok, value)

运行这个程序,输出:(false) 0 , 将没有任何数据的 channel 立即关闭,仍然能执行读取操作。

遍历 channel

遍历 channel 通过 for range,并且在 channel 关闭时自动中断循环。这允许对 channel 的值进行简洁的迭代,

1
2
3
4
5
6
7
8
9
10
intstream := make(chan int)
go func() {
defer close(intstream)
for i := 0; i <= 5; i++ {
intstream <- i
}
}()
for integer := range intstream {
fmt.Printf("%v", integer)
}
  • defer 确保 goroutine 退出之前 channel 关闭的。
  • 遍历 intstream

注意这个迭代不需要退出条件并且这个 range 不会返回第二个 bool 值,因为这个 channel 已经被 close,关闭 channel 也是一种同时给多个 goroutine 发信号的方法,如果有 n 个 goroutine 在一个 channel 上等待,而不是在 channel 上写 n 次来打开每个 goroutine,由于一个被关闭的 channel 可以被无数次读取,所以不管有多少 goroutine 在等待它,关闭 channel 都比执行 n 次更适合更快。这里有一个例子,可以同时打开多个 goroutine

Buffered Channel

带有缓冲的 channel,在实例化时提供容量的 channel。

1
2
var dataStream chan interface{}
dataStream =make(chan interface{},4)

创建一个有 4 个容量的 channel,这意味着我们可以把 4 个东西放到 channel 上不管它是否被读取。没有缓冲的 channel 也可以理解为是一个容量为 0 的缓冲 channel。下面这 2 个代码是等效的

1
2
a :=make(chan int)
b :=make(chan int, 0)

缓冲 channel 是一内存中的 fifo 队列。用一个例子解释下一个具有 4 个容量的缓冲 channel 的情况。首先初始化一个 channel

1
c :=make(chan rune,4)

从逻辑上讲,这创建了一个带有 4 个槽的缓冲区,比如:
image

现在向这个 channel 里写数据c <- 'A',当这个 channel 没有下游读取时,一个数据将被放置在 channel 缓冲区的第一个位置里

image

后续继续像 c 写入就会依次存放在这个槽内

image

经过 4 次写入后,很显然这个槽已经满了,当再一次写入的时候 goroutine 被阻塞了,直到有一些 goroutine 读取了在 bufferedchannel 里的数据

image

当从 channel 中读取出去一个后,数据写入是阻塞的操作,E 被放置在缓冲区的末尾,如果一个 channel 是空的,并且有一个下游接收,那么缓冲区将被忽略,并且该值将直接从发送方传递到接收方。

channel 操作结果

操作 Channel 状态 结果
Read nil 阻塞
Read 打开且非空 输出值
Read 打开但空 阻塞
Read 关闭的 <默认值>,false
Read 只写 编译错误
—- ———— —————
write nil 阻塞
write 打开的但填满 阻塞
write 打开的且不满 写入值
write 关闭的 阻塞
write nil panic
—- ———— —————
close nil panic
close 打开且非空 关闭 channel;读取成功,直到通道耗尽,然后读取产生值的默认值
close 打开但空 关闭 channel;读到生产者的默认值
close 关闭的 panic
close 只写 编译错误

channel 的生产者应该具备如下条件:

  • 实例化 channel
  • 执行写操作,或将所有权传递给另一个 goroutine
  • 关闭 channel
  • 通过一个只读 channel 将它们暴露出来

通过将这些责任分配给了 channel 的生产者,会发生一些事情:

  • 因为我们初始化了 channel,所以我们将死锁的风险转移到 nil channel 上。
  • 因为我们初始化了 channel,所以我们通过关闭一个 nil channel 来消除 panic 风险
  • 因为我们决定了 channel 何时关闭,我们通过写入一个关闭的 channel 来消除 panic
  • 因为我们决定了 channel 何时关闭,所以我们不止一次关闭 channel,从而消除了 panic 的风险
  • 我们在编译时使用类型检查器,以防止写入 channel 异常

channel 的消费者:

  • 知道 channel 何时关闭
  • 正确的处理阻塞

解决第一个问题可以从 read 操作中检查第二个返回值,第二点比较难定义,因为它取决于你的算法,可能星耀超时,可能想要停止消费,或者可能只是对阻塞进程的生命周期有需求,重要的是,作为一个消费者应该知道读取的是阻塞的事实。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
chanowner := func() <-chan int {
result := make(chan int, 5)
go func() {
defer close(result)
for i := 0; i <= 5; i++ {
result <- i
}
}()
return result
}
for value := range chanowner() {
fmt.Printf("Recevied %v\n", value)
}
fmt.Println("Done")

Select

select 加 channel 的组合可以形成更大的抽象事务。声明 select 语句是一个具有并发性的 Go 语言程序中最重要的事情之一,select 语句可以帮助安全地将 channel 与诸如取消,超时,等待,和默认值之类的概念结合在一起
如何使用 select

1
2
3
4
5
6
7
8
9
10
var c1,c2 <-chan interface{}
var c3 chan<- interface{}
select{
case <- c1:
// 执行一些逻辑
case <- c2:
// 执行一些逻辑
case c3<-struct{}{}:
// 执行一些逻辑
}

select 块中的 case 语句没有测试顺序,如果没有任何满足条件,执行也不会失败。当一个 channel 准好好了这个操作就会继续,相应的语句就会执行。
example:

1
2
3
4
5
6
7
8
9
10
11
start := time.Now()
c := make(chan interface{})
go func() {
time.Sleep(5 * time.Second)
close(c)
}()
fmt.Println("Blocking on read...")
select {
case <-c:
fmt.Printf("Unblocked %v later .\n", time.Since(start))
}
  • 5 秒后关闭 channel
    输出如下:
1
2
Blocking on read...
Unblocked 5.001336655s later .

在进入 select 模块后大约 5s 我们就会解锁,这是一种简单而有效的方法来组织我们等待某事的发生,但是思考一下,存在一些问题:

  • 当多个 channel 有数据可供给下游读取的时候会发生什么?
  • 如果没有任何可用的 channel 怎么办
  • 如果我们想要做一些事情,但是没有可用的 channel 怎么办

多个 channel 同时可用的这个问题会发生什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
c1 := make(chan interface{})
close(c1)
c2 := make(chan interface{})
close(c2)
var c1count, c2count int
for i := 1000; i > 0; i-- {
select {
case <-c1:
c1count++
case <-c2:
c2count++
}
}
fmt.Printf("c1count:%d\n c2count:%d\n", c1count, c2count)

输出如下

1
2
c1count:494
c2count:506

在 1000 次循环中,大约有一半的时间从 c2 读取,大约有一半是 c1,Go 语言运行时将在一组 case 语句中执行伪随机选择,这就意味着,在 case 语句集合中每一个都有一个被执行的机会。

第二个问题如果没有任何可用的 channel 怎么办,如果所有的 channel 都被阻塞了,就需要超时机制,Go 的 time 包提供了一种优雅的方式

1
2
3
4
5
6
var c <-chan int
select{
case <- c:
case <- time.After(1*time.Second)
fmt.Println("Time out")
}
  • 这个 case 永远不会被解锁,因为我们是从 nilchannel 中读取的,输出会是 Time out

time.After 函数通过传入 time.Duration 参数返回一个数值并写入 channel,该 channel 会返回执行后的时间,这为 select 语句提供了一种简明的方法。

最后一个问题,没有可用的 channel,像 case 语句一样,select 语句也有默认的 default 语句

1
2
3
4
5
6
7
8
start :=time.Now()
var c1,c2 <-chan int
select{
case <- c1:
case <- c2:
default:
fmt.Println("In default after %v\n\n",time.Since(start))
}

输出In default after 1.421μs可以看到它几乎是瞬间运行了 default,这允许在不阻塞的情况下推出 select 模块,通常你将看到一个默认的子句,它与 for select 循环一起使用。这允许 goroutine 在等待另一个 goroutine 上报结果的同时,可以继续执行自己的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
done := make(chan interface{})
go func(){
time.Sleep(5*time.Second)
close(done)
}()
workcount :=0
loop:
for {
select{
case <- done:
break loop
default:
}
workcount++
time.Sleep(1*time.Second)
}
fmt.Printf("Achieved %v cycles of work before signalled to stop \n",workcount)

输出会是Achieved 5 cycles of work before signalled to stop.

最后对于空的 select 语句又一个特殊的情况,这个 select{}会永远阻塞

GOMAXPROCS 控制

在 runtime 包中,有一个函数 GOMAXPROCS,这个函数控制 OS 线程的数量。在 Go1.5 之前 GOMAXPROCS 总是被设置为 1。通常的使用是 rutime.GOMAXPROCS(runtime.NumCPU())