Skip to content

Latest commit

 

History

History
553 lines (375 loc) · 24.3 KB

5-2.调度-MPG.md

File metadata and controls

553 lines (375 loc) · 24.3 KB

简介

GMP 结构是 Go 中用于调度的三个结构体。

调度时,从 A任务切换到 B任务,需要保存 A任务的上下文,包括寄存器、栈指针、函数执行进度(程序计数器)等,G 就是用来保存上下文的载体

M 对应操作系统的线程,所有的 G 放到一个队列里,线程从这个队列里取任务运行。多个线程并发读取队列时,需要加全局锁,有比较严重的性能问题。

后来加上了P,把全局G队列打散成每个P里一个本地G队列,解决了的锁的问题。调度时:依次从P本地队列、全局队列、网络读写事件、从别的P偷窃G 调度运行。

调度

所谓并发,是指 CPU 一会儿处理几下 A 程序的代码,一会儿又处理几下 B 的,看起来就好像一秒内同时在处理 A 和 B 程序。

而公平的分配 A 和 B 的运行时间,就是 调度 所做的事情。

Go 调度器的发展历程

  • 单线程调度器 0.x
    • 程序中只能存在一个活跃线程M,由 G-M 模型组成
  • 多线程调度器 1.0
    • 有多个M,允许运行多线程程序
    • 全局锁导致竞争严重
  • 任务窃取调度器 1.1
    • 在当前的 G-M 模型中引入了中间层 P,将原来塞在一个全局队列里的G 打散到多个 P 中
    • 在 P 的基础上实现基于工作窃取的调度器
    • 在某些情况下,Goroutine 不会让出线程,导致饥饿问题
    • 时间过长的垃圾回收 (Stop the world, STW) 会导致程序长时间无法工作
  • 抢占式调度器 1.2 ~ 1.14
    • 抢占 = 当前协程休眠,让出 CPU 给别的协程
    • 基于协作的抢占式调度器 1.2 ~ 1.13
      • 通过编译器,在编译时向函数头部插入抢占检查代码,在函数调用时判断是否触发抢占,让出当前 CPU
      • 有一些无法被抢占的边缘情况:例如空的 for 循环、垃圾回收长时间占用线程等情景
      • 需要函数调用作为入口才能触发抢占,所以这是一种协作式的抢占式调度
    • 基于信号的抢占式调度器 1.14
      • 程序启动时,注册对 SIGURG 信号的处理函数
      • 垃圾回收在扫描栈时会触发抢占,向线程发送信号 SIGURG
      • 处理函数收到抢占信号后,获取当前协程,让其休眠并让出线程
      • 抢占的时间点不够多,还不能覆盖全部的边缘情况

初代调度模型 MG

G

调度就是 CPU 暂停当前的工作,转而执行另一个工作。

为了实现调度,需要引入一个数据结构保存 CPU寄存器的值以及 goroutine 的其它一些状态信息,这个载体就是 G

G 被调离 CPU 时,调度器代码负责把 CPU 寄存器的值保存在 G 对象的成员变量之中。当 G 被调度起来运行时,调度器代码又负责把 G 对象的成员变量所保存的寄存器的值恢复到 CPU 的寄存器。

M

G 只保存状态信息,不负责执行任务。真正执行任务的是 M 结构体,表示系统线程。它本身与一个内核线程进行绑定,是实际的执行体。

一开始的调度模型只有 GM 两个结构。所有的 G 都放在一个全局队列 global runqueue 里,M 从这个队列里取 G 并执行 G 里保存的任务。

由于多个 M 都会从这个全局队列里获取 G 来运行,所以必须对该队列加锁保证互斥。这必然导致多个 M 激烈的锁的竞争,这也是老调度器最大的一个缺点。

具体来说,老调度器有4个缺点:详见 Scalable Go Scheduler Design Doc

  1. 创建、销毁、调度 G 都需要每个 M 获取锁,这就形成了激烈的锁竞争
  2. M 转移 G 会造成延迟和额外的系统负载。比如在 G 中创建新协程的时候,M 创建了 G2,为了继续执行G,需要把 G2 交给 M2 执行,也造成了很差的局部性,因为 G2G 是相关的,最近访问的数据都在 M 的缓存里,最好放在 M 上执行,而不是其他 M2
  3. M 中的 mcache 是用来存放小对象的,mcache 、栈都和 M 关联造成了大量的内存开销和很差的局部性
  4. 系统调用导致频繁的线程阻塞和取消阻塞操作增加了系统开销。

次代调度模型 MPG

在 Dmitry Vyokov 实现的次代调度模型里,加上了 P 这一层,将全局唯一的 G 队列打散到了 P 里,每个 P 自己维护一个 G 的局部队列,解决原来的全局锁问题。

除了上面三个结构体以外,还有一个存放所有可运行 goroutine 的容器 schedt。每个 Go 程序中 schedt 结构体只有一个实例对象,在代码中是一个共享的全局变量,每个工作线程都可以访问它以及它所拥有的 goroutine 运行队列。

另外,尽管 P/M 构成执行组合体,但两者数量并非一一对应。 P 的数量默认为 CPU 数,而 M 是由调度器按需创建的。比如,当 M 因陷入系统调用阻塞时,P 就会被监控线程抢回,去新建/唤醒一个 M 执行其他任务,这样 M 的数量就会增长。

详情

G (goroutine)

// runtime/runtime2.go
type g struct {
    // 栈相关
    stack        stack      // 当前协程的栈内存范围 [stack.lo, stack.hi) 
    stackguard0  uintptr    // 一个警戒指针,用来判断栈容量是否需要扩张
  
    m            *m         // 当前 Goroutine 占用的线程
    sched        gobuf      // 调度相关的数据,上下文切换时就更新这个
  
    // 抢占相关
    preempt      bool       // 抢占信号,标记 G 是否应该停下来被调度,让给别的 G
  
    timer        *timer     // 给 time.Sleep 用的 timer

    _panic       *_panic    // panic链表
    _defer       *_defer    // defer链表
}

// sched 字段为 gobuf 类型,在调度器保存/恢复上下文的时候会用到。其中的栈指针和程序计数器会用来存储/恢复寄存器中的值,改变程序即将执行的代码。
type gobuf struct {
    sp   uintptr      // 栈指针 Stack Pointer
    pc   uintptr      // 程序计数器 Program Counter
    g    guintptr     // 所属的 G
    ret  sys.Uintreg  // 系统调用的返回值
}
  • G 维护了 goroutine 需要的栈、程序计数器以及它所在的 M 等信息。
  • G 并非执行体,仅保存并发任务状态,为任务提供所需要的栈空间
  • G 创建成功后放在 P 的本地队列或者全局队列,等待被调度
  • G 使用完毕后会被放回缓存池里,等待被复用
    • 因此存在:如果瞬发新建了大量 G,高峰后这些 G 会一直留着,不释放内存的问题。
    • 两级缓存池:每个 P 里有本地 G 缓存,schedt 里还有个全局 G 缓存,共两级
    • P 里的缓存池是 gFree 字段, G 工作结束后(状态变为 Gdead),会放入这里。读写它不需要锁
    • P.gFree 里的空闲 G 超过 64 个后,会分一半的 G 到全局缓存 schedt.gFree 里。schedt.gFree 又根据 G 的栈大小分成了两个队列
      • 为节省空间,G 的栈如果超过 2KB,那回收时不会保留它的栈,放入 schedt.gFree.noStack
      • 否则放入另一个队列 schedt.gFree.Stack
sudog

G 遇到阻塞场景时,比如向 channel 发送/接收阻塞时,会被封装为 sudog 这样一个结构。其实就是给 G 加了个 elem 字段,用于记录 channel 发送的数据。

type sudog struct {
    g     *g               // 正常的 G
    elem  unsafe.Pointer   // 保存数据的指针
}

M (Machine) 线程

M 表示操作系统线程。调度器最多可以创建 10000 个线程,但是其中大多数的线程都不会执行用户代码(可能陷入系统调用),最多只会有 GOMAXPROCS 个活跃线程能够正常运行。

在默认情况下,运行时会将 GOMAXPROCS 设置成当前机器的核数,我们也可以在程序中使用 runtime.GOMAXPROCS 来改变最大的活跃线程数。

type m struct {
    g0         *g     // 预留用于执行调度指令的 G。调度和执行 syscall 时会切换到这个 G。通过它, M 可以不通过 P 执行原生代码

    curg              // 当前运行的 G
    preemptoff string // 是否保持 curg 始终在这个 M 上运行
  
    p             puintptr // 当前挂的 P 
    nextp         puintptr // 下一个 P
    oldp          puintptr // 上一个运行的 P
  
    locks      int32  // 表示该 M 是否被锁,M 被锁的状态下该 M 无法执行 GC
    spinning   bool   // 是否自旋,自旋就表示 M 正在找 G 来运行。自旋状态的 M = 正在找工作 = 空闲的 M
    blocked    bool   // M 是否被阻塞
    ...
}
  • 一个 M 就是一个用户级线程
  • M 是一个很大的结构,里面维护小对象内存 cache、当前执行的 goroutine、随机数发生器等等非常多的信息。
  • M 通过修改寄存器,将执行栈指向 G 的栈内存,并在此空间内分配堆栈帧,执行任务函数
  • 中途切换时,将寄存器值保存回 Gsched 字段即可记录状态,任何 M 都可以通过读这个 G 的该字段来恢复该 G 的状态。线程 M 本身只负责执行,不保存状态。这是并发任务跨线程调度,实现多路复用的根本所在。
  • M 还有个 g0 字段,在调度和执行系统调用时,使用这个 g0,这样就不需要处理用户线程的栈相关的逻辑。这个 g064KB 的栈,也比普通协程 2KB 的栈大得多

M 并没有像 GP 一样的状态标记,但可以认为一个 M 有以下的状态:

  1. 自旋中 spinningM 正在从运行队列获取G,这时候 M 会拥有一个 P
  2. 执行 go 代码中: M 正在执行 go 代码, 这时候 M 会拥有一个 P
  3. 执行原生代码中: M 正在执行阻塞的 syscall 或者 cgo这时 M 并不拥有 PM 可以无 P 执行原生代码,运行在 g0 上)
  4. 休眠中: M 发现没有待运行的 G 时会进入休眠, 并添加到空闲 M 链表中, 这时 M 并不拥有 P

自旋中 spinning 这个状态非常重要,是否需要唤醒或者创建新的 M 取决于当前自旋中的 M 的数量。

通常创建一个 M 的原因是由于没有足够的 M 来关联 P 并运行其队列里的 G。此外运行时 sysmon 、执行阻塞性系统调用、或者 GC 的时候也会创建 M

P (Processor),处理器

P 是线程 MG 的中间层,用于调度 GM 上执行。

// runtime.runtime2.go
type p struct {
    m           muintptr    // 挂载的 M
     
    mcache      *mcache     // 用于内存分配的结构体
  
    runq     [256]guintptr  // 本地可运行的 G 队列,local runqueue。长度固定256。通过队尾-队头判断实际长度
    runqhead uint32         // 队头
    runqtail uint32         // 队尾
    runnext guintptr        // 下一个可执行 G,优先级比 runq 队列高
    
    gFree struct {          // Gdead 状态的 G 池。新建 G 时,优先从这里取进行复用,其次再用 schedt 里的全局队列
        gList
        n int32
    }

    sudogcache []*sudog     // sudog 缓存池。二级缓存,优先从这里复用,再用 schedt 里的
    sudogbuf   [128]*sudog
}

var (
    allp []*p               // 全局 P 数组
)
  • P 自身是用一个全局数组 allp 来保存的,长度默认为 GOMAXPROCS(一般设为 CPU 核数)
  • 它的主要用途就是用来执行 goroutine 的,所以它维护了一个本地 G 队列,里面存储了所有需要它来执行的 goroutine
    • 本地队列避免了竞争锁(解决了初代模型最大的问题)。
  • P 为线程提供了执行资源,如对象分配内存,本地 G 队列等。线程独享 P 资源,可以无锁操作
  • 为何要维护多个 P 呢?是为了当一个 OS线程 M 陷入阻塞时,P 能从之前的 M 上脱离,转而在另一个 M 上运行
  • P 里设计了两级 G ,一个 runnext 存储下一个要运行的 G,和一个 runq 存储待运行的 G 列表

Schedt

  • Schedt 结构就是一个调度器,它维护有存储 MG 的队列以及调度器的一些状态信息等。
  • 它是一个共享的全局变量,全局只有一个 schedt 类型的实例。
  • 里面存放了空闲(即休眠) M 列表、空闲 P 列表、全局 G 队列、废弃的 G 队列
// runtime/runtime2.go
type schedt struct {
    
    lock mutex 
     
    midle        muintptr  // 空闲的 M 列表
    nmidle       int32     // 空闲的 M 列表数量
    mnext        int64     // 下一个被创建的 M 的 id
    maxmcount    int32     // M 的最大数量,初始化时设为 10000,即最多 10000 个 M
    nmspinning uint32      // 处于自旋状态的 M 的数量

    pidle      puintptr    // 空闲 P 列表
    npidle     uint32      // 空闲 P 数量
    
    runq     gQueue        // 全局 runnable G 队列
    runqsize int32         // 全局 G 队列的长度
   
    gFree struct {         // 有效 dead G 的全局缓存。二级设计,新建 goroutine 时,优先从 P 复用,再从这里
        lock    mutex
        stack   gList      // 包含栈的
        noStack gList      // 没有栈的
        n       int32
    }
    
    sudoglock  mutex      
    sudogcache *sudog      // 全局 sudog 缓存。也是个二级缓存,使用时先用 P 的,再用全局的
     
    deferlock mutex
    deferpool [5]*_defer   // defer 结构的池
}

重要的全局变量

allgs     []*g   // 保存所有的g
allm       *m    // 所有的m构成的一个链表,包括下面的m0
allp       []*p  // 保存所有的p,len(allp) == GOMAXPROCS

ncpu             int32 // 系统中 CPU 核数,程序启动时由 runtime代码初始化
gomaxprocs int32       // P数量的最大值,默认等于 CPU 数,可以通过 GOMAXPROCS 系统变量修改

sched      schedt // 调度器结构体对象,记录了调度器的工作状态

m0  m  // 代表进程的主线程
g0   g // m0 的 g0

实现

创建 G

  • 使用 go func() 关键字时,编译器会通过 cmd/compile/internal/gc.state.stmtcmd/compile/internal/gc.state.call 两个方法将该关键字转换成 runtime.newproc 函数调用,创建新的 goroutine
    • 也不一定是从新创建,会先尝试复用空闲的 G
    • 先从 Pgfree 字段取空闲 G,没有再从 sched.gfree 链表里转移一批空闲 GP 里,再重试
    • 再没有,才会新建
  • 新的 goroutine 会被加到本地队列里
    • 新建的协程会被放到本地队列的头部(runnext 字段)
    • 调度时,会从队列里 pop 一个 goroutine,设置栈和 instruction pointer,开始执行这个 goroutine
  • P 的本地队列长度超过 256 时,里面一半的 G 会被转移至全局队列
  • 任务队列分为三级,分别是 P.runnext, P.runq, Sched.runq,可以视为三级协程池,很有些 CPU 多级缓存的意思。
// 创建新的 G
func newproc(siz int32, fn *funcval) { // siz: go func() 里这个 func 的参数大小;fn: go func() 里的这个 func
    gp := getg()        // 获取当前的 G 
    pc := getcallerpc() // 获取调用者的程序计数器 PC
  
    systemstack(func() {                         // 系统调用
        newg := newproc1(fn, argp, siz, gp, pc)  // 先找空闲的 G,如果没有则创建新的 G 结构体
        _p_ := getg().m.p.ptr()   
        runqput(_p_, newg, true)                 // 将 G 加入到 P 的运行队列
      
        if mainStarted {
            wakep()                              // 这里还会触发一次唤醒空闲的 M 执行空闲的 P
        }
    })
}

newproc1 函数会先找空闲的 G,如果没有则创建新的 G 结构体,并拷贝 fn 函数的参数到 G 的栈上

func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
    newg := gfget(_p_)         // 先找空闲的 G
    if newg == nil {           // 如果没找着,则创建一个新的 G
        // _StackMin = 2KB
        newg = malg(_StackMin) // malg 函数申请 2KB 的栈空间并创建 G
    }
  
    totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize
	  totalSize += -totalSize & (sys.SpAlign - 1)
	  sp := newg.stack.hi - totalSize                         // sp 为 G 的栈指针,指向存储函数参数的起始地址
	  spArg := sp
	  if narg > 0 {
		    memmove(unsafe.Pointer(spArg), argp, uintptr(narg)) // 将 fn 函数的所有参数拷贝到 G 的栈上
	  }
  
    return newg
}

gfget 函数会尝试从 P 的本地空闲队列里找 G,如果本地队列为空而全局空闲队列 sched.gFree 不为空,则从全局队列里取32个空闲 G 挪到 P 的本地队列里再取出

func gfget(_p_ *p) *g {
retry:
    if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) { // 本地队列为空而全局队列不为空
        for _p_.gFree.n < 32 {
            gp := sched.gFree.stack.pop()      // 先从全局队列的 stack 字段取 空闲G
            if gp == nil {
                gp = sched.gFree.noStack.pop() // 在从 noStack 字段取 空闲G
                if gp == nil {
                    break
                }
            }
            _p_.gFree.push(gp) // 放到本地队列里
        }
        goto retry
    }
    gp := _p_.gFree.pop()
    if gp == nil {
        return nil
    }
    return gp
}

如果本地和全局空闲队列里都没有空闲 G,则调用 malg() 函数创建一个具有 2KB 栈空间的新 G

runqput 函数会尝试将新建的 G 写入 P 本地队列的 runnext 字段。本地队列如果满了,放入全局队列

可以看到只向本地队列写入时,逻辑是无锁的,提升效率。

// runtime/proc.go
// runqput 尝试将一个 G 放入本地队列. 本地队列如果满了,则移动队列里的一半 G 到全局队列
// next = false 时,放入队尾
// next = true 时,直接写入 P 的 runnext 字段。(新建 G 时 next 就为 true)
// 只能由 owner P 执行
func runqput(_p_ *p, gp *g, next bool) {
    if next {                                                  // 新建 G 时,next 为 true,G 会直接被写入 runnext 字段
        oldnext := _p_.runnext
        _p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) // 直接写入 runnext 字段
        gp = oldnext.ptr()                                     // 原 runnext 里的 G 后面会被踢到本地/全局队列
    }
    
    // 虽然 runq 是循环队列,但 h 和 t 的值并不会 %256,t 会始终自增,因此可以通过 t-h 判断队列长度。赋值时才取模
    h := atomic.LoadAcq(&_p_.runqhead)            // 队头
    t := _p_.runqtail                             // 队尾
    if t-h < uint32(len(_p_.runq)) {              // 本地队列未满,可写入
        _p_.runq[t%uint32(len(_p_.runq))].set(gp) // 插到队列尾部
        atomic.StoreRel(&_p_.runqtail, t+1)       // 更新队尾指针
        return
    }
    if runqputslow(_p_, gp, h, t) { // 如果本地队列满了,分一半到全局队列。因为需要加锁,所以函数叫 slow
        return
    }
}

// 移动本地队列的前半部分,到全局队列。因为全局队列就需要加锁了,所以函数叫 slow
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
    var batch [len(_p_.runq)/2 + 1]*g

    n := t - h
    n = n / 2
    for i := uint32(0); i < n; i++ {
        batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr() // 把本地队列的前半部分到 batch 数组
    }
    if !atomic.CasRel(&_p_.runqhead, h, h+n) {                 // 更新本地队列的头指针,即删除本地队列的前半部分
        return false
    }
    
    // 把 batch 数组做成链表,这样后面放入全局队列时只需要把 batch 头结点接到全局队列尾节点后面就可以了
    for i := uint32(0); i < n; i++ {
        batch[i].schedlink.set(batch[i+1]) 
    }
    
    var q gQueue
    q.head.set(batch[0]) 
    q.tail.set(batch[n])
    lock(&sched.lock)                // 操作全局队列,要加锁了
    globrunqputbatch(&q, int32(n+1)) // 把 batch 数组放到全局队列尾部
    unlock(&sched.lock)
}

创建P

P 只在初始化的时候创建,数量为 GOMAXPROCS,一般建完就不会再变了,除非用户调用 runtime.MAXGOPROCS 调整 P 的数量。

不过官方注释说 runtime.MAXGOPROCS 在后续改进调度器后会被移除。

创建 M

newproc 里可以看到一个 wakep() 函数,里面调用了 startm 函数找一个空闲 M 或新建 M 执行空闲 P。如果没有空闲 M,就会调用 newm 函数新建一个 M

newm() 在 linux 平台底层调用的是 clone() ,传入的函数是 mstart(),而 mstart() 里会递归调用 schedule(),不会停下来。因此 M 一旦被创建,就不再会被销毁了,最多是通过 stopm() 休眠,放入 sched 的空闲 M 列表里。

// runtime/proc.go
// 唤醒一个空闲 M 执行空闲 P
func wakep() {
    if atomic.Load(&sched.npidle) == 0 {                                            // 如果没有空闲P就返回
        return
    }
    if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) { // 设为自旋状态,防止并发唤醒
        return
    }
    startm(nil, true) // 找空闲 M,或者新建一个 M
}

// 获取空闲 M 或新建 M
func startm(_p_ *p, spinning bool) {
    nmp := mget()          // 获取空闲 M 
    if nmp == nil {        // 没有空闲 M,新建一个
        id := mReserveID() // sched.mnext 字段记录了 M 的自增 ID;如果超出 sched.maxmcount(默认10000),会 panic
        newm(fn, _p_, id)  // 新建 M
        return
    }
}

// 新建一个M
func newm(fn func(), _p_ *p, id int64) {
    mp := allocm(_p_, fn, id) // 创建 M 对象
    newm1(mp)
}

// 创建M对象、分配空间、初始化
func allocm(_p_ *p, fn func(), id int64) *m {
    mp := new(m)        // 创建M对象 
    mp.mstartfn = fn    // 设置启动函数
    mcommoninit(mp, id) // 初始化
  
    mp.g0 = malg(8192 * sys.StackGuardMultiplier) // 初始化g0,栈为8KB
    mp.g0.m = mp
}

// 为M创建系统线程
func newm1(mp *m) {
    newosproc(mp) // 创建系统线程
}

// runtime.os_linux.go
// 这个函数根据具体 os 的实现是不一样的,linux 下用的是 clone(),macOS 下用的就是 pthread_create()
// 初始 func 是 mstart,mstart() 会触发调度 schedule()
func newosproc(mp *m) {
    ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
}

M 比较特别的地方是自带一个名为 g0,默认 8KB 栈内存的 G 属性对象。它的栈内存地址被传给 clone 函数,作为系统线程的默认堆栈空间(Linux下)。

通过 g0M 可以无 P 执行 runtime 管理指令,比如 systemstack 这种执行方式。

在进程执行过程中,有两类代码需要运行。

  • 一类是用户逻辑,直接使用当前 G 的栈内存
  • 另一类是运行时管理指令,它并不便于直接在用户栈上执行,因为涉及到切换,需要保存用户栈、执行自己的逻辑、再恢复用户栈,要处理与用户逻辑现场有关的一大堆事务,很麻烦。

因此,当需要执行管理指令时,会将线程栈临时切换到 g0,与用户逻辑彻底隔离。

问题

P 的 runq大小为什么是 256?

太小会频繁触发和全局队列的交换。太大会频繁触发偷窃。

参考

Golang调度与MPG

深入理解Go语言(03):scheduler调度器 - 基本介绍

golang 调度学习-综述

golang核心原理-协程调度时机

lucifer_L49715 - 理解golang调度之二 :Go调度器