上篇已经记录到发送数据到 chanel 的三种情况的代码逻辑,接下来是从 chanel 接收数据的逻辑。
chanrecv 方法
和 chansend 方法十分类似
如果 hchan 为空
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
如果 chenel 已经关闭
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
接收数据的三种情况
- 如果 hchan 的 sendq 队列中有阻塞的 goroutine,buf 已满
- 如果 hchan.buf 还有数据未取出
- 如果 hchan.buf 为空
下面分别截取三种情况的代码段。
如果 hchan 的 sendq 队列中有阻塞的 goroutine,buf 已满
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
如果 hchan.buf 还有数据未取出
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
如果 hchan.buf 为空
··· // 上面条件都不满足,则只剩一种情况:hchan.buf 为空
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
阻塞与唤醒
假设有一个 goroutine sender,和一个 goroutine reciever,如果 reciever 执行 chanrecv 方法的时候,buf 已经为空了,从上面的代码最后一行知道,goparkunlock 方法使 reciever 阻塞,那么 sender 写数据进 chanel,reciever 又如何被 sender 唤醒呢?
reciever 休眠后, sender 来了,sender 执行到以下代码处。sender 从 recvq 队列 弹出 reciever,然后执行 send 方法。
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
send 方法最后一行执行 goready 方法将 reciever 唤醒。
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
reciever 唤醒后,继续执行 chanrecv 方法剩下的语句:
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
closechan 方法
close(ch) 对应的执行方法即为 closechan 方法。主要有以下步骤:
- 关闭 nil chanel ,返回 panic:”close of nil channel”
- 关闭 closed chanel,返回 panic:”close of closed channel”
- 将 hchan.closed 的值设为 1
- 释放所有 reader sudog 对象,释放的同时将 sudog 中的 g 插入 glist 链表
- 释放所有 writer sudog 对象,释放的同时将 sudog 中的 g 插入 glist 链表
- 遍历 glist 链表,唤醒 glist 中的所有 goroutine
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist *g
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
gp.schedlink.set(glist) // <------ a 语句
glist = gp // <------ b 语句
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
gp.schedlink.set(glist) // <------ c 语句
glist = gp // <------ d 语句
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for glist != nil {
gp := glist
glist = glist.schedlink.ptr() // <------ e 语句
gp.schedlink = 0
goready(gp, 3)
}
}
//go:nosplit
func (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) } // 获得 gp 的地址
//go:nosplit
func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) } // 设置 gp 的地址
上面代码有个注意的点: schedlink 是 G 的一个属性,用于指向下一个 G 。
参考文章