概述
上一篇文章我们看go了互斥锁的具体实现。但是如果业务逻辑是读多写少,如果每次读写都使用互斥锁那么整个效率就会变得很低。其实如果只是读的话并不需要互斥锁来锁住数据。只有写操作的时候需要互斥锁,但是如果有人读那么写操作也应该被锁住。
在Go语言中提供了读写锁:RWMutex,并且提供了4个方法 读锁、读解锁、写锁、写解锁。其中读锁不是互斥,但是读锁和写锁是互斥的。简单来说是可以有多个读同时加锁,但是一旦有人想要获取写锁则会被阻塞。
简单使用
我们可以看到读锁可以获取多个,但是读锁还剩下一个的时候想要获取写锁则会被阻塞。等待3秒之后读锁被全部解开之后,会唤醒之前阻塞的写锁。别忘记最后需要解开写锁。还有一个比较常见的问题是,如果给没有读锁或者写锁的情况下解锁被抛出错误。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
rw := sync.RWMutex{}
rw.RLock()
rw.RLock()
rw.RLock()
rw.RUnlock()
rw.RUnlock()
go func() {
time.Sleep(time.Second * 3)
rw.RUnlock()
}()
fmt.Println("lock")
rw.Lock()
rw.Unlock()
fmt.Println("unlock")
}
源码分析
RWMutex实体
type RWMutex struct {
// 内部锁
w Mutex
// 写信号量
writerSem uint32
// 读信号量
readerSem uint32
// 准备读的goroutine的数量
readerCount int32
// 离开读的goroutine的数量
readerWait int32
}
// 读写锁最大数量 1073741824
const rwmutexMaxReaders = 1 << 30
RLock、RUnlock、Lock、
// 加读锁
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 使用原子操作增加读的数量操作readerCount + 1
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 如果小于0 则挂起goroutine等待readerSem
runtime_Semacquire(&rw.readerSem)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
// 解读锁
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
// 设置readerCount - 1 记录返回结果r
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// 如果r < 0 则报错 如果没有加锁的情况下解锁则会报错
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// readerWait数量-1
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 如果度等待等于0,则恢复写信号量的goroutine
runtime_Semrelease(&rw.writerSem, false)
}
}
if race.Enabled {
race.Enable()
}
}
// 写锁
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 第一步,先利用互斥锁 加锁
rw.w.Lock()
// 设置readerCount -1073741824
// 记录返回值r r再加上1073741824 获取读锁的数量
// 比如readerCount = 1 r = (1-1073741824) + 1073741824 = 1
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 判断读等待是否不等于0 如果不为0则阻塞等待
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_Semacquire(&rw.writerSem)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Release(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
// 记录并设置readerCount,使得readerCount为正数
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
// 未加锁
throw("sync: Unlock of unlocked RWMutex")
}
// 循环唤醒等待的读型号量的goroutine
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
用例子来分析源码
还是用上面的简单的例子看仔细看RWMutex中属性的变化。
下面代码可以看到主要的两个属性readerCount和readerWait两个属性的变化。
用最简单的总结一下:
- RLock: readerCount + 1,得到的结果readerCount < 0 此时有写锁,则挂起线程。
- RUnlock:readerCount – 1,得到结果readerCount < 0 则readerWait–, 如果readerWait(读等待)= 0 则唤醒写操作阻塞。
- Lock:readerCount – rwmutexMaxReaders(1073741824),得到结果再加上rwmutexMaxReaders获取等待数量存入readerWait中。如果读锁不为0 则阻塞写锁。
- Unlock:readerCount + rwmutexMaxReaders(1073741824),得到等待的读锁个数然后循环唤醒所有读等待的goroutine。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
rw := sync.RWMutex{}
rw.RLock() // readerCount = 1; readerWait = 0
rw.RLock() // readerCount = 2; readerWait = 0
rw.RLock() // readerCount = 3; readerWait = 0
rw.RUnlock() // readerCount = 2; readerWait = 0
rw.RUnlock() // readerCount = 1; readerWait = 0
go func() {
time.Sleep(time.Second * 3)
rw.RUnlock()
}()
fmt.Println("lock")
rw.Lock() // readerCount = -1073741824; readerWait = 0
rw.Unlock() // readerCount = 0; readerWait = 0
fmt.Println("unlock")
}
总结
互斥锁可以避免多线程中对同一个资源操作造成的问题,但是如果这个资源大部分情况下是读取少部分是写操作,则推荐使用读写锁来替换互斥锁。可以极大的提供效率,但是读写锁的操作比互斥锁多,有锁和写锁两种。如果操作不当很容易造成死锁。所以加锁和解锁必须要保证是成对出现,并且考虑如果报错的情况下如何保证解锁操作。