该文章主要讲解在向AQS的源码解析
AbstractQueuedSynchronizer 本章简介
本章主要讲述 JUC 包下的 AQS 的设计与现实, 同时了解 AQS 中独占和共享模式的运转原理和机制
AQS 的设计和实现
AQS 中独占和共享模式的源代码分析
AbstractOwnableSynchronizer 该类为AbstractQueuedSynchronizer
的父类, 其中包含了一个核心的字段:
1 private transient Thread exclusiveOwnerThread;
此字段用于标识独占模式下
当前持有锁的线程.
AbstractQueuedSynchronizer.Node 在AQS中队列的节点由内部类Node
来表示. 队列则是由一个个Node节点来形成的. 而从Node自身的变量可以知道当前节点的上一下节点(pre)和一下个节点(next). 同时每个节目都含有状态(唤醒时使用).
由于需要知道具体抢锁的线程, 所以Node包含了Thread
的引用.
变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 static final class Node { static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException (); else return p; } }
通过源码知道了每个节点的状态有四个取值分别是: SIGNAL, CANCELLED, CONDITION, PROPAGATE
.
其中需要注意CANCELLED
的取值为正数(大于0), 其他的状态值都为负数. 后续此条件会经常用到. 需要特殊注意.
注意: waitStatus 默认初始化值为0.
构造函数 1 2 3 4 5 6 7 8 9 10 11 Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; }
核心变量 1 2 3 4 5 private transient volatile Node head;private transient volatile Node tail;
AQS可以通过head, tail
变量来操作队列中的元素. 此时队列已经完善了, 那么使用什么来表示锁呢?
AQS使用了一个int
类型的字段来表示锁
1 private volatile int state;
AQS中所有的操作都是围绕这个字段来展开的, 线程通过CAS来不断的修改这个值, 来模拟抢锁(修改成功则代表抢锁成功, 反之则抢锁失败).
核心方法 AQS的核心方法包括:
acquire(int arg):
独占模式下获取锁.
acquireInterruptibly(int arg):
独占模式下可中断获取锁, 当线程中断时直接抛出中断异常.
release(int arg):
独占模式下释放锁
acquireShared(int arg):
共享模式下获取锁
acquireSharedInterruptibly(int arg):
共享模式下可中断获取锁
releaseShared(int arg):
共享模式下释放锁
acquire方法原理 1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire
方法有具体的子类实现, 返回值代表抢锁是否成功: true
表示抢锁成功, false
表示抢锁失败.
当抢锁失败时, 会调用addWaiter
方法来创建Node节点并入队. 注意Node.EXCLUSIVE
是个空对象. 当入队完成时可能有其他线程释放锁了, 所以调用acquireQueued
开启循环不断去获取锁.
acquireQueued方法原理 该方法就是开启一个循环, 不断的去获取锁同时判断前置节目的状态.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
注意此时循环的结束条件: 只有当前节点获取锁成功才会结束循环. 即p == head && tryAcquire(arg)条件成立
.
shouldParkAfterFailedAcquire
方法就是根据前置节点的状态来判断当前线程是否应该被阻塞.
而parkAndCheckInterrupt
方法就是阻塞当前线程并检查线程中断标志位. 底层调用LockSupport.park
方法.
addWaiter方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
此方法比较简单, 就是创建Node
节点并通过cas修改tail节点
. 如果cas修改失败
或者队列为初始化
, 则调用enq
方法开启循环进行队列初始化和cas
修改tail
节点.
enq方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
可以看出此方法的作用有两个: 1. 初始化队列头(head) 和 尾(tail); 2.将指定节点添加到尾部(即tail)
循环的终止条件为: 通过cas将tail节点修改成当前节点成功才会结束.
release方法原理 1 2 3 4 5 6 7 8 9 10 11 12 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
acquireShared方法原理 1 2 3 4 5 6 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); }
doAcquireShared方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
当执行此方法时, 存在的场景如下:
成功获取到锁, 执行完setHeadAndPropagate
返回.
继续循环, 不断调用shouldParkAfterFailedAcquire
方法修改前置节目的状态(修改成SIGNAL).
已经调用parkAndCheckInterrupt
阻塞.
setHeadAndPropagate方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
由于releaseShared
方法最终调用的也是doReleaseShared
方法, 所以将其解析放在下面.
releaseShared方法原理 1 2 3 4 5 6 7 8 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
doReleaseShared方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private void doReleaseShared () { for (; ; ) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) { continue ; } unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) { continue ; } } if (h == head) break ; } }
看完此方法可能有点疑惑, 举个例子: 此时有 A, B, C三个线程, 只有一把共享锁, 假设此时B和C在队列中排队等待A释放锁.
此时队列如下:head -> B(blocked) -> C(blocked)
/ A -- lock
当A线程调用doReleaseShared
方法时, 此时head的状态必然是SIGNAL
, 所以会cas修改状态并唤醒B线程.
注意此时会出现两种情况:
A线程执行到h == head
时, head还未更改, A线程结束循环, 完成释放是操作. 情况如下:
出现新的D线程抢走了锁
B线程抢到了锁,但是还未执行setHead
操作即更新头结点.
A线程执行到h == head
时, head已经更改, A线程继续循环. 情况如下:
B线程执行完了setHead
操作
当B线程执行setHeadAndPropagate
方法后最终进入doReleaseShared
方法, 同A线程一样执行起了相同的唤醒操作.
此时唤醒的线程有A和B两个线程, 注意此时B还未释放锁. 当C线程被唤醒后, 尝试去获取锁, 结果失败. head节点不会更改, 此时h == head
成立, A线程和B线程结束循环. 退出释放的操作.
后续B线程调用releaseShared
方法释放锁,则和A线程当初一样. 继续循环唤醒.
简单来说就是: 只要有线程释放锁或获取到锁, 此线程都会加入唤醒后续线程的队伍
. 直到后续线程唤醒后无法抢锁. 导致head不更新, 队伍
中的唤醒线程才会停止唤醒操作.
shouldParkAfterFailedAcquire方法原理 此方法无论是在独享锁或共享锁的情况下都会有调用, 且都是在获取锁失败的情况下.
该方法主要是判断前置节点去状态来对当前节点进行操作: 1. 继续循环获取锁 2. 直接阻塞. 由返回结果标识.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
可以看出此方法做了两件事:
清理队列中的取消节点
设置前置节点的状态为SIGNAL
, 以表示当前节点可以被安全的阻塞
此方法仅当前置节点的状态为SIGNAL
返回true
, 其余情况返回false
.
unparkSuccessor方法原理 注意此方法的调用有多处: 1. 独享锁的释放release
2. 共享锁的获取或释放releaseShared
3. 取消节点
此方法根据传入的node节点, 唤醒node的后继节点.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
cancelAcquire方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ){ node.prev = pred = pred.prev; } Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } }
ConditionObject Condition Condition是ConditionObject实现的接口, 定义了ConditionObject包含了那些操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public interface Condition { void await () throws InterruptedException; void awaitUninterruptibly () ; long awaitNanos (long nanosTimeout) throws InterruptedException; boolean await (long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil (Date deadline) throws InterruptedException; void signal () ; void signalAll () ; }
设计 在ConditionObject内部其实也是使用的队列来管理等待的线程. 它和AQS使用的队列隔离.
变量 1 2 3 4 5 6 7 8 private transient Node firstWaiter;private transient Node lastWaiter;private static final int REINTERRUPT = 1 ;private static final int THROW_IE = -1 ;
可以看到ConditionObject
也使用了Node
内部类来封装节点.
由于AQS中也存在队列所以我们称之为: 同步队列
. 而ConditionObject也存在队列所以称为: 条件等待队列
await方法原理 调用此方法时, 线程必须持有锁.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); }
addConditionWaiter方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node (Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
从这个方法可以看出 条件等待队列
是一个单向
的链表, 只能从头向尾
遍历.
signal方法原理 1 2 3 4 5 6 7 8 9 10 11 public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); }
从signal
方法可以看出使用ConditionObject
必须要在独占模式下才能使用.
doSignal方法原理 1 2 3 4 5 6 7 8 9 10 11 12 private void doSignal (Node first) { do { if ((firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); }
signalAll方法原理 1 2 3 4 5 6 7 8 9 10 public final void signalAll () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignalAll(first); }
doSignalAll方法原理 1 2 3 4 5 6 7 8 9 10 11 12 private void doSignalAll (Node first) { lastWaiter = firstWaiter = null ; do { Node next = first.nextWaiter; first.nextWaiter = null ; transferForSignal(first); first = next; } while (first != null ); }
transferForSignal方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }
如果p
节点如果被阻塞, 那么cas必然是能成功的, 只有p
节点是活动的(可能在获取锁 或者 刚好被唤醒), 才会导致cas修改失败.
此时node
节点有可能获取到锁, 所以需要唤醒node节点的线程.