回顾 Channel
在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,为了解决线程竞争,需要限制同一时间能够读写这些变量的线程数量。
虽然在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,即通信顺序进程。Goroutine 之间会通过 Channel 传递数据。
| 1
2
3
4
5
6
7
 | ch := make(chan int)
// 发送数据
ch <- 1
// 接收数据
i := <-ch
 | 
这里关于 channel 的其他使用就不详细介绍了。
为什么使用 Channel
- 避免协程竞争和数据冲突问题 
- 更高级的抽象,降低开发难度,增加程序可读性 
- 模块之间更容易解耦,增加扩展性和可维护性 
Channel 底层结构
channel 的数据结构是 runtime/chan.go 中的一个 hchan 结构体。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
 | type hchan struct {
	qcount   uint           // channel 中的元素个数
	dataqsiz uint           // channel 能存放的元素容量
	buf      unsafe.Pointer // 用于存放元素的环形缓冲区,指向第一缓存的第一个数据成员
	elemsize uint16         // channel 元素类型的大小
	closed   uint32         // 标识 channel 是否关闭
💿 环形缓存 buf
在 hchan 结构体中,使用以下字段组成了一个环形缓存区:
| 1
2
3
4
5
 | qcount   uint           // channel 中的元素个数
dataqsiz uint           // channel 能存放的元素容量
buf      unsafe.Pointer // 用于存放元素的环形缓冲区,指向第一缓存的第一个数据成员
elemsize uint16         // channel 元素类型的大小
elemtype *_type         // channel 元素类型
 | 
💡 使用环形缓存区的好处:环形地使用数据,类似于一个环形链表,大幅降低 GC 的开销,使用时不需要回收内存。
🔗 发送/接收协程链表
在 hchan 中,通过 recvx 和 revcq 组成了一个接收链表;通过 sendx 和 sendq 组成了一个发送链表。
| 1
2
3
4
 | sendx    uint           // 发送元素进入环形缓冲区的 index(游标)
recvx    uint           // 接收元素所处的环形缓冲区的 index(游标)
recvq    waitq          // 等待接收数据的协程链表
sendq    waitq          // 等待发送数据的协程链表
 | 
等待发送和接收的协程双向链表 recvq sendq 都是 waitq 类型。waitq 是 chan.go 中定义的结构体,有 first 和 last 两个属性,分别指向链表的第一个和最后一个成员。
| 1
2
3
4
 | type waitq struct {
    first *sudog
    last  *sudog
}
 | 
这个 waitq 中的指针都是 sudog 类型。sudog 是协程的一个包装,将一个协程包装成一个节点。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 | type sudog struct {
	g *g
	next *sudog
	prev *sudog
	elem unsafe.Pointer  
    c    *hchan 
    // ... 其他属性
}
 | 
g 包装了协程,使用 next prev 指针,将协程串联成一个链表。
elem 是 unsafe 的万能指针,存储了想要接收变量的数据的指针。比如 i <- ch,这里 elem 就会存储这个变量 i 的指针。
🔒 保护 hchan 的互斥锁 mutex
这个 lock mutex 用来保护 hchan struct 的所有字段,所有协程想要操作 hchan 这个结构体的内容,就需要加这个 mutex 锁。
并不是用来排队发送数据、接收数据的。
为什么 channel 的内部有锁,还能达到高并发量呢?
channel 的互斥锁在大部分情况下仅用于保护少数关键操作,而不是每次读写操作都加锁,只有在写数据/读数据的一瞬间需要加锁,不需要一直加锁。
使用了其他一些技术来提高 channel 的性能,比如使用无锁的循环队列实现缓冲区,减少锁的使用。
Go 语言的运行时系统(runtime)内部对锁有一些优化技术,可以减少互斥锁的开销。
🙋♂️ channel 发送数据原理
如果我们想向 channel 中发送数据,会使用 <- 关键字。
| 1
2
3
4
 | ch := make(chan int)
// 发送数据
ch <- 1
 | 
这个 <- 是 go 中的“语法糖”,在编译阶段,会把 <- 转换成 chansend1()。
| 1
2
3
4
5
6
7
8
 | // chan.go
//
// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}
 | 
📚 发送情形
channel 的发送情形可以分为三类:
➡️ 直接发送
当发送数据时,在 hchan 的 recvq 中存在休眠等待接收的协程。(这个时候缓存一定为空,或者没有缓存,不用考虑)
直接将数据拷贝给该协程的接收变量,并唤醒在 recvq 中的这个等待协程。

|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
 | // chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)
    // ...
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    
    // ..
}
 | 
💾 放入缓存
当没有协程在等待接收数据,但是还有缓存空间 qcount < dataqsiz,将数据放入环形缓存,维护索引 sendx qcount,成功返回。

|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
 | // chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)
    // ...
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }
    // ...
}
 | 
💤 等待休眠
当没有协程在休眠等待数据,并且缓存已经满了(或没有缓存),将自己包装成一个 sudog,放入 sendq 队列,解锁后进行休眠。

|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
 | func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)
    // ...
    // Block on the channel.
    gp := getg()
    mysg := acquireSudog()  // 包装成 sudog
    mysg.elem = ep
    mysg.g = gp
    mysg.c = c
    gp.waiting = mysg
    c.sendq.enqueue(mysg)  // 入队
    
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)  // 休眠,之后改协程卡在这段代码
    
	KeepAlive(ep)
	// someone woke us up.
	if mysg != gp.waiting {  // 唤醒这个协程,只需要维护一下数据即可(该协程被唤醒,数据已经是被取走了的)
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
 | 
- 加锁 
- 构造封装当前 goroutine 的 sudog 对象 
- 完成指针指向,建立 sudog、goroutine、channel 之间的指向关系 
- 把 sudog 添加到当前 channel 的阻塞写协程队列中 
- park 当前协程 
- 倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被读协程取走,维护其他数据) 
- 解锁 
💁♂️ Chanel 接收数据原理
| 1
2
3
4
5
6
7
8
 | ch := make(chan int)
// 发送数据
ch <- 1
// 接收数据
i := <- ch
 | 
接收数据的 <- 同样是 go 中的“语法糖”。
在编译阶段,如果是 i <- ch 会转换成 runtime.chanrecv1();如果是 i, ok <- ch 会转化成 runtime.chanrecv2(),两者最终都会调用 chanrecv() 方法。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
 | // entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}
 | 
📚 接收情形
可以大致可以分为以下四大类:
- ➡️ 接收有阻塞的发送协程 
- 💿 接收无阻塞发送协程,且缓冲区有元素 
- 💤 接收无阻塞发送协程,且缓冲区无元素 
➡️ 接收有阻塞的发送协程
为什么有发送协程阻塞的时候,还是要从缓存中获取数据呢?
缓存区的数据一定比等待发送的协程中的数据更早;如果不从缓存中接收,缓存中的数据可能一直拿不走。

|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
 | // chan.go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   
    lock(&c.lock)
    // Just found waiting sender with not closed.
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
     }
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	goready(gp, skip+1)
}
 | 
💿 接收无阻塞发送协程,且缓冲区有元素
在 channel 中,如果没有协程在发送队列 sendq 等待接收,判断该 channel 中有无缓存,如果有,直接从缓存中取走一个数据。

|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    lock(&c.lock)
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
}
 | 
- 加锁 
- 获取 - recvx对应位置的元素
 
- recvx++
 
- qcount--
 
- 解锁,返回 
💤 接收无阻塞发送协程,且缓冲区无元素
在 channel 中,如果在 sendq 中没有协程在等待发送数据,并且 buf 中没有缓存,将该接收协程包装成 sudog,并放入等待队列 recvq 中,进行休眠。
这个放入队列的接收协程,当被唤醒时,数据已经拷贝到位,不需要考虑如何拷贝。

|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    lock(&c.lock)
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    gp.waiting = mysg
    mysg.g = gp
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    gp.waiting = nil
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}
 | 
- 加锁 
- 封装当前接收协程的 - sudog对象
 
- 完成指针指向,建立 - sudog、- goroutine、- channel之间的指向关系
 
- 把 - sudog添加到当前- channel的阻塞读协程队列中
 
- park当前协程
 
- 倘若协程从 - park中被唤醒,则回收- sudog(- sudog能被唤醒,其对应的元素必然已经被写入)
 
- 解锁,返回 
📖参考