为什么加锁

安全问题
程序并发或并行时,对全局变量的修改需要加锁,保证串行化。

锁的缺点

未抢到锁的程序(进程,线程,协程)不停的尝试抢锁,cpu占用很高
这种情况又被称为自旋锁,CAS,轻量级锁
自旋锁适用于并发量低且程序执行时间短的情况下

未抢到锁的程序进入等待队列,效率较CAS低
这种情况又被称为重量级锁
适用于高并发场景

在JAVA中synchronized有锁升级的过程,偏向锁->自旋锁->重量级锁
go语言Mutex也有此类似过程

锁的两种情况

进程内锁和集群内锁

进程内加锁

Mutex实现

package main

import "sync"

var counter int
var l sync.Mutex

func incr() {
    l.Lock()
    counter++
    l.Unlock()
}
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            incr()
        }()
    }
    wg.Wait()
    println(counter)
}

chan实现

package main

import "sync"

type Lock struct {
    c chan struct{}
}

func NewLock() Lock {
    var l Lock
    l.c = make(chan struct{}, 1)
    l.c <- struct{}{}
    return l
}
func (l Lock) Lock() bool {
    lockResult := false
    select {
    case <-l.c:
        lockResult = true
    default:
    }
    return lockResult
}
func (l Lock) UnLock() {
    l.c <- struct{}{}
}

var counter int

var l = NewLock()

func incr() {
    if !l.Lock() {
        incr()
        return
    }
    counter++
    l.UnLock()
}

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            incr()
        }()
    }
    wg.Wait()
    println(counter)
}

集群内加锁

分布式系统

Redis的setnx

package main

import (
    "fmt"
    "sync"
    "time"

    "github.com/go-redis/redis"
)

var client *redis.Client

func init() {
    c := redis.NewClient(&redis.Options{
        Addr:     "node01:6379",
        Password: "",
        DB:       0,
        PoolSize: 100,
    })
    client = c
}

func incr() {

    var lockKey = "counter_lock"
    var counterKey = "counter"

    resp := client.SetNX(lockKey, 1, time.Second*5)
    lockSuccess, err := resp.Result()
    if err != nil || !lockSuccess {
        fmt.Println(err, "lock result: ", lockSuccess)
        //incr()
        return
    }
    getResp := client.Get(counterKey)
    cntValue, err := getResp.Int64()
    if err == nil || err == redis.Nil {
        cntValue++
        resp := client.Set(counterKey, cntValue, 0)
        _, err := resp.Result()
        if err != nil {
            panic("set value error!")
        }
    }
    delResp := client.Del(lockKey)
    unlockSuccess, err := delResp.Result()
    if err == nil && unlockSuccess > 0 {
    } else {
        panic("unlock failed")

    }
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            incr()
        }()
    }
    wg.Wait()
    getResp := client.Get("counter")
    cntValue, err := getResp.Int64()
    if err == nil || err == redis.Nil {
        println(cntValue)
    }
}

基于ZooKeeper

package main

import (
    "sync"
    "time"

    "github.com/go-zookeeper/zk"
)

var counter int
var c *zk.Conn

func init() {
    conn, _, err := zk.Connect([]string{"node01", "node02", "node03", "node04"}, time.Second*10)
    if err != nil {
        panic(err)
    }
    c = conn
}

func incr() {

    l := zk.NewLock(c, "/lock", zk.WorldACL(zk.PermAll))
    err := l.Lock()
    if err != nil {
        panic(err)
    }
    counter++
    println("lock succ,do your business logic")
    l.Unlock()
    println("unlock succ,finish business logic")
}

func main() {

    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            incr()
        }()
    }
    wg.Wait()
    println(counter)
}
基于ZooKeeper的锁与基于Redis的锁的不同之处在于Lock成功之前会一直阻塞,这与我们单机场景中的mutex.Lock很相似。
其原理也是基于临时Sequence节点和watch API,例如我们这里使用的是/lock节点。Lock会在该节点下的节点列表中插入自己的值,只要节点下的子节点发生变化,就会通知所有watch该节点的程序。这时候程序会检查当前节点下最小的子节点的id是否与自己的一致。如果一致,说明加锁成功了。
这种分布式的阻塞锁比较适合分布式任务调度场景,但不适合高频次持锁时间短的抢锁场景。按照Google的Chubby论文里的阐述,基于强一致协议的锁适用于粗粒度的加锁操作。这里的粗粒度指锁占用时间较长。我们在使用时也应思考在自己的业务场景中使用是否合适。

基于etcd

package main

import (
    "log"
    "sync"

    "github.com/zieckey/etcdsync"
)

var counter int
var m *etcdsync.Mutex

func init() {
    mutex, err := etcdsync.New("/lock", 10, []string{"http://node01:2379"})
    if m == nil || err != nil {
        log.Printf("etcdsync.New failed")
        return
    }
    m = mutex
}

func incr() {
    err := m.Lock()
    if err != nil {
        log.Printf("etcdsync.Lock failed")
        return
    }

    counter++

    err = m.Unlock()
    if err != nil {
        log.Println("etcdsync.Unlock failed")
    } else {
        log.Printf("etcdsync.Unlock OK")
    }
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            incr()
        }()
    }
    wg.Wait()
    println(counter)
}
etcd中没有像ZooKeeper那样的Sequence节点。所以其锁实现和基于ZooKeeper实现的有所不同。在上述示例代码中使用的etcdsync的Lock流程是:
先检查/lock路径下是否有值,如果有值,说明锁已经被别人抢了
如果没有值,那么写入自己的值。写入成功返回,说明加锁成功。写入时如果节点被其它节点写入过了,那么会导致加锁失败,这时候到 3
watch /lock下的事件,此时陷入阻塞
当/lock路径下发生事件时,当前进程被唤醒。检查发生的事件是否是删除事件(说明锁被持有者主动unlock),或者过期事件(说明锁过期失效)。如果是的话,那么回到 1,走抢锁流程。
值得一提的是,在etcdv3的API中官方已经提供了可以直接使用的锁API,读者可以查阅etcd的文档做进一步的学习。
Scroll to Top