• 设为首页
  • 收藏本站
  • 积分充值
  • VIP赞助
  • 手机版
  • 微博
  • 微信
    微信公众号 添加方式:
    1:搜索微信号(888888
    2:扫描左侧二维码
  • 快捷导航
    福建二哥 门户 查看主题

    golang中单机锁的具体实现详解

    发布者: 皮3591 | 发布时间: 2025-8-14 10:30| 查看数: 32| 评论数: 0|帖子模式

    1、锁的概念引入

    首先,为什么需要锁?
    在并发编程中,多个线程或进程可能同时访问和修改同一个共享资源(例如变量、数据结构、文件)等,若不引入合适的同步机制,会引发以下问题:

    • 数据竞争:多个线程同时修改一个资源,最终的结果跟线程的执行顺序有关,结果是不可预测的。
    • 数据不一致:一个线程在修改资源,而另一个线程读取了未修改完的数据,从而导致读取了错误的数据。
    • 资源竞争:多线程竞争同一个资源,浪费系统的性能。
    因此,我们需要一把锁,来保证同一时间只有一个人能写数据,确保共享资源在并发访问下的正确性和一致性。
    在这里,引入两种常见的并发控制处理机制,即乐观锁与悲观锁:
    乐观锁:假定在并发操作中,资源的抢占并不是很激烈,数据被修改的可能性不是很大,那这时候就不需要对共享资源区进行加锁再操作,而是先修改了数据,最终来判断数据有没有被修改,没有被修改则提交修改指,否则重试。
    悲观锁:与乐观锁相反,它假设场景的资源竞争激烈,对共享资源区的访问必须要求持有锁。
    针对不同的场景需要采取因地制宜的策略,比较乐观锁与悲观所,它们的优缺点显而易见:
    策略优点缺点乐观锁不需要实际上锁,性能高若冲突时,需要重新进行操作,多次重试可能会导致性能下降明显悲观锁访问数据一定需要持有锁,保证并发场景下的数据正确性加锁期间,其他等待锁的线程需要被阻塞,性能低
    2、Sync.Mutex

    Go对单机锁的实现,考虑了实际环境中协程对资源竞争程度的变化,制定了一套锁升级的过程。具体方案如下:

    • 首先采取乐观的态度,Goroutine会保持自旋态,通过CAS操作尝试获取锁。
    • 当多次获取失败,将会由乐观态度转入悲观态度,判定当前并发资源竞争程度剧烈,进入阻塞态等待被唤醒。
    从乐观转向悲观的判定规则如下,满足其中之一即发生转变:

    • Goroutine自旋尝试次数超过4次
    • 当前P的执行队列中存在等待被执行的G(避免自旋影响GMP调度性能)
    • CPU是单核的(其他Goroutine执行不了,自旋无意义)
    除此之外,为了防止被阻塞的协程等待过长时间也没有获取到锁,导致用户的整体体验下降,引入了饥饿的概念:

    • 饥饿态:若Goroutine被阻塞等待的时间>1ms,则这个协程被视为处于饥饿状态
    • 饥饿模式:表示当前锁是否处于特定的模式,在该模式下,锁的交接是公平的,按顺序交给等待最久的协程。
    饥饿模式与正常模式的转变规则如下:
    普通模式->饥饿模式:存在阻塞的协程,阻塞时间超过1ms
    饥饿模式->普通模式:阻塞队列清空,亦或者获得锁的协程的等待时间小于1ms,则恢复
    接下来步入源码,观看具体的实现。

    2.1、数据结构

    位于包sync/mutex.go中,对锁的定义如下:
    1. type Mutex struct {
    2.     state int32
    3.     sema  uint32
    4. }
    复制代码

    • state:标识目前锁的状态信息,包括了是否处于饥饿模式、是否存在唤醒的阻塞协程、是否上锁、以及处于等待锁的协程个数有多少。
    • seme:用于阻塞和唤醒协程的信号量。
    将state看作一个二进制字符串,它存储信息的规则如下:

    • 第一位标识是否处于上锁,0表示否,1表示上锁(mutexLocked)
    • 第二位标识是否存在唤醒的阻塞协程(mutexWoken)
    • 第三位标识是否处于饥饿模式(mutexStarving)
    • 从第四位开始,记录了处于阻塞态的协程个数
    1. const (
    2.     mutexLocked = 1 << iota // mutex is locked
    3.     mutexWoken
    4.     mutexStarving
    5.     mutexWaiterShift = iota
    6.     starvationThresholdNs = 1e6 //饥饿阈值
    7. )
    复制代码
    2.2、获取锁Lock()
    1. func (m *Mutex) Lock() {
    2.     if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    3.         return
    4.     }
    5.     m.lockSlow()
    6. }
    复制代码
    尝试直接通过CAS操作直接获取锁,若成功则返回,否则说明锁被获取,步入LockSlow。

    2.3、LockSlow()

    源码较长,进行拆分讲解:
    1. var waitStartTime int64
    2.     starving := false
    3.     awoke := false
    4.     iter := 0
    5.     old := m.state
    复制代码
    (1)定义了基本的常量,含义如下:

    • waitStartTime:记录当前协程等待的时间,只有被阻塞才会使用
    • awoke:标识当前协程是否被Unlock唤醒
    • iter:记录当前协程自旋尝试次数
    • old:记录旧的锁的状态信息
    1. for {
    2.         //处于上锁状态,并且不处于饥饿状态中,并且当前的协程允许继续自旋下去
    3.         if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {

    4.             if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    5.                 atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    6.                 awoke = true
    7.             }
    8.             runtime_doSpin()
    9.             iter++
    10.             old = m.state
    11.             continue
    12.         }
    13.         //...
    14.     }
    复制代码
    (2)进入尝试获取锁的循环中,两个if表示:

    • 若锁处于上锁状态,并且不处于饥饿状态中,并且当前的协程允许继续自旋下去(非单核CPU、自旋次数<=4、调度器P的本地队列不存在等待执行的G),则步入:

      • 若当前协程并非从等待队列唤醒、并且不存在被唤醒的等待协程、并且存在位于阻塞的协程、则尝试设置mutexWoken标识为1,若成功:

        • 标识当前的协程为被唤醒的协程。(虽然并非实际从阻塞中唤醒)

      • 告诉P,当前的协程处于自旋态
      • 更新
        1. iter
        复制代码
        计数器,与
        1. old
        复制代码
        记录的当前锁的状态信息,进行下一次重试循环

    这里存在的唯一疑惑为,为什么要将awoke标识为true?
    首先,因为当前锁并非处于饥饿模式,因此当前的抢占锁的模式是不公平的,若当前锁的阻塞队列还没有被唤醒的协程,那就要求不要唤醒了,尝试让当前正在尝试的协程获取到锁,避免唤醒协程进行资源竞争。
    1. for {
    2.         //...
    3.         new := old
    4.         if old&mutexStarving == 0 {
    5.             new |= mutexLocked
    6.         }
    7.         if old&(mutexLocked|mutexStarving) != 0 {
    8.             new += 1 << mutexWaiterShift
    9.         }
    10.         if starving && old&mutexLocked != 0 {
    11.             new |= mutexStarving
    12.         }
    13.         if awoke {
    14.             new &^= mutexWoken
    15.         }
    16.         //...
    17. }        
    复制代码
    (3)进行状态更新:
    当协程从步骤2走出来时,只能说明它位于以下两个状态之一:

    • 旋不动了,或者锁进入饥饿模式了,锁要让给别人了,总之是获取不到锁了(悲观)。
    • 锁被释放了。
    不论如何,都需要进行一些状态的更新,为接下来的打算做准备。
    用new存储一个锁即将要进入的新状态信息,更新规则:

    • 若锁不处于饥饿模式:说明锁可能被释放了,也可能是自旋次数过多,不管接下来是否能拿到锁,锁都会被某一个协程获取,因此置
      1. mutexLocked
      复制代码
      为1。
    • 若锁可能处于饥饿状态,或者锁没有被释放:那说明自己是抢不到锁了,即将进入阻塞态,阻塞协程计数器+1。
    • 若当前的协程是被唤醒的,并且已经处在饥饿态中而且锁仍然锁着:锁进入绝对公平的饥饿模式
    • 若当前协程是被唤醒的:清除
      1. mutexWoken
      复制代码
      标识位,因为接下来可能需要有协程被唤醒(饥饿模式)。
    虽然更新的有点多,但是可以归纳为

    • 若锁释放了,那就标识一下接下来锁要被获取即可。
    • 若锁没有释放,并给当前协程等待了很久,那锁就进入饥饿状态,接下来需要有阻塞协程被唤醒。
    (4)尝试更新信息:
    1. if atomic.CompareAndSwapInt32(&m.state, old, new) {
    2.             //...
    3.         } else {
    4.             old = m.state
    5.         }
    复制代码
    接下来尝试将new更新进state,若更新失败,说明当前有另一个协程介入了,为了防止数据的一致性丢失,要全部重来一次。
    (5)状态更新成功,具体判断是要沉睡还是获取锁成功:
    步入步骤4的if主支中,此时有两个状态:
    1. if atomic.CompareAndSwapInt32(&m.state, old, new) {
    2.             if old&(mutexLocked|mutexStarving) == 0 {
    3.                 break // locked the mutex with CAS
    4.             }
    5.     //...
    6.         } else {
    7.             //...
    8.         }
    复制代码
    因为当前状态,可能是锁释放了,检查锁更新前是否已经被释放了并且不是饥饿模式,若是那说明获取锁成功了,函数结束了。
    1. if atomic.CompareAndSwapInt32(&m.state, old, new) {
    2.             if old&(mutexLocked|mutexStarving) == 0 {
    3.                 break // locked the mutex with CAS
    4.             }
    5.             // If we were already waiting before, queue at the front of the queue.
    6.             queueLifo := waitStartTime != 0
    7.             if waitStartTime == 0 {
    8.                 waitStartTime = runtime_nanotime()
    9.             }
    10.             runtime_SemacquireMutex(&m.sema, queueLifo, 2)
    11.     //....
    12.         } else {
    13.             //...
    14.         }
    复制代码
    否则,说明当前协程要进入阻塞态了,记录一下开始阻塞的时间,用于醒来是判断是否饥饿。然后进入阻塞沉睡中。
    (6)若步骤5进入阻塞,则被唤醒后:
    1. if atomic.CompareAndSwapInt32(&m.state, old, new) {
    2.             if old&(mutexLocked|mutexStarving) == 0 {
    3.                 break // locked the mutex with CAS
    4.             }
    5.             // If we were already waiting before, queue at the front of the queue.
    6.             queueLifo := waitStartTime != 0
    7.             if waitStartTime == 0 {
    8.                 waitStartTime = runtime_nanotime()
    9.             }
    10.             runtime_SemacquireMutex(&m.sema, queueLifo, 2)
    11.                 //唤醒
    12.               starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    13.             old = m.state
    14.     //若锁处于饥饿模式
    15.             if old&mutexStarving != 0 {
    16.                 //锁的异常处理
    17.                 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
    18.                     throw("sync: inconsistent mutex state")
    19.                 }
    20.                 //将要更新的信号量
    21.                 delta := int32(mutexLocked - 1<<mutexWaiterShift)
    22.                 if !starving || old>>mutexWaiterShift == 1 {
    23.                     delta -= mutexStarving
    24.                 }
    25.                 atomic.AddInt32(&m.state, delta)
    26.                 break
    27.             }
    28.             awoke = true
    29.             iter = 0
    30.    
    31.     //....
    32.         } else {
    33.             //...
    34.         }
    复制代码
    从阻塞中唤醒,首先计算一些协程的阻塞时间,以及当前的最新锁状态。
    若锁处于饥饿模式:那么当前协程将直接获取锁,当前协程是因为饥饿模式被唤醒的,不存在其他协程抢占锁。于是更新信号量,将记录阻塞协程数-1,将锁的上锁态置1。若当前从饥饿模式唤醒的协程,等待时间已经不到1ms了或者是最后一个等待的协程,那么将将锁从饥饿模式转化为正常模式。至此,获取成功,退出函数。
    否则,只是普通的随机唤醒,于是开始尝试进行抢占,回到步骤1。

    2.4、释放锁Unlock()
    1. func (m *Mutex) Unlock() {
    2.     //直接释放锁
    3.     new := atomic.AddInt32(&m.state, -mutexLocked)
    4.     if new != 0 {
    5.         m.unlockSlow(new)
    6.     }
    7. }
    复制代码
    通过原子操作,直接将锁的mutexLocked标识置为0。若置0后,锁的状态不为0,那就说明存在需要获取锁的协程,步入unlockSlow。

    2.5、unlockSlow()
    1. func (m *Mutex) unlockSlow(new int32) {
    2.     if (new+mutexLocked)&mutexLocked == 0 {
    3.         fatal("sync: unlock of unlocked mutex")
    4.     }
    5.     if new&mutexStarving == 0 {
    6.         old := new
    7.         for {
    8.             if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    9.                 return
    10.             }
    11.             new = (old - 1<<mutexWaiterShift) | mutexWoken
    12.             if atomic.CompareAndSwapInt32(&m.state, old, new) {
    13.                 runtime_Semrelease(&m.sema, false, 2)
    14.                 return
    15.             }
    16.             old = m.state
    17.         }
    18.     } else {
    19.         runtime_Semrelease(&m.sema, true, 2)
    20.     }
    21. }
    复制代码
    (1)首先进行了异常状态处理,若释放了一个已经释放了到锁,那么直接fatal,程序终止。
    1. if (new+mutexLocked)&mutexLocked == 0 {
    2.         fatal("sync: unlock of unlocked mutex")
    3.     }
    复制代码
    (2)若锁不处于饥饿状态:

    • 若此时的等待协程数量为0,或者锁被上锁了、含有被唤醒的协程、锁处于饥饿模式:都说明有新的协程介入了流程,已经完成了交接,可以直接退出
    • 唤醒一个处于阻塞态的协程。
    否则,处于饥饿状态,唤醒等待最久的协程。

    3、Sync.RWMutex

    对于共享资源区的操作,可以划分为读与写两大类。假设在一个场景中,对共享资源区继续读的操作远大于写的操作,如果每个协程的读操作都需要获取互斥锁,这带来的性能损耗是非常大的。
    RWMutex是一个可以运用在读操作>写操作中的提高性能的锁,可以将它视为由一个读锁与一个写锁构成。其运作规则具体如下:

    • 读锁允许多个读协程同时读取共享资源区,若有协程需要修改资源区的数据,那么它需要被阻塞。
    • 写锁具有严格的排他性,当共享资源区被上了写锁时,任何其他goroutine都不得访问。
    • 可见在最坏的情况下,所有的协程都是需要写操作时,读写锁会退化成普通的Mutex。

    3.1、数据结构
    1. type RWMutex struct {
    2.     w           Mutex        // held if there are pending writers
    3.     writerSem   uint32       // semaphore for writers to wait for completing readers
    4.     readerSem   uint32       // semaphore for readers to wait for completing writers
    5.     readerCount atomic.Int32 // number of pending readers
    6.     readerWait  atomic.Int32 // number of departing readers
    7. }
    8. const rwmutexMaxReaders = 1 << 30 //最大的读协程数量
    复制代码

    • w:一个互斥的写锁
    • writerSem:关联被阻塞的写协程的信号量
    • readerSem:关联被阻塞的读协程的信号量
    • readerCount:正常情况下,记录正在读取的协程数量;但若当前是写协程正在持有锁,那么实际记录读协程的数量为readerCount - rwmutexMaxReader
    • readerWait:记录释放下一个写协程,还需要等待读协程完成的数量

    3.2、读锁流程RLock()
    1. func (rw *RWMutex) RLock() {
    2.     if rw.readerCount.Add(1) < 0 {
    3.         // A writer is pending, wait for it.
    4.         runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
    5.     }
    6. }
    复制代码
    对readerCount+1,表示新加入一个读协程。若结果<0,说明当前锁正在被写协程占据,令当前的读协程阻塞。

    3.3、读释放锁流程RUnlock()
    1. func (rw *RWMutex) RUnlock() {
    2.     if r := rw.readerCount.Add(-1); r < 0 {
    3.         // Outlined slow-path to allow the fast-path to be inlined
    4.         rw.rUnlockSlow(r)
    5.     }
    6. }
    复制代码
    对readerCount-1,表示减少一个读协程。若结果<0,说明当前锁正在被写协程占据,步入runlockslow。

    3.4、rUnlockSlow()
    1. func (rw *RWMutex) rUnlockSlow(r int32) {
    2.     if r+1 == 0 || r+1 == -rwmutexMaxReaders {
    3.         race.Enable()
    4.         fatal("sync: RUnlock of unlocked RWMutex")
    5.     }
    6.     if rw.readerWait.Add(-1) == 0 {
    7.         // The last reader unblocks the writer.
    8.         runtime_Semrelease(&rw.writerSem, false, 1)
    9.     }
    10. }
    复制代码
    首先进行错误处理,若发现当前协程为占用过读锁,或者读流程的协程数量上限,系统出现异常,fatal。
    否则,对readerWait-1,若结果为0,说明当前协程是最后一个介入读锁流程的协程,此时需要释放一个写锁。

    3.5、写锁流程Lock()
    1. func (rw *RWMutex) Lock() {
    2.     // First, resolve competition with other writers.
    3.     rw.w.Lock()
    4.     // Announce to readers there is a pending writer.
    5.     r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
    6.     // Wait for active readers.
    7.     if r != 0 && rw.readerWait.Add(r) != 0 {
    8.         runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
    9.     }
    10. }
    复制代码
    首先尝试获取写锁,若获取成功,需要将readerCount-最大读协程数,表示现在锁被读协程占据。
    r表示处于读流程的协程数量,若r不为0,那么就将readerWait加上r,等这些读协程都读取完毕,再去写。将这个写协程阻塞。(读写锁并非读、写公平,读协程优先。)

    3.6、写释放锁流程Unlock()
    1. func (rw *RWMutex) Unlock() {
    2.     // Announce to readers there is no active writer.
    3.     r := rw.readerCount.Add(rwmutexMaxReaders)
    4.     if r >= rwmutexMaxReaders {
    5.         race.Enable()
    6.         fatal("sync: Unlock of unlocked RWMutex")
    7.     }
    8.     // Unblock blocked readers, if any.
    9.     for i := 0; i < int(r); i++ {
    10.         runtime_Semrelease(&rw.readerSem, false, 0)
    11.     }
    12.     // Allow other writers to proceed.
    13.     rw.w.Unlock()
    14. }
    复制代码
    重新将readerCount置为正常指,表示释放了写锁。若读协程超过最大上限,则异常。
    然后唤醒所有阻塞的读协程。(读协程优先)
    解锁。
    以上就是golang中单机锁的具体实现详解的详细内容,更多关于go单机锁的资料请关注脚本之家其

    来源:互联网
    免责声明:如果侵犯了您的权益,请联系站长(1277306191@qq.com),我们会及时删除侵权内容,谢谢合作!

    最新评论

    QQ Archiver 手机版 小黑屋 福建二哥 ( 闽ICP备2022004717号|闽公网安备35052402000345号 )

    Powered by Discuz! X3.5 © 2001-2023

    快速回复 返回顶部 返回列表