介绍

Go Concurrency Patterns: Context

在 Go 服务器中,每个传入请求都在其自己的 goroutine 中处理。请求处理程序通常会启动额外的 goroutine 来访问后端,例如数据库和 RPC 服务。处理请求的一组 goroutine 通常需要访问特定于请求的值,例如最终用户的身份、授权令牌和请求的截止日期。当请求被取消或超时时,处理该请求的所有 goroutine 都应该快速退出,以便系统可以回收它们正在使用的任何资源。

Google开发了一个context包,可以轻松地将请求范围的值、取消信号和截止日期跨 API 边界传递给处理请求所涉及的所有 goroutine。该包作为context公开可用 。

前置知识

Done() 方法返回一个channel

cancel、timeout、deadline 这三种情况,会close channel

由于close channel 所有协程都能监听到,所以所有goroutine都可以关闭

特别注意,除了close操作,往channel中写入一条数据时,只能有一个goroutine能冲channel中获取到数据

语境

context包的核心是Context类型

// 上下文携带截止日期、取消信号和请求范围的值
// 跨越API边界。他的方法对于多goroutine同时使用是安全的
type Context interface{
    // Done returns a channel that is closed when this Context is canceled
    // or times out.
    Done() <-chan struct{}

    // Err indicates why this context was canceled, after the Done channel
    // is closed.
    Err() error

    // Deadline returns the time when this Context will be canceled, if any.
    Deadline() (deadline time.Time, ok bool)

    // Value returns the value associated with key or nil if none.
    Value(key interface{}) interface{}
}

派生上下文

context包提供了从现有值中派生出Context值的函数。这些值形成一棵树:当 aContext被取消时,所有Contexts从它派生的也被取消。

Background是任何Context树的根;它永远不会被取消:

// Background returns an empty Context. It is never canceled, has no deadline,
// and has no values. Background is typically used in main, init, and tests,
// and as the top-level Context for incoming requests.
func Background() Context

WithCancelWithTimeout返回衍生Context值,可以比父Context更快地取消。当请求处理程序返回时,通常会取消与传入请求相关的ContextWithCancel还可用于在使用多个副本时取消冗余请求。WithTimeout可用于为后端服务器请求设置截止日期

// WithCancel returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed or cancel is called.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// A CancelFunc cancels a Context.
type CancelFunc func()

// WithTimeout returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed, cancel is called, or timeout elapses. The new
// Context's Deadline is the sooner of now+timeout and the parent's deadline, if
// any. If the timer is still running, the cancel function releases its
// resources.
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)

WithValue提供了一种方法,将请求范围的值与Context关联

// WithValue returns a copy of parent whose Value method returns val for key.
func WithValue(parent Context, key interface{}, val interface{}) Context

示例

/**
 * Createby GoLand
 * User xzw jsjxzw@163.com
 * Date 2021/7/3
 * Time 下午1:46
 */

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

func main() {
    http.HandleFunc("/", SayHello)

    log.Fatalln(http.ListenAndServe(":8080", nil))
}

func SayHello(writer http.ResponseWriter, request *http.Request) {
    fmt.Println(&request)

    go func() {
        for range time.Tick(time.Second) {
            select {
            case <-request.Context().Done():
                fmt.Println("request is outgoing")
                return
            default:
                fmt.Println("Current request is in progress")
            }
        }
    }()

    time.Sleep(2 * time.Second)

    writer.Write([]byte("Hi"))
}

作用

在 Go 语言中 context 包允许您传递一个 “context” 到您的程序。 Context 如超时或截止日期(deadline)或通道,来指示停止运行和返回。例如,如果您正在执行一个 web 请求或运行一个系统命令,定义一个超时对生产级系统通常是个好主意。因为,如果您依赖的API运行缓慢,你不希望在系统上备份(back up)请求,因为它可能最终会增加负载并降低所有请求的执行效率。导致级联效应。这是超时或截止日期 context 派上用场的地方。

设计原理

Go 语言中的每一个请求的都是通过一个单独的 Goroutine 进行处理的,HTTP/RPC 请求的处理器往往都会启动新的 Goroutine 访问数据库和 RPC 服务,我们可能会创建多个 Goroutine 来处理一次请求,而 Context 的主要作用就是在不同的 Goroutine 之间同步请求特定的数据、取消信号以及处理请求的截止日期。

context包设计原理

/**
 * Createby GoLand
 * User xzw jsjxzw@163.com
 * Date 2021/7/3
 * Time 下午2:14
 */

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()
    go helloHandle(ctx, 2*time.Second)
    select {
    case <-ctx.Done(): //此处阻塞2s
        fmt.Println("Hello Handle", ctx.Err())
    }
    time.Sleep(3 * time.Second)
}

func helloHandle(ctx context.Context, duration time.Duration) {
    select {
    case <-ctx.Done():
        fmt.Println(ctx.Err())
    case <-time.Tick(duration):
        fmt.Println("process request with", duration)
    }
}

使用场景

RPC调用

在主goroutine上有4个RPC,RPC2/3/4是并行请求的,我们这里希望在RPC2请求失败之后,直接返回错误,并且让RPC3/4停止继续计算。这个时候,就使用的到Context。

package main

import (
    "context"
    "sync"
    "github.com/pkg/errors"
)

func Rpc(ctx context.Context, url string) error {
    result := make(chan int)
    err := make(chan error)

    go func() {
        // 进行RPC调用,并且返回是否成功,成功通过result传递成功信息,错误通过error传递错误信息
        isSuccess := true
        if isSuccess {
            result <- 1
        } else {
            err <- errors.New("some error happen")
        }
    }()

    select {
        case <- ctx.Done():
            // 其他RPC调用调用失败
            return ctx.Err()
        case e := <- err:
            // 本RPC调用失败,返回错误信息
            return e
        case <- result:
            // 本RPC调用成功,不返回错误信息
            return nil
    }
}


func main() {
    ctx, cancel := context.WithCancel(context.Background())

    // RPC1调用
    err := Rpc(ctx, "http://rpc_1_url")
    if err != nil {
        return
    }

    wg := sync.WaitGroup{}

    // RPC2调用
    wg.Add(1)
    go func(){
        defer wg.Done()
        err := Rpc(ctx, "http://rpc_2_url")
        if err != nil {
            cancel()
        }
    }()

    // RPC3调用
    wg.Add(1)
    go func(){
        defer wg.Done()
        err := Rpc(ctx, "http://rpc_3_url")
        if err != nil {
            cancel()
        }
    }()

    // RPC4调用
    wg.Add(1)
    go func(){
        defer wg.Done()
        err := Rpc(ctx, "http://rpc_4_url")
        if err != nil {
            cancel()
        }
    }()

    wg.Wait()
}

PipeLine

pipeline模式就是流水线模型,流水线上的几个工人,有n个产品,一个一个产品进行组装。其实pipeline模型的实现和Context并无关系,没有context我们也能用chan实现pipeline模型。但是对于整条流水线的控制,则是需要使用上Context的。

超时请求

我们发送RPC请求的时候,往往希望对这个请求进行一个超时的限制。当一个RPC请求超过10s的请求,自动断开。当然我们使用CancelContext,也能实现这个功能(开启一个新的goroutine,这个goroutine拿着cancel函数,当时间到了,就调用cancel函数)。

鉴于这个需求是非常常见的,context包也实现了这个需求:timerCtx。具体实例化的方法是 WithDeadline 和 WithTimeout。

具体的timerCtx里面的逻辑也就是通过time.AfterFunc来调用ctx.cancel的。

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()

    select {
    case <-time.After(1 * time.Second):
        fmt.Println("overslept")
    case <-ctx.Done():
        fmt.Println(ctx.Err()) // prints "context deadline exceeded"
    }
}

http的客户端里面加上timeout也是一个常见的办法

uri := "https://httpbin.org/delay/3"
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
    log.Fatalf("http.NewRequest() failed with '%s'\n", err)
}

ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*100)
req = req.WithContext(ctx)

resp, err := http.DefaultClient.Do(req)
if err != nil {
    log.Fatalf("http.DefaultClient.Do() failed with:\n'%s'\n", err)
}
defer resp.Body.Close()

HTTP服务器的request互相传递数据

context还提供了valueCtx的数据结构。

这个valueCtx最经常使用的场景就是在一个http服务器中,在request中传递一个特定值,比如有一个中间件,做cookie验证,然后把验证后的用户名存放在request中。

我们可以看到,官方的request里面是包含了Context的,并且提供了WithContext的方法进行context的替换。

package main

import (
    "net/http"
    "context"
)

type FooKey string

var UserName = FooKey("user-name")
var UserId = FooKey("user-id")

func foo(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        ctx := context.WithValue(r.Context(), UserId, "1")
        ctx2 := context.WithValue(ctx, UserName, "yejianfeng")
        next(w, r.WithContext(ctx2))
    }
}

func GetUserName(context context.Context) string {
    if ret, ok := context.Value(UserName).(string); ok {
        return ret
    }
    return ""
}

func GetUserId(context context.Context) string {
    if ret, ok := context.Value(UserId).(string); ok {
        return ret
    }
    return ""
}

func test(w http.ResponseWriter, r *http.Request) {
    w.Write([]byte("welcome: "))
    w.Write([]byte(GetUserId(r.Context())))
    w.Write([]byte(" "))
    w.Write([]byte(GetUserName(r.Context())))
}

func main() {
    http.Handle("/", foo(test))
    http.ListenAndServe(":8080", nil)
}
Scroll to Top