该文章主要讲解ReentrantReadWriteLock和ReentrantLock的实现以及源码分析
ReentrantLock/ReentrantReadWriteLock 本章简介
本文主要讲解ReentrantLock
和ReentrantReadWriteLock
锁实现.
ReentrantLock
如何实现的可重入锁?
ReentrantReadWriteLock
是如何设计的?
ReentrantReadWriteLock
读锁数量是如何记录的?
ReentrantReadWriteLock
中读锁和写锁的最大数量是多少?
Lock ReentrantLock
实现了Lock
接口.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public interface Lock { void lock () ; void lockInterruptibly () throws InterruptedException; boolean tryLock () ; boolean tryLock (long time, TimeUnit unit) throws InterruptedException; void unlock () ; Condition newCondition () ; }
Sync 该类为ReentrantLock
的内部类, 是一个抽象同步器,继承了AQS
, 其两个子类NonfairSync
和FairSync
分别代表了非公平锁
和公平锁
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock () ; final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } }
NonfairSync 非公平锁实现. 即获取锁时不会立即排队, 而是会先尝试获取锁.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static final class NonfairSync extends Sync { final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } }
FairSync 公平锁实现, 每次获取锁时都会排队, 知道其他线程释放锁并唤醒此线程才会继续抢锁.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 static final class FairSync extends Sync { final void lock () { acquire(1 ); } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
通过对state
状态值的操作来判断锁是否获取, 且把state
的值当做锁重入的次数(大于0
).
构造函数 1 2 3 4 5 6 7 8 9 10 11 12 private final Sync sync;public ReentrantLock () { sync = new NonfairSync (); }public ReentrantLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); }
方法 1 2 3 4 5 6 7 8 9 10 11 public void lock () { sync.lock(); }public boolean tryLock () { return sync.nonfairTryAcquire(1 ); }public void unlock () { sync.release(1 ); }
可以看到大部分方法都是调用的Sync
类中的方法.
ReentrantReadWriteLock 介绍 读写锁
ReadWriteLock 此接口为ReentrantReadWriteLock
实现的接口. 定义了获取读锁
和写锁
方法.
1 2 3 4 5 6 7 public interface ReadWriteLock { Lock readLock () ; Lock writeLock () ; }
构造器 1 2 3 4 5 6 7 8 9 10 11 public ReentrantReadWriteLock () { this (false ); }public ReentrantReadWriteLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); readerLock = new ReadLock (this ); writerLock = new WriteLock (this ); }
内部类图
可以看到ReentrantReadWriteLock
也有公平
和非公平
的两种模式.
FairSync/NonfairSync
可以看出公平
和非公平
主要是针对于写锁
. 两个子类没有具体的获取锁和释放锁的代码, 全部都在Sync中.
WriteLock/ReadLock 写锁代码
读锁代码
从代码中可以看出, 读锁和写锁的加锁和释放锁的操作都是调用的sync中的方法来完成的. 所以我们重点关注Sync
类的实现.
Sync 抽象同步器, 封装了获取/释放读锁和写锁相关的代码.
核心变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static final int SHARED_SHIFT = 16 ;static final int SHARED_UNIT = (1 << SHARED_SHIFT);static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1 ;static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1 ;private transient ThreadLocalHoldCounter readHolds;private transient HoldCounter cachedHoldCounter;private transient Thread firstReader = null ;private transient int firstReaderHoldCount;
从SHARED_UNIT
和EXCLUSIVE_MASK
等变量可以看出, 同步器使用了高16位
来存储读锁数量, 低16位
来存储写锁数量. 且每个读线程
持有的读锁数量
有各自线程进行管理.
构造函数 1 2 3 4 Sync() { readHolds = new ThreadLocalHoldCounter (); setState(getState()); }
HoldCounter原理 该类的实例被每个线程持有, 由ThreadLocal来绑定.
1 2 3 4 5 6 static final class HoldCounter { final long tid = getThreadId(Thread.currentThread()); int count = 0 ; }
ThreadLocalHoldCounter原理 ThreadLocal子类, 用于初始化与线程绑定的HoldCounter
.
1 2 3 4 5 6 7 static final class ThreadLocalHoldCounter extends ThreadLocal <HoldCounter> { public HoldCounter initialValue () { return new HoldCounter (); } }
此类在Sync类初始化时被创建.
tryRelease方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 protected final boolean tryRelease (int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0 ; if (free) setExclusiveOwnerThread(null ); setState(nextc); return free; }
此方法用于写线程释放锁. 只有当重入的次数为0, 才会将持有锁线程置为空.
tryAcquire方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; }
tryWriteLock方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 final boolean tryWriteLock () { Thread current = Thread.currentThread(); int c = getState(); if (c != 0 ) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w == MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); } if (!compareAndSetState(c, c + 1 )) return false ; setExclusiveOwnerThread(current); return true ; }
此方法和tryAcquire
方法差不多, 只不过少了writerShouldBlock
方法的调用. 由于其通常被tryLock
方法调用. 因此调用线程不用阻塞.
tryAcquireShared方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return 1 ; } return fullTryAcquireShared(current); }
fullTryAcquireShared方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 final int fullTryAcquireShared (Thread current) { HoldCounter rh = null ; for (;;) { int c = getState(); if (exclusiveCount(c) != 0 ) { if (getExclusiveOwnerThread() != current) return -1 ; } else if (readerShouldBlock()) { if (firstReader == current) { } else { if (rh == null ) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0 ) readHolds.remove(); } } if (rh.count == 0 ) return -1 ; } } if (sharedCount(c) == MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null ) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } return 1 ; } } }
从代码中可以看出此方法和tryAcquireShared
方法代码类似, 只不过fullTryAcquireShared
方法开启了一个循环, 不断的去获取锁.
可以将tryAcquireShared
方法看做读锁的优化操作. 类似于提前尝试一下, 不行在开启循环.
tryReadLock方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 final boolean tryReadLock () { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false ; int r = sharedCount(c); if (r == MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return true ; } } }
tryReadLock
方法相较于fullTryAcquireShared
方法只是少了readerShouldBlock
调用来判断当前读线程是都应该被阻塞; 由于此方法是在tryLock
中调用, 调用线程不应该阻塞. 没有锁直接返回即可, 不用阻塞
tryReleaseShared方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 protected final boolean tryReleaseShared (int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { if (firstReaderHoldCount == 1 ) firstReader = null ; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1 ) { readHolds.remove(); if (count <= 0 ) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
总结 ReentrantReadWriteLock
巧妙的使用了int
变量32位的设计, 将其拆分成两个16位, 低16位存储写锁, 高16位存储读锁.
由于写锁是互斥的, 所以不用记录. 但是读锁是共享的, 所以需要知道线程获取读锁的次数.
使用HoldCounter
来记录每个线程获取读锁的次数, 且此类实例与获取读锁的线程绑定. 通过ThreadLocalHoldCounter
对象实现.
读锁每次操作状态变量 都是以SHARED_UNIT
变量为单位来进行操作.