Featured image of post golang channel

golang channel

回顾 Channel

在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,为了解决线程竞争,需要限制同一时间能够读写这些变量的线程数量。

虽然在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,即通信顺序进程。Goroutine 之间会通过 Channel 传递数据。

1
2
3
4
5
6
7
ch := make(chan int)

// 发送数据
ch <- 1

// 接收数据
i := <-ch

这里关于 channel 的其他使用就不详细介绍了。

为什么使用 Channel

  • 避免协程竞争和数据冲突问题

  • 更高级的抽象,降低开发难度,增加程序可读性

  • 模块之间更容易解耦,增加扩展性和可维护性

Channel 底层结构

channel 的数据结构是 runtime/chan.go 中的一个 hchan 结构体。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
type hchan struct {
	qcount   uint           // channel 中的元素个数
	dataqsiz uint           // channel 能存放的元素容量
	buf      unsafe.Pointer // 用于存放元素的环形缓冲区,指向第一缓存的第一个数据成员
	elemsize uint16         // channel 元素类型的大小
	closed   uint32         // 标识 channel 是否关闭![](
	elemtype *_type         // channel 元素类型
	sendx    uint           // 发送元素进入环形缓冲区的 index
	recvx    uint           // 接收元素所处的环形缓冲区的 index
	recvq    waitq          // 等待接收数据的协程链表
	sendq    waitq          // 等待发送数据的协程链表
	lock     mutex          // 互斥锁,保护 hchan 结构体本身
}

概括一下,Channel 主要由三大核心部分组成。

  • 💿 一个环形缓存 buf

  • 🔗 两个链表(发送协程链表 recvq waitq / 接收协程链表 sendq waitq

  • 🔒 一个保护 hchan 的互斥锁 mutex

💿 环形缓存 buf

hchan 结构体中,使用以下字段组成了一个环形缓存区:

1
2
3
4
5
qcount   uint           // channel 中的元素个数
dataqsiz uint           // channel 能存放的元素容量
buf      unsafe.Pointer // 用于存放元素的环形缓冲区,指向第一缓存的第一个数据成员
elemsize uint16         // channel 元素类型的大小
elemtype *_type         // channel 元素类型

💡 使用环形缓存区的好处:环形地使用数据,类似于一个环形链表,大幅降低 GC 的开销,使用时不需要回收内存。

🔗 发送/接收协程链表

hchan 中,通过 recvxrevcq 组成了一个接收链表;通过 sendxsendq 组成了一个发送链表。

1
2
3
4
sendx    uint           // 发送元素进入环形缓冲区的 index(游标)
recvx    uint           // 接收元素所处的环形缓冲区的 index(游标)
recvq    waitq          // 等待接收数据的协程链表
sendq    waitq          // 等待发送数据的协程链表

等待发送和接收的协程双向链表 recvq sendq 都是 waitq 类型。waitqchan.go 中定义的结构体,有 firstlast 两个属性,分别指向链表的第一个和最后一个成员。

1
2
3
4
type waitq struct {
    first *sudog
    last  *sudog
}

这个 waitq 中的指针都是 sudog 类型。sudog 是协程的一个包装,将一个协程包装成一个节点。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type sudog struct {
	g *g

	next *sudog
	prev *sudog
	elem unsafe.Pointer  

    c    *hchan 

    // ... 其他属性
}

g 包装了协程,使用 next prev 指针,将协程串联成一个链表。

elemunsafe 的万能指针,存储了想要接收变量的数据的指针。比如 i <- ch,这里 elem 就会存储这个变量 i 的指针。

🔒 保护 hchan 的互斥锁 mutex

这个 lock mutex 用来保护 hchan struct 的所有字段,所有协程想要操作 hchan 这个结构体的内容,就需要加这个 mutex 锁。

并不是用来排队发送数据、接收数据的。

为什么 channel 的内部有锁,还能达到高并发量呢?

channel 的互斥锁在大部分情况下仅用于保护少数关键操作,而不是每次读写操作都加锁,只有在写数据/读数据的一瞬间需要加锁,不需要一直加锁。

使用了其他一些技术来提高 channel 的性能,比如使用无锁的循环队列实现缓冲区,减少锁的使用。

Go 语言的运行时系统(runtime)内部对锁有一些优化技术,可以减少互斥锁的开销。

🙋‍♂️ channel 发送数据原理

如果我们想向 channel 中发送数据,会使用 <- 关键字。

1
2
3
4
ch := make(chan int)

// 发送数据
ch <- 1

这个 <- 是 go 中的“语法糖”,在编译阶段,会把 <- 转换成 chansend1()

1
2
3
4
5
6
7
8
// chan.go
//
// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

📚 发送情形

channel 的发送情形可以分为三类:

  • ➡️ 直接发送

  • 💾 放入缓存

  • 💤 休眠等待

➡️ 直接发送

当发送数据时,在 hchanrecvq 中存在休眠等待接收的协程。(这个时候缓存一定为空,或者没有缓存,不用考虑)

直接将数据拷贝给该协程的接收变量,并唤醒在 recvq 中的这个等待协程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...

    lock(&c.lock)

    // ...

    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    
    // ..
}
  • 加锁

  • 从阻塞度协程队列中取出一个 goroutine 的封装对象 sudog,如果不为 nil,直接发送

  • 在 send 方法中,会基于 memmove 方法,直接将元素拷贝交给 sudog 对应的 goroutine

  • 在 send 方法中会完成解锁动作

💾 放入缓存

当没有协程在等待接收数据,但是还有缓存空间 qcount < dataqsiz,将数据放入环形缓存,维护索引 sendx qcount,成功返回。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)
    // ...
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // ...
}
  • 加锁

  • 将当前元素添加到环形缓冲区 sendx 对应的位置

  • 维护索引 sendx++ qcount++

  • 解锁,返回成功

💤 等待休眠

当没有协程在休眠等待数据,并且缓存已经满了(或没有缓存),将自己包装成一个 sudog,放入 sendq 队列,解锁后进行休眠。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)

    // ...

    // Block on the channel.
    gp := getg()
    mysg := acquireSudog()  // 包装成 sudog
    mysg.elem = ep
    mysg.g = gp
    mysg.c = c
    gp.waiting = mysg
    c.sendq.enqueue(mysg)  // 入队
    
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)  // 休眠,之后改协程卡在这段代码
    
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {  // 唤醒这个协程,只需要维护一下数据即可(该协程被唤醒,数据已经是被取走了的)
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
  • 加锁

  • 构造封装当前 goroutine 的 sudog 对象

  • 完成指针指向,建立 sudog、goroutine、channel 之间的指向关系

  • 把 sudog 添加到当前 channel 的阻塞写协程队列中

  • park 当前协程

  • 倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被读协程取走,维护其他数据)

  • 解锁

💁‍♂️ Chanel 接收数据原理

1
2
3
4
5
6
7
8
ch := make(chan int)

// 发送数据
ch <- 1

// 接收数据

i := <- ch

接收数据的 <- 同样是 go 中的“语法糖”。

在编译阶段,如果是 i <- ch 会转换成 runtime.chanrecv1();如果是 i, ok <- ch 会转化成 runtime.chanrecv2(),两者最终都会调用 chanrecv() 方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

📚 接收情形

可以大致可以分为以下四大类:

  • ➡️ 接收有阻塞的发送协程

  • 💿 接收无阻塞发送协程,且缓冲区有元素

  • 💤 接收无阻塞发送协程,且缓冲区无元素

➡️ 接收有阻塞的发送协程

  • 无缓冲区,有协程在 sendq 中等待发送数据,直接将数据从发送协程中拷贝过来,再唤醒该发送协程。

  • 有缓冲区,读取缓冲区 recvx 元素,将 sendq 中的一个发送协程的元素写入缓冲区。

为什么有发送协程阻塞的时候,还是要从缓存中获取数据呢?

缓存区的数据一定比等待发送的协程中的数据更早;如果不从缓存中接收,缓存中的数据可能一直拿不走。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// chan.go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   
    lock(&c.lock)


    // Just found waiting sender with not closed.
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
     }
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	goready(gp, skip+1)
}
  • 加锁

  • 从阻塞发送协程队列中获取到一个发送协程

  • 倘若 channel 无缓冲区,则直接读取发送协程元素,并唤醒发送协程

  • 倘若 channel 有缓冲区,则读取缓冲区头部元素,并将发送协程元素发送入缓冲区尾部后唤醒发送协程

  • 解锁,返回

💿 接收无阻塞发送协程,且缓冲区有元素

在 channel 中,如果没有协程在发送队列 sendq 等待接收,判断该 channel 中有无缓存,如果有,直接从缓存中取走一个数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

    lock(&c.lock)

    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
}
  • 加锁

  • 获取 recvx 对应位置的元素

  • recvx++

  • qcount--

  • 解锁,返回

💤 接收无阻塞发送协程,且缓冲区无元素

在 channel 中,如果在 sendq 中没有协程在等待发送数据,并且 buf 中没有缓存,将该接收协程包装成 sudog,并放入等待队列 recvq 中,进行休眠。

这个放入队列的接收协程,当被唤醒时,数据已经拷贝到位,不需要考虑如何拷贝。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    lock(&c.lock)

    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    gp.waiting = mysg
    mysg.g = gp
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    gp.waiting = nil
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}
  • 加锁

  • 封装当前接收协程的 sudog 对象

  • 完成指针指向,建立 sudoggoroutinechannel 之间的指向关系

  • sudog 添加到当前 channel 的阻塞读协程队列中

  • park 当前协程

  • 倘若协程从 park 中被唤醒,则回收 sudogsudog能被唤醒,其对应的元素必然已经被写入)

  • 解锁,返回

📖参考

Licensed under CC BY-NC-SA 4.0