goroutine调度
gorountine是一个用户态级别的类线程,所以它的堆栈也必然是在用户态的调度器中进行管理的
每个P中有一个G的队列(多个G)
M是一个实际线程,需要与P绑定才能运行,运行态M数量等于CPU线程数量
P是逻辑处理器,默认等于CPU线程数
G有阻塞代码,触发系统中断操作,而导致M被踢出CPU,
G 代表一个Goroutine;存储了goroutine的执行stack信息、goroutine状态以及goroutine的任务函数等;另外G对象是可以重用的。
M 代表一个操作系统的线程;M代表着真正的执行计算资源。在绑定有效的P后,进入schedule循环
P 代表一个逻辑CPU处理器(在golang中这个是一个对cpu的抽象),通过runtime.GOMAXPROCS (numLogicalProcessors)可以控制多少P,但是通常P的数量设置是等于CPU线程数(runtime.GOMAXPROCS()),不设置默认CPU线程数
P的数量决定了系统内最大可并行的G的数量(前提:系统的物理cpu线程数>=P的数量);P的最大作用还是其拥有的各种G对象队列、链表、一些cache和状态。
GPM之间的关系如下:
- G实际运行在M上;
- M需要绑定P才能运行;
- 程序中的多个M并不会同时都处于执行状态,最多只有cpu线程数个M在执行。
- 每个P维护一个G队列;
- 当一个G被创建出来,或者变为可执行状态时,就把他放到P的 可执行队列中;
- 当一个G执行结束时,P会从队列中把该G取出;如果此时P的队列为空,即没有其他G可以执行,M也会尝试从全局队列拿一批G放到P的本地队列,全局没有再从其他P的本地队列偷一半放到自己P的本地队列(work-stealing调度算法)。
总结
为了运行G0, M0需要持有上下文P。当你创建一个新的G的时候(go func()方法),它会被放入P的queue(满了放入全局队列)。
当你的G0执行阻塞的系统调用的时候(syscall),阻塞的系统调用会触发中断(intercepted),运行时会把M0从P中摘除(detach),然后再创建一个新的操作系统的线程M1(如果没有空闲的线程可用的话)来服务于这个P。
当系统调用继续的时候,线程M0被唤醒。尝试为G0捕获一个P上下文,可能从调度器的空闲P列表中获取。如果获取不成功,M会将G0放入调度器的可执行G队列中,等待其他P的查找。为了保证G的均衡性,非空闲的P会运行完自身的可执行G队列后,会周期性从调度器的可执行G队列中获取执行的G,甚至从其他的P的可执行G队列中掠夺G。M执行完P中的G不会空闲等待,而是会尝试去steal其他的G。现尝试全局,没有在随机一个P拿走部分G。work-steal(任务窃取算法)
Go运行时会在下面的goroutine被阻塞的情况下运行另外一个goroutine:
- blocking syscall (for example opening a file),
- network input,
- channel operations,
- primitives in the sync package.
sync包
waitgroup
信号量,Wait会卡住。等待所有goroutine执行完成再执行Wait之后的内容
/**
* Createby GoLand
* User xzw jsjxzw@163.com
* Date 2021/6/25
* Time 下午3:37
*/
package main
import (
"sync"
"sync/atomic"
)
var counter int64
func incr() {
for i := 0; i < 10; i++ {
// 此处使用 atomic 原语 cas (compare and swap)
atomic.AddInt64(&counter, 1)
}
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
incr()
}()
}
wg.Wait()
println(counter)
}
atomic 原语
chan
channel size 为多少并发量就是多少,为1串行
/**
* Createby GoLand
* User xzw jsjxzw@163.com
* Date 2021/6/25
* Time 下午3:48
*/
package main
import "sync"
type Lock struct {
c chan struct{}
}
func NewLock() Lock {
var l Lock
l.c = make(chan struct{}, 1)
l.c <- struct{}{}
return l
}
func (l Lock) Lock() bool {
lockResult := false
select {
case <-l.c:
lockResult = true
default:
}
return lockResult
}
func (l Lock) UnLock() {
l.c <- struct{}{}
}
var counter int
var l = NewLock()
func incr() {
if !l.Lock() {
incr()
return
}
for i := 0; i < 10; i++ {
counter++
}
l.UnLock()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
incr()
}()
}
wg.Wait()
println(counter)
}
mutex
https://juejin.cn/post/6986473832202633229#heading-7
sync.Mutex 互斥锁
type Mutex struct {
state int32
sema uint32 // 信号量变量,用来控制等待 goroutine 的阻塞休眠和唤醒
}
// state 最低三位
// 001 mutexLocked
// 010 mutexWoken
// 100 mutexStarving
// state总共是32位长度,所以剩下的位置,用来表示可以有1<<(32-3)个Goroutine 等待互斥锁的释放:
const (
mutexLocked = 1 << iota // mutex is locked 次数iota为0
mutexWoken // 010 2 是否唤醒 1 << iota
mutexStarving // 100 4 是否饥饿 1 << iota
mutexWaiterShift = iota // 此处iota为3,iota在一个const内累计
starvationThresholdNs = 1e6
)
Lock
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
lockSlow
func (m *Mutex) lockSlow() {
// 等待时间
var waitStartTime int64
// 饥饿标记
starving := false
// 唤醒标记
awoke := false
// 自旋次数
iter := 0
// 当前锁状态
old := m.state
for {
// 锁是非饥饿状态,锁还没有释放,尝试自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// awoke未唤醒 没有其他正在唤醒的节点 当前有正在等待的goroutine
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
// CAS 设置 state 的 mutexWoken状态位为1
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// CAS成功,唤醒awoke为true
awoke = true
}
// 自旋
runtime_doSpin()
// 自旋次数加1
iter++
// 设置当前锁的状态
old = m.state
continue
}
new := old
// 非饥饿状态
if old&mutexStarving == 0 {
// 锁状态位设为1
new |= mutexLocked
}
// 如果锁旧状态是被锁定或者处于饥饿状态,则waiter加一,表示一个等待计数
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果当前goroutine是饥饿标记并且已经上锁,那么锁 mutexStarving 状态为设置为1
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果当前goroutine是唤醒标记
if awoke {
// 锁状态为未唤醒,异常情况
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 清除唤醒标记位
new &^= mutexWoken
}
// CAS 设置新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果原来状态没有上锁也没有饥饿
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 到这里时,代表没有获得锁
queueLifo := waitStartTime != 0
// 如果等待时间为0,初始化等待时间
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞等待,当前 goroutine 进行休眠
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 唤醒之后检查锁是否应该处于饥饿状态(等待时间超过1ms),并设置 starving 变量
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果已经处于饥饿状态
if old&mutexStarving != 0 {
// 已锁或已唤醒 或者 0 个 goroutine 等待互斥锁的释放,异常
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 与下面的atomic.AddInt32($m.state, delta)结合理解
// 等待互斥锁释放的 goroutine - 1
// 上锁
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果当前 goroutine 不是饥饿状态,或者有且只有 1 个 goroutine 等待互斥锁的释放
if !starving || old>>mutexWaiterShift == 1 {
// 饥饿模式位-1(移除饥饿模式)
delta -= mutexStarving
}
// atomic add 操作,设置状态,抢锁
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
Unlock
/**
* Createby GoLand
* User xzw jsjxzw@163.com
* Date 2021/6/25
* Time 下午3:42
*/
package main
import "sync"
var counter int
var l sync.Mutex
func incr() {
l.Lock()
for i := 0; i < 10; i++ {
counter++
}
l.Unlock()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
incr()
}()
}
wg.Wait()
println(counter)
}
RWMutex
https://segmentfault.com/a/1190000039712353
once
/**
* Createby GoLand
* User xzw jsjxzw@163.com
* Date 2021/6/25
* Time 下午3:59
*/
package main
import (
"fmt"
"sync"
)
var counter int
func incr() {
for i := 0; i < 10; i++ {
counter++
}
}
func main() {
var wg sync.WaitGroup
var once sync.Once
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
once.Do(incr)
}()
}
wg.Wait()
fmt.Println(counter)
}
sync.map
线程安全的map,性能较低
/**
* Createby GoLand
* User xzw jsjxzw@163.com
* Date 2021/6/25
* Time 下午4:03
*/
package main
import (
"fmt"
"sync"
"sync/atomic"
)
var counter int64
func main() {
var sm sync.Map
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
n := atomic.AddInt64(&counter, 1)
sm.Store(n, n*n)
}()
}
wg.Wait()
if v, ok := sm.Load(int64(1)); ok {
fmt.Println(v)
}
sm.Range(func(k, v interface{}) bool {
fmt.Print(k)
fmt.Print(":")
fmt.Print(v)
fmt.Println()
return true
})
}