单核CPU下Go语言调度及抢占式调度的实现

有群友说面试的时候被问到:单核CPU,开两个goroutine,其中一个死循环,会怎么样?答案是:死循环的goroutine block住了但是完全不影响另一个goroutine和主goroutine的运行。相信很多小伙伴乍一看一脸懵,我就在群里回了一下go1.14版本实现了基于信号的抢占式调度,可以在goroutine执行时间过长的时候强制让出调度,执行其他的goroutine。接下来看看具体怎么实现的,话不多说直接上代码。基于go1.15 linux amd64。
先看一个常规的栗子

func f1() {
    fmt.Println("This is f1")
}

func f2() {
    fmt.Println("This is f2")
}

func main() {
    runtime.GOMAXPROCS(1)

    go f1()
    go f2()

    time.Sleep(100 * time.Millisecond)
    fmt.Println("success")
}
// always print f2 f1 

以上代码就是模拟单核CPU下go语言的执行情况,无论运行多少次,输出的结果总是 f2 f1,解释如下

func main() {
    // 只有一个 P 了,目前运行主 goroutine
    runtime.GOMAXPROCS(1)

    // 创建一个 G1 , call $runtime.newproc -> runqput(_p_, newg, true)放入到本地队列
    // If next is false, runqput adds g to the tail of the runnable queue. 放在本地队列尾部
    // If next is true, runqput puts g in the _p_.runnext slot. 放在 runnext 上
    go f1()

    // 因为只有一个 P 主 goroutine继续运行

    // 创建一个 G2 , 同上面,也是加入到 runnext,会替换掉之前的 runnext 
    //这时候 runnext 保存的是 G2 本地队列的顺序就是  G1
    // runnext 优先级最高,所以执行顺序是 G2 G1
    go f2()

    // 等待 f1 f2的执行 gopark 主 goroutine GMP调度可运行的 G
    // 按顺序调用 G2 G1
    // 所以不管执行多少次,结果都是
    // This is f2
    // This is f1
    // success
    time.Sleep(100 * time.Millisecond)
    fmt.Println("success")
}

如果将runtime.GOMAXPROCS(1)改成runtime.GOMAXPROCS(4)即多核CPU,你就会发现 f1 和 f2 交替执行,没有明确的先后,这种事件A和B完全无序执行,即为并发。利用多核CPU同时执行A和B的情况,即为并行。为什么会这样涉及到go语言的GMP调度模型,这里不赘述,有兴趣的小伙伴可以自行学习。

既然每次都是 f2 先执行,那在 f2 中加入一个死循环会怎么样呢?

func f1() {
    fmt.Println("This is f1")
}

func f2() {
    // 死循环
    for {

    }
    fmt.Println("This is f2")
}

func main() {
    runtime.GOMAXPROCS(1)

    go f1()
    go f2()

    time.Sleep(100 * time.Millisecond)
    fmt.Println("success")
}

// This is f1
// success

你会发现虽然 f2 block住了没有输出,但是完全没影响f1和主goroutine的运行,这其实就是抢占式调度。golang在之前的版本中已经实现了抢占调度,但有些场景是无法抢占成功的。比如轮询计算 for { i++ } 等,这类操作无法进行newstack、morestack、syscall,无法检测stackguard0 = stackpreempt的场景。通俗点说之前版本实现的抢占调度发生在goroutine虽然执行了很长时间,但还得继续调用函数等操作,才能检查是不是需要抢占。
以下是src/runtime/stack.go/newstack()中抢占调度的内容:

func newstack() {
    ...
    // NOTE: stackguard0 may change underfoot, if another thread
    // is about to try to preempt gp. Read it just once and use that same
    // value now and below. 
    // 如果是发起的抢占请求而非真正的栈分段
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt

    // 如果正持有锁、分配内存或抢占被禁用,则不发生抢占
    if preempt {
        if !canPreemptM(thisg.m) {
            // Let the goroutine keep running for now. 不发生抢占,继续调度
            // gp->preempt is set, so it will be preempted next time.
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) // never return 重新进入调度循环
        }
    }
        ...
    // 如果需要抢占
    if preempt {
        if gp == thisg.m.g0 {
            throw("runtime: preempt g0")
        }
        if thisg.m.p == 0 && thisg.m.locks == 0 {
            throw("runtime: g is running but p is not")
        }

        if gp.preemptShrink {
            // We're at a synchronous safe point now, so
            // do the pending stack shrink. 
            gp.preemptShrink = false
            shrinkstack(gp)
        }

        if gp.preemptStop {
            preemptPark(gp)    // never returns 进入循环调度
        }

        // Act like goroutine called runtime.Gosched. 表现得像是调用了 runtime.Gosched,主动让权,进入循环调度
        gopreempt_m(gp) // never return
    }
        ...
}

显然上面的抢占调度还是存在一些问题的,GO团队在go1.14版本中实现了基于信号协程调度抢占。下面看下如何实现的。
信号的发送方:垃圾回收 STW 和长时间运行的 goruntine 需要抢占,关键在于方法preemptone(p)。

func stopTheWorldWithSema() {
    ...
    // 发送抢占信号
    preemptall()
    ...
}
func preemptall() bool {
    res := false
    for _, _p_ := range allp {
        if _p_.status != _Prunning {
            continue
        }
        if preemptone(_p_) {
            res = true
        }
    }
    return res
}

Go Runtime 在启动程序的时候,会创建一个独立的 M 作为监控线程,称为 sysmon,它是一个系统级的 daemon 线程。这个sysmon 独立于 GPM 之外,也就是说不需要P就可以运行,也是作为抢占信号的发送方一直运行。
src/runtime/proc.go/main()

// The main goroutine.
func main() {
    ...
    // 启动系统后台监控(定期垃圾回收、并发任务调度)
    if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
        systemstack(func() {
            // 系统监控在一个独立的 m 上运行
            newm(sysmon, nil, -1)
        })
    }
...
}

src/runtime/proc.go/sysmon(),重点是里面的retake()

func sysmon() {
    lock(&sched.lock)
    // 不计入死锁的系统 m 的数量
    sched.nmsys++
    // 死锁检查
    checkdead()
    unlock(&sched.lock)

    lasttrace := int64(0)
    idle := 0 // how many cycles in succession we had not wokeup somebody 没有 wokeup 的周期数
    delay := uint32(0)
    //死循环一直执行
    for {
        if idle == 0 { // start with 20us sleep... 每次启动先休眠 20us
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms... 1ms 后就翻倍休眠时间
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms 增加到 10ms
            delay = 10 * 1000
        }
        // 休眠
        usleep(delay)
        now := nanotime()
        // timer定时器检查
        next, _ := timeSleepUntil()
        ...
        // retake P's blocked in syscalls
        // and preempt long running G's 抢夺在 syscall 中阻塞的 P、运行时间过长的 G
        if retake(now) != 0 {
            idle = 0
        } else {
            idle++
        }
    ...
    }
}

src/runtime/proc.go/retake()

func retake(now int64) uint32 {
    n := 0
    // Prevent allp slice changes. This lock will be completely
    // uncontended unless we're already stopping the world. 防止 allp 数组发生变化,除非我们已经 STW,此锁将完全没有人竞争
    lock(&allpLock)
    // We can't use a range loop over allp because we may
    // temporarily drop the allpLock. Hence, we need to re-fetch
    // allp each time around the loop.
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            // Preempt G if it's running for too long. 如果 G 运行时时间太长则进行抢占
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now {
               // 运行超过10ms,就在这里了
                preemptone(_p_)
                // In case of syscall, preemptone() doesn't 对于 syscall 的情况,因为 M 没有与 P 绑定,
                // work, because there is no M wired to P. preemptone() 不工作
                sysretake = true
            }
        }

        // 对阻塞在系统调用上的 P 进行抢占
        if s == _Psyscall {
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            // 如果已经超过了一个系统监控的 tick(20us),则从系统调用中抢占 P
            t := int64(_p_.syscalltick)
            if !sysretake && int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // On the one hand we don't want to retake Ps if there is no other work to do,一方面,在没有其他 work 的情况下,我们不希望抢夺 P
            // but on the other hand we want to retake them eventually
            // because they can prevent the sysmon thread from deep sleep.另一方面,因为它可能阻止 sysmon 线程从深度睡眠中唤醒,所以最终我们仍希望抢夺 P
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // Drop allpLock so we can take sched.lock. 解除 allpLock,从而可以获取 sched.lock
            unlock(&allpLock)
            // Need to decrement number of idle locked M's 在 CAS 之前需要减少空闲 M 的数量(假装某个还在运行)
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock. 否则发生抢夺的 M 可能退出 syscall 然后再增加 nmidle ,进而发生死锁
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
               // 转移 P 有其他任务就创建 M 去执行 一圈找下来都没有就放入空闲的 P 列表
                handoffp(_p_)
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

preemptone -> preemptM(mp)->signalM(mp, sigPreempt)直至发起系统调用,完成抢占信号的发送。

信号的接收方:
Go 运行时初始化是会调用runtime.mstart完成信号处理的初始化行为,调用mstart1()
src/runtime/proc.go/mstart1()

func mstart1() {
    ...

    // Install signal handlers; after minit so that minit can
    // prepare the thread to be able to handle the signals.
    // 设置信号 handler;在 minit 之后,以便 minit 可以准备处理信号的的线程
    if _g_.m == &m0 {
        // 只在当前 m 是 m0 的时候执行, mstartm0主要就是初始化信号处理 initsig
        mstartm0()
    }
     ...
}

src/runtime/proc.go/mstartm0()

func mstartm0() {
    ...
    // 信号处理初始化
    initsig(false)
}

src/runtime/signal_unix.go/initsig()

func initsig(preinit bool) {
    ...
        // 对于一个需要设置 sighandler 的信号,会通过 setsig 来设置信号对应的动作(action):
        setsig(i, funcPC(sighandler))
     ...
}

src/runtime/signal_unix.go/sighandler()

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    ...

    if sig == sigPreempt && debug.asyncpreemptoff == 0 {
        // Might be a preemption signal. 可能是一个抢占信号
        doSigPreempt(gp, c)
        // Even if this was definitely a preemption signal, it
        // may have been coalesced with another signal, so we 即便这是一个抢占信号,它也可能与其他信号进行混合,因此我们
        // still let it through to the application. 继续进行处理。
    }
}

doSigPreempt->asyncPreempt->asyncPreempt2

func asyncPreempt2() {
    gp := getg()
    gp.asyncSafePoint = true
    if gp.preemptStop {
        mcall(preemptPark)
    } else {
        mcall(gopreempt_m)
    }
    // 异步抢占过程结束
    gp.asyncSafePoint = false
}

不管是preemptPark还是gopreempt_m,最终都是进入调度循环schedule(),去执行其他的 G。
终于写完了,撒花,菜鸟一枚,有什么不对的欢迎评论留言指正,谢谢。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 230,002评论 6 542
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 99,400评论 3 429
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 178,136评论 0 383
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,714评论 1 317
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 72,452评论 6 412
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,818评论 1 328
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,812评论 3 446
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 42,997评论 0 290
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,552评论 1 335
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 41,292评论 3 358
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,510评论 1 374
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 39,035评论 5 363
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,721评论 3 348
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 35,121评论 0 28
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 36,429评论 1 294
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 52,235评论 3 398
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,480评论 2 379

推荐阅读更多精彩内容