type RWMutex struct { n w Mutex // held if there are pending writersn writerSem uint32 // semaphore for writers to wait for completing readersn readerSem uint32 // semaphore for readers to wait for completing writersn readerCount int32 // number of pending readersn readerWait int32 // number of departing readersn}nn// RWMutex提供了以下几个方法n// 加读锁nfunc (rw *RWMutex) RLock() { }n// 解读锁nfunc (rw *RWMutex) RUnlock() { }n// 尝试加读锁nfunc (rw *RWMutex) TryRLock() bool { }n// 加写锁nfunc (rw *RWMutex) Lock() { }n// 解写锁nfunc (rw *RWMutex) Unlock() { }n// 尝试加写锁nfunc (rw *RWMutex) TryLock() bool { }n// 返回一个Locker接口nfunc (rw *RWMutex) RLocker() Locker { }
使用RWMutex进行读写锁演示代码:
func TestRWMutexLock(t *testing.T) { n var rw sync.RWMutexn var wg sync.WaitGroupn for i := 0; i < 5; i++ { n go func() { n wg.Add(1)n defer wg.Done()n // 读锁n rw.RLock()n defer rw.RUnlock()n time.Sleep(1 * time.Second)n fmt.Println("读操作")n }()n }nn for i := 0; i < 5; i++ { n go func() { n wg.Add(1)n defer wg.Done()n // 写锁n rw.Lock()n defer rw.Unlock()n time.Sleep(1 * time.Second)n fmt.Println("写操作")n }()n }n wg.Wait()n}
底层原理
字段含义
const rwmutexMaxReaders = 1 << 30nntype RWMutex struct { n w Mutex // held if there are pending writersn writerSem uint32 // semaphore for writers to wait for completing readersn readerSem uint32 // semaphore for readers to wait for completing writersn readerCount int32 // number of pending readersn readerWait int32 // number of departing readersn}
func (rw *RWMutex) Unlock() { n if race.Enabled { n _ = rw.w.staten race.Release(unsafe.Pointer(&rw.readerSem))n race.Disable()n }n // 将readerCount更新为正数
,表示当前没有写操作n r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)n if r >= rwmutexMaxReaders { n race.Enable()n throw("sync: Unlock of unlocked RWMutex")n }n // 唤醒所有等待的读操作n for i := 0; i < int(r); i++ { n runtime_Semrelease(&rw.readerSem, false, 0)n }n // 释放锁n rw.w.Unlock()n if race.Enabled { n race.Enable()n }n}
将readerCount更新为正数,表示当前没有写操作
若存在等待的读操作,则唤醒所有等待的读操作
释放互斥锁
RLock
func (rw *RWMutex) RLock() { n if race.Enabled { n _ = rw.w.staten race.Disable()n }n // 原子更新readerCount+1,表示读操作数量+1n // 若readerCount+1为负数
,表示当前存在写操作,读操作会被阻塞,等待写操作完成后被唤醒n if atomic.AddInt32(&rw.readerCount, 1) < 0 { n runtime_SemacquireMutex(&rw.readerSem, false, 0)n }n if race.Enabled { n race.Enable()n race.Acquire(unsafe.Pointer(&rw.readerSem))n }n}
func (rw *RWMutex) RUnlock() { n if race.Enabled { n _ = rw.w.staten race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))n race.Disable()n }n // 原子更新readerCount-1,表示读操作数量-1n // 若readerCount-1为负数
,表示当前读操作阻塞了写操作,需要进行额外处理n if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { n // Outlined slow-path to allow the fast-path to be inlinedn rw.rUnlockSlow(r)n }n if race.Enabled { n race.Enable()n }n}nnfunc (rw *RWMutex) rUnlockSlow(r int32) { n if r+1 == 0 || r+1 == -rwmutexMaxReaders { n race.Enable()n throw("sync: RUnlock of unlocked RWMutex")n }n // 原子更新readerWait-1,表示阻塞写操作的读操作数量-1n // 当readerWait-1为0时 ,表示导致写操作阻塞的所有读操作都已经执行完成,此时需要把阻塞的写操作唤醒n if atomic.AddInt32(&rw.readerWait, -1) == 0 { n runtime_Semrelease(&rw.writerSem, false, 1)n }n}