快速路径

运用 select 中的 default 分支可以实现无锁(非阻塞)的快速检查

select {
    case v, ok := <- ch:
        if !ok { // ch 已经被关闭, 直接返回
            return io.EOF
        }
        return v // ch 缓冲区中有内容, 直接取出返回
    default: // ch 缓冲区中无内容,select 走 default 分支防止阻塞
}
select { // 阻塞等待
    case v, ok := <- ch:
        if !ok { // ch 已经被关闭, 直接返回
            return io.EOF
        }
        return v // ch 缓冲区中有内容, 直接取出返回
    case <- ctx.Done(): // 等待超时/取消
        return ctx.Err()
}

在阻塞等待之前加入快速路径主要是因为 golang 中的 select 具有随机性, 在高并发场景下可能会出现 <- ctx.Done() 分支永不被进入。 详细的源码解析见下文

select 语句源码

#filepath: src/runtime/select.go

// case 对应的结构体
type scase struct {
    // chan -> case 关联的 channel
    c    *hchan
    // data element
    // 发送: 待发送数据的源地址 (case ch <- v elem 指向 v 源地址)
    // 接收: 目标变量的地址(堆、栈)(case v := <- ch elem 指向 v 的地址)
    elem unsafe.Pointer
}
#filepath: src/runtime/select.go

// select 语句经编译器编译后被转化为 selectgo 的函数调用
func selectgo(
    cas0   *scase,    // scase 数组的首地址,存放在 goroutine 栈上
    order0 *uint16,   // [2*ncases]uint16 数组首地址,也分配在栈上
    // 为什么有了 cas0 还需要 order0? 详见下文
    pc0    *uintptr,  // (race detector 构建) [ncases]uintptr 数组首地址;否则 nil;与本章节无关
    nsends int,       // 发送 case 的数量
    nrecvs int,       // 接收 case 的数量
    block  bool,      // true=阻塞(无 default),false=非阻塞(有 default)
) (int, bool)         // 返回值:选中的 case 索引 + 是否成功接收到值

// NOTE: In order to maintain a lean stack size, the number of scases
// is capped at 65536.
// 将裸指针转换为具有最大合法边界的数组
// 防止后续写入栈外内存
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

ncases := nsends + nrecvs
scases := cas1[:ncases:ncases] // array[:end:max] 限制新切片最大容量为 ncases
pollorder := order1[:ncases:ncases] // 轮询顺序 注意:轮询顺序是随机的,并非顺序轮询,详见下文
lockorder := order1[ncases:][:ncases:ncases] // channel 加锁顺序

selecctgo 如何决定轮询顺序?

#filepath: src/runtime/select.go

j := cheaprandn(uint32(norder + 1)) // 随机数
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++

selectgo 如何上锁?

#filepath: src/runtime/select.go


// sort the cases by Hchan address to get the locking order.
// simple heap sort, to guarantee n log n time and constant stack footprint.
// 根据 Hchan 的地址来获取上锁顺序
// 使用简单的堆排序来保证 nlog(n) 的时间复杂度和恒定的栈足迹(防止栈空间暴涨, 堆排序无需递归调用)

// 建堆
for i := range lockorder {
    j := i
    // Start with the pollorder to permute cases on the same channel.
    c := scases[pollorder[i]].c
    for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
        k := (j - 1) / 2
        lockorder[j] = lockorder[k]
        j = k
    }
    lockorder[j] = pollorder[i]
}

// 排序
for i := len(lockorder) - 1; i >= 0; i-- {
    o := lockorder[i]
    c := scases[o].c
    lockorder[i] = lockorder[0]
    j := 0
    for {
        k := j*2 + 1
        if k >= i {
            break
        }
        if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
            k++
        }
        if c.sortkey() < scases[lockorder[k]].c.sortkey() {
            lockorder[j] = lockorder[k]
            j = k
            continue
        }
        break
    }
    lockorder[j] = o
}

// 将所有涉及到的channel上锁
sellock(scases, lockorder)

为什么已经有了 cas0, 还需要两倍 ncases 的 order0?

  1. 如果只传入 cas0,排序、建堆、上锁等操作需要频繁拷贝大对象(scase 16 bytes), 而 order0 只占用 2 bytes
  2. 轮询与上锁顺序不一样,对应在内存中的排列也不一样。

有无 default 分支的区别

#filepath: src/runtime/select.go

// 有 default 分支, block = false
if !block {
    selunlock(scases, lockorder)
    casi = -1
    goto retc
}

retc:
	if caseReleaseTime > 0 {
		blockevent(caseReleaseTime-t0, 1)
	}
	return casi, recvOK // 直接退出, 由调用方执行 default 分支