回顾 Channel
在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,为了解决线程竞争,需要限制同一时间能够读写这些变量的线程数量。
虽然在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,即通信顺序进程。Goroutine 之间会通过 Channel 传递数据。
1
2
3
4
5
6
7
| ch := make(chan int)
// 发送数据
ch <- 1
// 接收数据
i := <-ch
|
这里关于 channel 的其他使用就不详细介绍了。
为什么使用 Channel
避免协程竞争和数据冲突问题
更高级的抽象,降低开发难度,增加程序可读性
模块之间更容易解耦,增加扩展性和可维护性
Channel 底层结构
channel 的数据结构是 runtime/chan.go
中的一个 hchan
结构体。
1
2
3
4
5
6
7
8
9
10
11
12
13
| type hchan struct {
qcount uint // channel 中的元素个数
dataqsiz uint // channel 能存放的元素容量
buf unsafe.Pointer // 用于存放元素的环形缓冲区,指向第一缓存的第一个数据成员
elemsize uint16 // channel 元素类型的大小
closed uint32 // 标识 channel 是否关闭![](
elemtype *_type // channel 元素类型
sendx uint // 发送元素进入环形缓冲区的 index
recvx uint // 接收元素所处的环形缓冲区的 index
recvq waitq // 等待接收数据的协程链表
sendq waitq // 等待发送数据的协程链表
lock mutex // 互斥锁,保护 hchan 结构体本身
}
|
概括一下,Channel 主要由三大核心部分组成。
💿 环形缓存 buf
在 hchan
结构体中,使用以下字段组成了一个环形缓存区:
1
2
3
4
5
| qcount uint // channel 中的元素个数
dataqsiz uint // channel 能存放的元素容量
buf unsafe.Pointer // 用于存放元素的环形缓冲区,指向第一缓存的第一个数据成员
elemsize uint16 // channel 元素类型的大小
elemtype *_type // channel 元素类型
|
💡 使用环形缓存区的好处:环形地使用数据,类似于一个环形链表,大幅降低 GC 的开销,使用时不需要回收内存。
🔗 发送/接收协程链表
在 hchan
中,通过 recvx
和 revcq
组成了一个接收链表;通过 sendx
和 sendq
组成了一个发送链表。
1
2
3
4
| sendx uint // 发送元素进入环形缓冲区的 index(游标)
recvx uint // 接收元素所处的环形缓冲区的 index(游标)
recvq waitq // 等待接收数据的协程链表
sendq waitq // 等待发送数据的协程链表
|
等待发送和接收的协程双向链表 recvq
sendq
都是 waitq
类型。waitq
是 chan.go
中定义的结构体,有 first
和 last
两个属性,分别指向链表的第一个和最后一个成员。
1
2
3
4
| type waitq struct {
first *sudog
last *sudog
}
|
这个 waitq
中的指针都是 sudog
类型。sudog
是协程的一个包装,将一个协程包装成一个节点。
1
2
3
4
5
6
7
8
9
10
11
| type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
c *hchan
// ... 其他属性
}
|
g
包装了协程,使用 next
prev
指针,将协程串联成一个链表。
elem
是 unsafe
的万能指针,存储了想要接收变量的数据的指针。比如 i <- ch
,这里 elem
就会存储这个变量 i
的指针。
🔒 保护 hchan 的互斥锁 mutex
这个 lock mutex
用来保护 hchan struct
的所有字段,所有协程想要操作 hchan
这个结构体的内容,就需要加这个 mutex
锁。
并不是用来排队发送数据、接收数据的。
为什么 channel 的内部有锁,还能达到高并发量呢?
channel 的互斥锁在大部分情况下仅用于保护少数关键操作,而不是每次读写操作都加锁,只有在写数据/读数据的一瞬间需要加锁,不需要一直加锁。
使用了其他一些技术来提高 channel 的性能,比如使用无锁的循环队列实现缓冲区,减少锁的使用。
Go 语言的运行时系统(runtime)内部对锁有一些优化技术,可以减少互斥锁的开销。
🙋♂️ channel 发送数据原理
如果我们想向 channel 中发送数据,会使用 <-
关键字。
1
2
3
4
| ch := make(chan int)
// 发送数据
ch <- 1
|
这个 <-
是 go 中的“语法糖”,在编译阶段,会把 <-
转换成 chansend1()
。
1
2
3
4
5
6
7
8
| // chan.go
//
// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
|
📚 发送情形
channel 的发送情形可以分为三类:
➡️ 直接发送
当发送数据时,在 hchan
的 recvq
中存在休眠等待接收的协程。(这个时候缓存一定为空,或者没有缓存,不用考虑)
直接将数据拷贝给该协程的接收变量,并唤醒在 recvq
中的这个等待协程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| // chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
// ...
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// ..
}
|
💾 放入缓存
当没有协程在等待接收数据,但是还有缓存空间 qcount < dataqsiz
,将数据放入环形缓存,维护索引 sendx
qcount
,成功返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| // chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
// ...
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// ...
}
|
💤 等待休眠
当没有协程在休眠等待数据,并且缓存已经满了(或没有缓存),将自己包装成一个 sudog
,放入 sendq
队列,解锁后进行休眠。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
// ...
// Block on the channel.
gp := getg()
mysg := acquireSudog() // 包装成 sudog
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.waiting = mysg
c.sendq.enqueue(mysg) // 入队
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 休眠,之后改协程卡在这段代码
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting { // 唤醒这个协程,只需要维护一下数据即可(该协程被唤醒,数据已经是被取走了的)
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
|
加锁
构造封装当前 goroutine 的 sudog 对象
完成指针指向,建立 sudog、goroutine、channel 之间的指向关系
把 sudog 添加到当前 channel 的阻塞写协程队列中
park 当前协程
倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被读协程取走,维护其他数据)
解锁
💁♂️ Chanel 接收数据原理
1
2
3
4
5
6
7
8
| ch := make(chan int)
// 发送数据
ch <- 1
// 接收数据
i := <- ch
|
接收数据的 <-
同样是 go 中的“语法糖”。
在编译阶段,如果是 i <- ch
会转换成 runtime.chanrecv1()
;如果是 i, ok <- ch
会转化成 runtime.chanrecv2()
,两者最终都会调用 chanrecv()
方法。
1
2
3
4
5
6
7
8
9
10
11
12
| // entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
|
📚 接收情形
可以大致可以分为以下四大类:
➡️ 接收有阻塞的发送协程
💿 接收无阻塞发送协程,且缓冲区有元素
💤 接收无阻塞发送协程,且缓冲区无元素
➡️ 接收有阻塞的发送协程
为什么有发送协程阻塞的时候,还是要从缓存中获取数据呢?
缓存区的数据一定比等待发送的协程中的数据更早;如果不从缓存中接收,缓存中的数据可能一直拿不走。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| // chan.go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
// Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
|
💿 接收无阻塞发送协程,且缓冲区有元素
在 channel 中,如果没有协程在发送队列 sendq
等待接收,判断该 channel 中有无缓存,如果有,直接从缓存中取走一个数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
}
|
加锁
获取 recvx
对应位置的元素
recvx++
qcount--
解锁,返回
💤 接收无阻塞发送协程,且缓冲区无元素
在 channel 中,如果在 sendq
中没有协程在等待发送数据,并且 buf
中没有缓存,将该接收协程包装成 sudog
,并放入等待队列 recvq
中,进行休眠。
这个放入队列的接收协程,当被唤醒时,数据已经拷贝到位,不需要考虑如何拷贝。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
gp.waiting = nil
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
|
加锁
封装当前接收协程的 sudog
对象
完成指针指向,建立 sudog
、goroutine
、channel
之间的指向关系
把 sudog
添加到当前 channel
的阻塞读协程队列中
park
当前协程
倘若协程从 park
中被唤醒,则回收 sudog
(sudog
能被唤醒,其对应的元素必然已经被写入)
解锁,返回
📖参考