快速路径
运用 select 中的 default 分支可以实现无锁(非阻塞)的快速检查
select {
case v, ok := <- ch:
if !ok { // ch 已经被关闭, 直接返回
return io.EOF
}
return v // ch 缓冲区中有内容, 直接取出返回
default: // ch 缓冲区中无内容,select 走 default 分支防止阻塞
}
select { // 阻塞等待
case v, ok := <- ch:
if !ok { // ch 已经被关闭, 直接返回
return io.EOF
}
return v // ch 缓冲区中有内容, 直接取出返回
case <- ctx.Done(): // 等待超时/取消
return ctx.Err()
}
在阻塞等待之前加入快速路径主要是因为 golang 中的 select 具有随机性, 在高并发场景下可能会出现 <- ctx.Done() 分支永不被进入。 详细的源码解析见下文
select 语句源码
#filepath: src/runtime/select.go
// case 对应的结构体
type scase struct {
// chan -> case 关联的 channel
c *hchan
// data element
// 发送: 待发送数据的源地址 (case ch <- v elem 指向 v 源地址)
// 接收: 目标变量的地址(堆、栈)(case v := <- ch elem 指向 v 的地址)
elem unsafe.Pointer
}
#filepath: src/runtime/select.go
// select 语句经编译器编译后被转化为 selectgo 的函数调用
func selectgo(
cas0 *scase, // scase 数组的首地址,存放在 goroutine 栈上
order0 *uint16, // [2*ncases]uint16 数组首地址,也分配在栈上
// 为什么有了 cas0 还需要 order0? 详见下文
pc0 *uintptr, // (race detector 构建) [ncases]uintptr 数组首地址;否则 nil;与本章节无关
nsends int, // 发送 case 的数量
nrecvs int, // 接收 case 的数量
block bool, // true=阻塞(无 default),false=非阻塞(有 default)
) (int, bool) // 返回值:选中的 case 索引 + 是否成功接收到值
// NOTE: In order to maintain a lean stack size, the number of scases
// is capped at 65536.
// 将裸指针转换为具有最大合法边界的数组
// 防止后续写入栈外内存
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
ncases := nsends + nrecvs
scases := cas1[:ncases:ncases] // array[:end:max] 限制新切片最大容量为 ncases
pollorder := order1[:ncases:ncases] // 轮询顺序 注意:轮询顺序是随机的,并非顺序轮询,详见下文
lockorder := order1[ncases:][:ncases:ncases] // channel 加锁顺序
selecctgo 如何决定轮询顺序?
#filepath: src/runtime/select.go
j := cheaprandn(uint32(norder + 1)) // 随机数
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
selectgo 如何上锁?
#filepath: src/runtime/select.go
// sort the cases by Hchan address to get the locking order.
// simple heap sort, to guarantee n log n time and constant stack footprint.
// 根据 Hchan 的地址来获取上锁顺序
// 使用简单的堆排序来保证 nlog(n) 的时间复杂度和恒定的栈足迹(防止栈空间暴涨, 堆排序无需递归调用)
// 建堆
for i := range lockorder {
j := i
// Start with the pollorder to permute cases on the same channel.
c := scases[pollorder[i]].c
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
// 排序
for i := len(lockorder) - 1; i >= 0; i-- {
o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 + 1
if k >= i {
break
}
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
k++
}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {
lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}
// 将所有涉及到的channel上锁
sellock(scases, lockorder)
为什么已经有了 cas0, 还需要两倍 ncases 的 order0?
- 如果只传入 cas0,排序、建堆、上锁等操作需要频繁拷贝大对象(scase 16 bytes), 而 order0 只占用 2 bytes
- 轮询与上锁顺序不一样,对应在内存中的排列也不一样。
有无 default 分支的区别
#filepath: src/runtime/select.go
// 有 default 分支, block = false
if !block {
selunlock(scases, lockorder)
casi = -1
goto retc
}
retc:
if caseReleaseTime > 0 {
blockevent(caseReleaseTime-t0, 1)
}
return casi, recvOK // 直接退出, 由调用方执行 default 分支