Contents

Mutex解析

mutex架构演进的四个阶段:

https://note.youdao.com/yws/api/personal/file/WEB96b1758fed38c543ffda0ee3feefabb8?method=download&shareKey=0dbc3b4104ff2d8cf2a821243261fc5e

初版

“初版”的Mutex使用一个flag来表示锁是否被持有,实现比较简单;后来照顾到新来的goroutine,所以会让新的 goroutine 也尽可能地先获取到锁;那么,接下来就是第三阶段“多给些机会”,照顾新来的和被唤醒的 goroutine;但是这样会带来饥饿问题,所以目前又加入了饥饿的解决方案,也就是第四阶段“解决饥饿”。

初版的互斥锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
   // CAS操作,当时还没有抽象出atomic包
func cas(val *int32, old, new int32) bool
func semacquire(*int32)
func semrelease(*int32)
// 互斥锁的结构,包含两个字段
type Mutex struct {
    key  int32 // 锁是否被持有的标识
    sema int32 // 信号量专用,用以阻塞/唤醒goroutine
}

// 保证成功在val上增加delta的值
func xadd(val *int32, delta int32) (new int32) {
    for {
        v := *val
        if cas(val, v, v+delta) {
            return v + delta
        }
    }
    panic("unreached")
}

// 请求锁
func (m *Mutex) Lock() {
    if xadd(&m.key, 1) == 1 { //标识加1,如果等于1,成功获取到锁
        return
    }
    semacquire(&m.sema) // 否则阻塞等待
}

func (m *Mutex) Unlock() {
    if xadd(&m.key, -1) == 0 { // 将标识减去1,如果等于0,则没有其它等待者
        return
    }
    semrelease(&m.sema) // 唤醒其它阻塞的goroutine
}    

CAS 指令将给定的值和一个内存地址中的值进行比较,如果它们是同一个值,就使用新值替换内存地址中的值,这个操作是原子性的。原子性保证这个指令总是基于最新的值进行计算,如果同时有其它线程已经修改了这个值,那么,CAS 会返回失败

mutex结构体包含两个字段:

  • key:是一个 flag,用来标识这个排外锁是否被某个goroutine 所持有,如果key大于等于1,说明这个排外锁已经被持有;
  • sama:是个信号量变量,用来控制等待goroutine的阻塞休眠和唤醒。 https://note.youdao.com/yws/api/personal/file/WEBb21b2a0294b7db6078612944963e771a?method=download&shareKey=b03cb5a7dc5a11c4e73a35d1e4facd68

调用Lock请求锁时,通过xadd方法进行CAS操作,xadd方法循环执行CAS操作直到成功,保证对key加1的操作成功完成。如果比较幸运,锁没有被别的 goroutine 持有,那么,Lock方法成功地将key设置为 1,这个goroutine 就持有了这个锁;如果锁已经被别的goroutine持有了,那么,当前的 goroutine 会把 key 加 1,而且还会调用semacquire 方法,使用信号量将自己休眠,等锁释放的时候,信号量会将它唤醒。

CAS:compare-and-swap,atomic包里的CompareAndSwapInt32()方法,比较并交换。

持有锁的goroutine调用Unlock释放锁时,它会将key减 1。如果当前没有其它等待这个锁的 goroutine,这个方法就返回了。但是,如果还有等待此锁的其它 goroutine,那么,它会调用semrelease方法,利用信号量唤醒等待锁的其它 goroutine 中的一个。

初版的MUtex利用CAS原子操作,对key标志量进行设置。key不仅仅标识了锁是否被goroutine所持有,还记录了当前持有和等待获取锁的goroutine的数量。

Unlock方法可以被任意的goroutine调用释放锁,即使是没持有这个互斥锁的goroutine,也可以进行这个操作。这是因为,Mutex 本身并没有包含持有这把锁的 goroutine 的信息,所以,Unlock 也不会对此进行检查。

所以我们在使用mutext的时候,必须保证goroutine尽可能不去释放自己未持有的锁,遵循谁申请,谁释放的原则。

第二版

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type Mutex struct{
    state int32
    sema uint32
}

const (
    mutextLocker = 1 << iota    //持有锁的标记
    mutextWoken         //唤醒标记
    mutexWaiterShift = iota //阻塞等待的waiter数量
)

Mutex还是两个字段,但是第一个字段已经改成state,是一个复合字段,表示多个意义。这个字段的第一位标识这个锁是否被持有,第二位标识是否又唤醒的goroutine,剩余的位数代表等待此锁的goroutine数。 请求锁的goroutine有两种:

  • 新goroutine
  • 被唤醒的goroutine

请求锁的goroutine有两类:

请求锁的goroutine 当前锁被持有 当前锁未被持有
新来的goroutine waiter++;休眠 获取到锁
被唤醒的goroutine 清除mutexWoken标志;重新休眠,加入等待队列 清除mutexWoken标志,获取到锁

请求锁的方法 Lock实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (m *Mutex) Lock() { 
    // Fast path: 能够直接获取到锁 
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return 
    } 
    awoke := false 
    for { 
        old := m.state 
        new := old | mutexLocked // 新状态加锁 
        if old&mutexLocked != 0 {
            new = old + 1<<mutexWaiterShift
        }
        if awoke { 
            // goroutine是被唤醒的, 
            // 新状态清除唤醒标志 
            new &^= mutexWoken 
        } 
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
        //设置新状态 
            if old&mutexLocked == 0 { 
                // 锁原状态未加锁 
                break 
                    
            } 
            runtime.Semacquire(&m.sema) // 请求信号量 
            awoke = true
        } 
        
    }
}

先是通过CAS检测state字段中的标志,如果没有goroutine持有锁,也没有等待持有锁的goroutine,那么当前的goroutine可以直接获得锁。

如果state不是零值,那么通过一个循环检查。for循环是不断尝试获取锁,如果获取不到,就通过runtime.Semacquire(&m.sema)休眠,休眠醒来之后awoke设置为true,尝试争抢锁。

第九行new := old | mutexLocked将当前的flag设置为加锁状态,如果能成功通过CAS把这个新值赋到state,就代表抢占锁成功了。

释放锁Unlock()方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
   func (m *Mutex) Unlock() {
        // Fast path: drop lock bit.
        new := atomic.AddInt32(&m.state, -mutexLocked) //去掉锁标志
        if (new+mutexLocked)&mutexLocked == 0 { //本来就没有加锁
            panic("sync: unlock of unlocked mutex")
        }
    
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // 没有等待者,或者有唤醒的waiter,或者锁原来已加锁
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken // 新状态,准备唤醒goroutine,并设置唤醒标志
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime.Semrelease(&m.sema)
                return
            }
            old = m.state
        }
    }

第 3 行是尝试将持有锁的标识设置为未加锁的状态,这是通过减 1 而不是将标志位置零的方式实现。第 4 到 6 行还会检测原来锁的状态是否已经未加锁的状态,如果是 Unlock 一个未加锁的 Mutex 会直接 panic。 第10-12行,如果阻塞在该锁上的goroutine数目为0或者mutex处于lock或者唤醒状态,则返回。 第13-17行,如果有等待者,并且没有唤醒的 waiter,那就需要唤醒一个等待的 waiter。在唤醒之前,需要将 waiter 数量减 1,并且将 mutexWoken 标志设置上,这样,Unlock 就可以返回了。

相对于初版的设计,这次的改动主要就是,新来的goroutine也有机会先获取到锁,甚至一个goroutine可能连续获取到锁,打破了先来先得的逻辑。

第三版

第三版的改动主要是:新来的goroutine或者是被唤醒的goroutine首次获取不到锁,它们就会通过自旋的方式,尝试检查锁是否被释放。在尝试一定的自旋次数后,再执行原来的逻辑。

第四版

第四版主要解决的问题是:解决饥饿。因为从第二版开始,就有新来的goroutine加入到抢占锁,所以有可能每次都会被新来的goroutine抢占到锁,在极端情况下,等待的goroutine有可能一直获取不到锁,这就是饥饿问题

由于代码过多,且目前能力不够,就先总结,留个坑,以后持续关注这个问题。

https://static001.geekbang.org/resource/image/e0/76/e0c23794c8a1d355a7a183400c036276.jpg

目前go关于mutex的设计是:让不公平的等待时间限制在1毫秒,state字段增加了mutexStarving(饥饿标记),这就意味着,一旦等待者等待的时间超过了这个阈值,Mutex 的处理就有可能进入饥饿模式,优先让等待者先获取到锁。

通过加入饥饿模式,可以避免把机会全都留给新来的 goroutine,保证了请求锁的 goroutine 获取锁的公平性,对于我们使用锁的业务代码来说,不会有业务一直等待锁不被处理。