协程 Coroutine

  • 轻量级“线程”
  • 非抢占式多任务处理,由协程主动交出控制权
  • 编译器/解释器/虚拟机层面的多任务
  • 多个协程可能在一个或多个线程上运行

Subroutines are special cases of more general program components,called coroutines.In contrast to the unsymmetric
– 子程序是协程的一个特例

其它语言中的协程

  • C++: Boost.Coroutine
  • Java: 不支持
  • python
    使用yield关键字实现协程
    Python3.5加入了async def对协程原生支持

goroutine

  • 任何函数只需加上go就能送给调度器运行
  • 不需要在定义时区分是否是一步函数
  • 调度器在合适的点进行切换
  • 使用-race来检测数据访问冲突

goroutine可能的切换点

  • I/O, select
  • channel
  • 等待锁
  • 函数调用(有时)
  • runtime.Gosched()
    只是参考,不能保证在其它地方不切换

channel

package main

import (
    "fmt"
    "time"
)

func worker(id int, c chan int) {
    for n := range c {
        //n, ok := <-c
        //if !ok {
        //  break
        //}
        fmt.Printf("Worker %d received %d\n", id, n)
    }
}

func createWorker(id int) chan<- int {
    c := make(chan int)
    go worker(id, c)
    return c
}

func chanDemo() {
    var channels [10]chan<- int
    for i := 0; i < 10; i++ {
        channels[i] = createWorker(i)
    }
    for i := 0; i < 10; i++ {
        channels[i] <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        channels[i] <- 'A' + i
    }
    time.Sleep(time.Millisecond)
}

func bufferedChannel() {
    c := make(chan int, 3) //提升性能
    go worker(0, c)
    c <- 'a'
    c <- 'b'
    c <- 'c'
    c <- 'd'
    time.Sleep(time.Millisecond)
}

func channelClose() {
    c := make(chan int, 3) //提升性能
    go worker(0, c)
    c <- 'a'
    c <- 'b'
    c <- 'c'
    c <- 'd'
    close(c)
    time.Sleep(time.Millisecond)
}

func main() {
    //Channel as first-class citizen
    chanDemo()
    //Buffered channel
    bufferedChannel()
    //Channel close and rage
    channelClose()
}

理论基础:Communication Sequential Process(CSP)

Don’t communicate by sharing memory;share memory by communicating.
– 不要通过共享内存来通信;通过通信来共享内存

WaitGroup 使用Channel等待任务结束

package main

import (
    "fmt"
    "sync"
)

func doWorker(id int, w worker) {
    for n := range w.in {
        fmt.Printf("Worker %d received %c\n", id, n)
        //go func() { done <- true }()
        w.done()
    }

}

type worker struct {
    in   chan int
    done func()
}

func createWorker(id int, wg *sync.WaitGroup) worker {
    w := worker{
        in: make(chan int),
        done: func() {
            wg.Done()
        },
    }
    go doWorker(id, w)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    wg.Add(20)

    var workers [10]worker
    for i := 0; i < 10; i++ {
        workers[i] = createWorker(i, &wg)
    }

    for i, worker := range workers {
        worker.in <- 'a' + i
    }
    for i, worker := range workers {
        worker.in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    //Channel as first-class citizen
    chanDemo()

}

使用Channel进行树的遍历

type Node struct {
    Value       int
    Left, Right *Node
}

func (node *Node) TraverseFunc(f func(*Node)) {
    if node == nil {
        return
    }

    node.Left.TraverseFunc(f)
    f(node)
    node.Right.TraverseFunc(f)
}

func (node *Node) TraverseWithChannel() chan *Node {
    out := make(chan *Node)
    go func() {
        node.TraverseFunc(func(node *Node) {
            out <- node
        })
        close(out)
    }()
    return out
}

func main() {
    c := TraverseWithChannel()
    maxNode := 0
    for node := range c {
        if node.Value > maxNode {
            maxNode = node.Value
        }
    }
    fmt.Println("Max node value:", maxNode)
}

使用select来进行调度

  • select 的使用
  • 定时器的使用
  • 在select只能够使用nil channel
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        time.Sleep(time.Second)
        fmt.Printf("Worker %d received %d\n", id, n)
    }
}

func createWorker(id int) chan<- int {
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var worker = createWorker(0)

    var values []int
    tm := time.After(10 * time.Second)
    tick := time.Tick(time.Second)
    for {
        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = worker
            activeValue = values[0]
        }
        select {
        case n := <-c1:
            values = append(values, n)
        case n := <-c2:
            values = append(values, n)
        case activeWorker <- activeValue:
            values = values[1:]
        case <-time.After(800 * time.Millisecond):
            fmt.Println("timeout")
        case <-tick:
            fmt.Println("queue len =", len(values))
        case <-tm:
            fmt.Println("bye")
            return
        }
    }

}

传统同步机制(尽量少用)

  • WaitGroup
  • Mutex(互斥量)
  • Cond
package main

import (
    "fmt"
    "sync"
    "time"
)

type atomicInt struct {
    value int
    lock  sync.Mutex
}

func (a *atomicInt) increment() {
    fmt.Println("safe increment")
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()

        a.value++
    }()

}

func (a *atomicInt) get() int {
    a.lock.Lock()
    defer a.lock.Unlock()
    return a.value
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get())
}

Scroll to Top