goroutine调度

gorountine是一个用户态级别的类线程,所以它的堆栈也必然是在用户态的调度器中进行管理的

GMP

每个P中有一个G的队列(多个G)

M是一个实际线程,需要与P绑定才能运行,运行态M数量等于CPU线程数量

P是逻辑处理器,默认等于CPU线程数

G有阻塞代码,触发系统中断操作,而导致M被踢出CPU,

G 代表一个Goroutine;存储了goroutine的执行stack信息、goroutine状态以及goroutine的任务函数等;另外G对象是可以重用的。

M 代表一个操作系统的线程;M代表着真正的执行计算资源。在绑定有效的P后,进入schedule循环

P 代表一个逻辑CPU处理器(在golang中这个是一个对cpu的抽象),通过runtime.GOMAXPROCS (numLogicalProcessors)可以控制多少P,但是通常P的数量设置是等于CPU线程数(runtime.GOMAXPROCS()),不设置默认CPU线程数

P的数量决定了系统内最大可并行的G的数量(前提:系统的物理cpu线程数>=P的数量);P的最大作用还是其拥有的各种G对象队列、链表、一些cache和状态。

GPM之间的关系如下:

  1. G实际运行在M上;
  2. M需要绑定P才能运行;
  3. 程序中的多个M并不会同时都处于执行状态,最多只有cpu线程数个M在执行。
  4. 每个P维护一个G队列;
  5. 当一个G被创建出来,或者变为可执行状态时,就把他放到P的 可执行队列中;
  6. 当一个G执行结束时,P会从队列中把该G取出;如果此时P的队列为空,即没有其他G可以执行,M也会尝试从全局队列拿一批G放到P的本地队列,全局没有再从其他P的本地队列偷一半放到自己P的本地队列(work-stealing调度算法)。

总结

为了运行G0, M0需要持有上下文P。当你创建一个新的G的时候(go func()方法),它会被放入P的queue(满了放入全局队列)。

当你的G0执行阻塞的系统调用的时候(syscall),阻塞的系统调用会触发中断(intercepted),运行时会把M0从P中摘除(detach),然后再创建一个新的操作系统的线程M1(如果没有空闲的线程可用的话)来服务于这个P。

当系统调用继续的时候,线程M0被唤醒。尝试为G0捕获一个P上下文,可能从调度器的空闲P列表中获取。如果获取不成功,M会将G0放入调度器的可执行G队列中,等待其他P的查找。为了保证G的均衡性,非空闲的P会运行完自身的可执行G队列后,会周期性从调度器的可执行G队列中获取执行的G,甚至从其他的P的可执行G队列中掠夺G。M执行完P中的G不会空闲等待,而是会尝试去steal其他的G。现尝试全局,没有在随机一个P拿走部分G。work-steal(任务窃取算法)

Go运行时会在下面的goroutine被阻塞的情况下运行另外一个goroutine:

  • blocking syscall (for example opening a file),
  • network input,
  • channel operations,
  • primitives in the sync package.

sync包

go语言实现锁

waitgroup

信号量,Wait会卡住。等待所有goroutine执行完成再执行Wait之后的内容

/**
 * Createby GoLand
 * User xzw jsjxzw@163.com
 * Date 2021/6/25
 * Time 下午3:37
 */

package main

import (
    "sync"
    "sync/atomic"
)

var counter int64

func incr() {
    for i := 0; i < 10; i++ {
    // 此处使用 atomic 原语  cas (compare and swap)
        atomic.AddInt64(&counter, 1)
    }
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            incr()
        }()
    }
    wg.Wait()
    println(counter)
}

atomic 原语

image-20210708162500635

chan

channel size 为多少并发量就是多少,为1串行

/**
 * Createby GoLand
 * User xzw jsjxzw@163.com
 * Date 2021/6/25
 * Time 下午3:48
 */

package main

import "sync"

type Lock struct {
    c chan struct{}
}

func NewLock() Lock {
    var l Lock
    l.c = make(chan struct{}, 1)
    l.c <- struct{}{}
    return l
}
func (l Lock) Lock() bool {
    lockResult := false
    select {
    case <-l.c:
        lockResult = true
    default:
    }
    return lockResult
}
func (l Lock) UnLock() {
    l.c <- struct{}{}
}

var counter int

var l = NewLock()

func incr() {
    if !l.Lock() {
        incr()
        return
    }
    for i := 0; i < 10; i++ {
        counter++
    }
    l.UnLock()
}

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            incr()
        }()
    }
    wg.Wait()
    println(counter)
}

mutex

https://juejin.cn/post/6986473832202633229#heading-7

sync.Mutex 互斥锁

type Mutex struct {
    state int32
    sema  uint32 // 信号量变量,用来控制等待 goroutine 的阻塞休眠和唤醒
}
// state 最低三位 
// 001 mutexLocked
// 010 mutexWoken
// 100 mutexStarving
// state总共是32位长度,所以剩下的位置,用来表示可以有1<<(32-3)个Goroutine 等待互斥锁的释放:

const (
    mutexLocked = 1 << iota // mutex is locked 次数iota为0
    mutexWoken // 010 2 是否唤醒 1 << iota
    mutexStarving // 100 4 是否饥饿 1 << iota
    mutexWaiterShift = iota // 此处iota为3,iota在一个const内累计
    starvationThresholdNs = 1e6
)

Lock

func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

lockSlow

func (m *Mutex) lockSlow() {
  // 等待时间
    var waitStartTime int64
  // 饥饿标记
    starving := false
  // 唤醒标记
    awoke := false
  // 自旋次数
    iter := 0
  // 当前锁状态
    old := m.state
    for {
    // 锁是非饥饿状态,锁还没有释放,尝试自旋
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
      // awoke未唤醒 没有其他正在唤醒的节点 当前有正在等待的goroutine 
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
      // CAS 设置 state 的 mutexWoken状态位为1
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
          // CAS成功,唤醒awoke为true
                awoke = true
            }
      // 自旋
            runtime_doSpin()
      // 自旋次数加1
            iter++
      // 设置当前锁的状态
            old = m.state
            continue
        }
        new := old
    // 非饥饿状态
        if old&mutexStarving == 0 {
      // 锁状态位设为1
            new |= mutexLocked
        }
    // 如果锁旧状态是被锁定或者处于饥饿状态,则waiter加一,表示一个等待计数
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
    // 如果当前goroutine是饥饿标记并且已经上锁,那么锁 mutexStarving 状态为设置为1
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
    // 如果当前goroutine是唤醒标记
        if awoke {
      // 锁状态为未唤醒,异常情况
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
      // 清除唤醒标记位
            new &^= mutexWoken
        }
    // CAS 设置新状态
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
      // 如果原来状态没有上锁也没有饥饿
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // 到这里时,代表没有获得锁
            queueLifo := waitStartTime != 0
      // 如果等待时间为0,初始化等待时间
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
      // 阻塞等待,当前 goroutine 进行休眠
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      // 唤醒之后检查锁是否应该处于饥饿状态(等待时间超过1ms),并设置 starving 变量
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
      // 如果已经处于饥饿状态
            if old&mutexStarving != 0 {
        // 已锁或已唤醒 或者 0 个 goroutine 等待互斥锁的释放,异常
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
        // 与下面的atomic.AddInt32($m.state, delta)结合理解
        // 等待互斥锁释放的 goroutine - 1
        // 上锁
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
        // 如果当前 goroutine 不是饥饿状态,或者有且只有 1 个 goroutine 等待互斥锁的释放
                if !starving || old>>mutexWaiterShift == 1 {
          // 饥饿模式位-1(移除饥饿模式)
                    delta -= mutexStarving
                }
        // atomic add 操作,设置状态,抢锁
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

img

Unlock

20201218225221

/**
 * Createby GoLand
 * User xzw jsjxzw@163.com
 * Date 2021/6/25
 * Time 下午3:42
 */

package main

import "sync"

var counter int
var l sync.Mutex

func incr() {
    l.Lock()
    for i := 0; i < 10; i++ {
        counter++
    }
    l.Unlock()
}
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            incr()
        }()
    }
    wg.Wait()
    println(counter)
}

RWMutex

https://segmentfault.com/a/1190000039712353

once

/**
 * Createby GoLand
 * User xzw jsjxzw@163.com
 * Date 2021/6/25
 * Time 下午3:59
 */

package main

import (
    "fmt"
    "sync"
)

var counter int

func incr() {
    for i := 0; i < 10; i++ {
        counter++
    }
}

func main() {
    var wg sync.WaitGroup
    var once sync.Once
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            once.Do(incr)
        }()
    }
    wg.Wait()
    fmt.Println(counter)
}

sync.map

线程安全的map,性能较低

image-20210709102704809

/**
 * Createby GoLand
 * User xzw jsjxzw@163.com
 * Date 2021/6/25
 * Time 下午4:03
 */

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

var counter int64

func main() {
    var sm sync.Map
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            n := atomic.AddInt64(&counter, 1)
            sm.Store(n, n*n)
        }()
    }
    wg.Wait()
    if v, ok := sm.Load(int64(1)); ok {
        fmt.Println(v)
    }

    sm.Range(func(k, v interface{}) bool {
        fmt.Print(k)
        fmt.Print(":")
        fmt.Print(v)
        fmt.Println()
        return true
    })

}

image-20210701111029768

Scroll to Top