- 并发安全的传递上下文,如传递 trace_id、kv 结构等
- 树状并发情景中的协程控制(某层任务取消后,及时停止所有的下层任务,上层不受影响;减少额外资源的消耗,减少处理时长)
- 超时控制
func main() {
ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) // 1s的超时
defer cancel1() // 保证返回前调用一次 cacel。这样比如主协程 panic 时,就可以调用到 cancel 函数,释放子协程资源
req1, _ := http.NewRequestWithContext(ctx1, http.MethodGet, "", nil)
go A(req1)
}
func A(r http.Request) {
... // 一些耗时的处理逻辑
B(r)
}
func B(r http.Request) {
select {
// B这里应当先检查是否超时。如果超时,则提现返回,减少链路整理时长
case <-ctx.Done():
return
default:
}
}
context.Context
是 Go 语言在 1.7 版本中引入标准库的接口,该接口定义了四个需要实现的方法,其中包括:
// 其实是个接口
type Context interface {
Deadline() (deadline time.Time, ok bool) // 返回 context 被取消的时间,也就是完成工作的截止日期
Done() <-chan struct{} // 被取消或者超时时候返回的一个close的channel,子函数需要对其监听。(像全局广播,因为是close的,所有协程都能收到)
Err() error // 返回被取消或超时的原因
Value(key interface{}) interface{} // 用于保存kv;树状结构:如果当前节点上无法找到key所对应的值,就会向上去父节点里找,直到根节点
}
// A canceler is a context type that can be canceled directly. The implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
为啥不将这里的 canceler
接口与 Context
接口合并,而是分成两个方法呢?他们定义的方法中都有 Done
方法。可以解释得通的说法是,源码作者认为 cancel
方法并不是 Context
必须的,根据最小接口设计原则,将两者分开。像 emptyCtx
和 valueCtx
不是可取消的,所以他们只要实现 Context
接口即可。
库里头提供了4个 Context
实现,来供大家玩耍
// 完全空的Context,用作树的根节点,其实是个int
type emptyCtx int
// 继承自Context,并实现了canceler接口
type cancelCtx struct {
Context
mu sync.Mutex
done chan struct{}
children map[canceler]struct{} // 记录可取消的孩子节点
err error
}
// 继承自cancelCtx,加了个timer,以支持超时处理
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
// 继承自Context,加了个kv
// 携带kv信息,为全链路提供线索,比如接入elk等系统,需要来一个trace_id,valueCtx就非常适合做这个事
type valueCtx struct {
Context
key, val interface{}
}
其实主要就做了一件事:close(c.done)
把 c.done
这个 channel
关闭掉,这样所有通过 select
方法监听 <- c.Done()
的协程都会收到通知
// closedchan is a reusable closed channel.
var closedchan = make(chan struct{})
func init() {
close(closedchan)
}
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
c.mu.Lock() // 加锁
c.err = err // 设置取消原因
if c.done == nil {
c.done = closedchan
} else {
close(c.done) // 主要逻辑:关闭 c.done 这个 channel
}
for child := range c.children { // 将子节点依次取消
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
}
// 新建一个cancel context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c) // 将新建的 c 加到其父节点的 child 列表里。当父上下文被取消时,子上下文也会被取消:
return &c, func() { c.cancel(true, Canceled) }
}
func propagateCancel(parent Context, child canceler) {
// parentCancelCtx这个函数会从parent开始向上找,直到找到一个可取消的context为止
// 即找到parent(含)最近的可取消祖先节点
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{} // 将child放到这个祖先节点的children列表里,以便祖先取消时能一次调用child的cancel方法
p.mu.Unlock()
} else {
// 如果没找到可用的祖先节点,就新起一个协程,监听父节点的Done事件
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
// 用作查祖先节点时的key,其值为0没用,用的时候都是取址(见下面函数,把 &cancelCtxKey 这个地址当成 key 来用了
var cancelCtxKey int
// 找到parent(含)最近的可取消祖先节点
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
if !ok {
return nil, false
}
return p, true
}
// 根据key查找value,如果找不到则递归从父节点向上找
func (c *cancelCtx) Value(key interface{}) interface{} {
if key == &cancelCtxKey {
return c
}
return c.Context.Value(key) // 递归向上找
}
WithDeadline
内置了 timer
定时器,到期就执行 cancel
Withtimeout
实际上是 Withdeadline
的语法糖,使用 timeout
参数算了下 deadline
而已
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
// 这里设了 timer 定时器,到期执行 c.cancel 方法
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
- 首先
Server
在开启服务时会创建一个valueCtx
, 存储了server
的相关信息,之后每建立一条连接就会开启一个协程,并携带此valueCtx
。
func (srv *Server) Serve(l net.Listener) error {
baseCtx := context.Background()
ctx := context.WithValue(baseCtx, ServerContextKey, srv) // ServerContextKey = "http-server"
for {
rw, e := l.Accept()
c := srv.newConn(rw)
go c.serve(ctx) // 开启新协程处理连接,并传递ctx
}
}
- 建立连接之后会基于传入的
context
创建一个valueCtx
用于存储本地地址信息,之后在此基础上又创建了一个cancelCtx
,然后开始从当前连接中读取网络请求,每当读取到一个请求则会将该cancelCtx
传入,用以传递取消信号。一旦连接断开,即可发送取消信号,取消所有进行中的网络请求。
// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr()) // 本地地址
ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx()
for {
w, err := c.readRequest(ctx)
serverHandler{c.server}.ServeHTTP(w, w.req)
}
}
- 读取到请求之后,会再次基于传入的
context
创建新的cancelCtx
,并设置到当前请求对象req
上,同时生成的response
对象中cancelCtx
保存了当前context
取消的方法。
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
req, err := readRequest(c.bufr, keepHostHeader)
ctx, cancelCtx := context.WithCancel(ctx)
req.ctx = ctx
w = &response{
conn: c,
cancelCtx: cancelCtx,
req: req,
reqBody: req.Body
}
return w, nil
}
在整个server
处理流程中,使用了一条 context
链贯穿Server
、Connection
、Request
,不仅将上游的信息共享给下游任务,同时实现了上游可发送取消信号取消所有下游任务,而下游任务自行取消不会影响上游任务。
另外有两点值得注意:
context
的取消是柔性的,上游任务仅仅使用context
通知下游不再需要,不会直接干涉和中断下游任务的执行,由下游任务自行处理取消逻辑,下游自己写select <- ctx.Done()
的逻辑。context
是线程安全的,其本身是不可变的(immutable),且并发情况下会加锁。