该文章主要讲述 Java 中 Juc 包下的 AbstractQueuedSynchronizer 主要讲述 AQS 的设计及其独占模式和共享模式的获取和释放流程
本章简介
本章主要讲述 JUC 包下的 AQS 的设计与现实, 同时了解 AQS 中独占和共享模式的运转原理和机制
AQS 的设计和实现
AQS 中队列介绍及其变体
AQS 中独占和共享模式的源代码分析
AbstractOwnableSynchronizer 此类为AbstractQueuedSynchronizer的父类, 一个同步器框架有可能在一个时刻被某一个线程独占,AbstractOwnableSynchronizer
就是为所有的同步器实现和锁相关实现提供了基础的保存、获取和设置独占线程的功能.
代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public abstract class AbstractOwnableSynchronizer implements java .io.Serializable { private static final long serialVersionUID = 3737899427754241961L ; protected AbstractOwnableSynchronizer () { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread (Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread () { return exclusiveOwnerThread; } }
AbstractQueuedSynchronizer
Juc 包下的多数同步器都是基于AbstractQueuedSynchronizer(简称 AQS)
框架实现的,AQS为同步状态
的原子性管理
、线程的阻塞
和解除阻塞
以及排队
提供了一种通用的机制。
功能 同步器一般包含两种方法,一种是acquire
,另一种是release
。acquire操作阻塞调用的线程,直到或除非同步状态允许其继续执行。而release操作则是通过某种方式改变同步状态,使得一或多个被acquire阻塞的线程继续执行。
但是 Juc 包中不同的同步器其相应的 API 页不相,Lock.lock,Semaphore.acquire,CountDownLatch.await
(但本质都是acquire
或release
操作),但都支持下面的操作:
阻塞和非阻塞同步
可选的超时设置,让调用者可以放弃等待
通过中断实现任务取消,通常分为两个版本,一个acquire
可取消,而另一个不可以。
同步器的实现根据其状态分为两种:独占状态
和共享状态
。
独占状态:同步器在同一时间允许一个线程执行(Lock
)
共享状态:同步器在同一时间允许多个线程执行(Semaphore
)
设计与实现 acquire
和release
简单的伪代码实现:
acquire
操作:
1 2 3 4 5 while (synchronization state does not allow acquire) { enqueue current thread if not already queued; possibly block current thread; } dequeue current thread if it was queued;
release
操作:
1 2 3 update synchronization state;if (state may permit a blocked thread to acquire) unblock one or more queued threads;
实现上面的操作,需要下面三个操作:
同步状态的原子性管理;
线程的阻塞与解除阻塞;
队列的管理;
同步状态 AQS类使用单个int
(32位)来保存同步状态,并暴露出getState
、setState
以及compareAndSet
操作来读取和更新这个状态。
基于AQS的具体实现类必须根据暴露出的状态相关的方法定义tryAcquire
和tryRelease
方法,以控制acquire
和release
操作。当同步状态满足时,tryAcquire
方法必须返回true
,而当新的同步状态允许后续acquire
时,tryRelease
方法也必须返回true
。这些方法都接受一个int
类型的参数用于传递想要的状态。
阻塞 Juc包中提供了一个LockSupport
类,其LockSupport.park
和LockSupport.unpark
用于替换传统的 Thread.suspend
和 Thread.resume
(同一产生死锁),LockSupport.unpark
方法被提前调用也是可以的。
LockSupport.unpark
的调用是没有被计数的,因此在一个park
调用前多次调用unpark
方法只会解除一个park
操作。
另外,它们作用于每个线程而不是每个同步器。一个线程在一个新的同步器上调用park操作可能会立即返回,因为在此之前可能有“剩余的”unpark
操作。
但是,在缺少一个unpark
操作时,下一次调用park
就会阻塞。
队列 整个框架的关键就是如何管理被阻塞的线程的队列,该队列是严格的FIFO队列,因此,框架不支持基于优先级的同步。
同步队列的最佳选择是自身没有使用底层锁来构造的非阻塞数据结构,已知 MCS 和 CLH 两种锁队列,但在 AQS 中使用了 CLH 锁队列,因为CLH 更容易实现超时
和取消
功能。AQS 基于 CLH 进行了修改和 CLH有较大的出入。
第一个对CLH队列主要的修改是添加了 next 字段,来用于唤醒后继节点
第二个对CLH队列主要的修改是将每个节点都有的状态字段用于控制阻塞而非自旋。
而AQS中同步队列的基础实现是其内部类Node
Node 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 static final class Node { static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException (); else return p; } }
waitStatus
的取值:
CANCELLED
: 由于超时或中断导致该节点被取消,被取消节点的线程永远不会再次阻塞
CONDITION
: 该节点当前在条件队列
中, 在传输之前, 它不会用作同步队列
节点, 此时状态将设置为0
.
SIGNAL
: 当前节点的后继节点被阻塞, 如果当前节点释放或取消时必须唤醒其后继节点
PROPAGATE
: 此状态值通常只设置到调用了doReleaseShared()
方法的头节点,确保releaseShared()
方法的调用可以传播到其他的所有节点,简单理解就是共享模式下节点释放的传递标记。
非负值表示节点不需要发信号, 对于常规同步节点,该字段初始化为0, 对于条件节点,该字段初始化为CONDITION 使用CAS对其进行修改
nextWaiter
该字段字面意思是:下一个等待节点,其实有三个取值:
Node.EXCLUSIVE
: 独占模式
Node.SHARED
: 共享模式
其他值: 代表Condition等待队列中当前节点的下一个等待节点
队列变体 知道了AQS 的队列是使用的CLH队列的变体, 所以有必要看看 CLH 和 MCS 两个队列.
CLH 锁队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class _CLH_Lock { final AtomicReference<Node> tail = new AtomicReference <>(new Node ()); final ThreadLocal<Node> current; public _CLH_Lock () { this .current = new ThreadLocal <Node>() { @Override protected Node initialValue () { Node node = new Node (); System.out.println("构造器: " + Thread.currentThread().getName() + "-" + node); return node; } }; } public static void main (String[] args) { _CLH_Lock lock = new _CLH_Lock (); Runnable runnable = () -> { try { lock.lock(); System.out.println(Thread.currentThread().getName() + "获取到了锁" ); TimeUnit.SECONDS.sleep(5 ); System.out.println(Thread.currentThread().getName() + "释放到了锁" ); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } }; new Thread (runnable, "线程 A" ).start(); new Thread (runnable, "线程 B" ).start(); new Thread (runnable, "线程 C" ).start(); } public void lock () throws InterruptedException { Node own = this .current.get(); System.out.println(Thread.currentThread().getName() + "-" + own); own.locked = true ; Node preNode = tail.getAndSet(own); while (preNode.locked) { System.out.println(Thread.currentThread().getName() + "开始自旋...." ); TimeUnit.SECONDS.sleep(2 ); } } public void unlock () { current.get().locked = false ; } class Node { volatile boolean locked = false ; } }
CLH 队列类似于一个伪链表,每个线程在通过 CAS 操作替换 tail
节点引用后,拿到上一个线程节点引用,不断循环检测该线程节点中的 blocked 字段(这个操作就是自旋)。
MCS锁队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 public class _MCS_Lock { final AtomicReference<Node> tail = new AtomicReference <>(null ); ThreadLocal<Node> current; public _MCS_Lock () { this .current = new ThreadLocal <Node>() { @Override protected Node initialValue () { return new Node (); } }; } public void lock () throws InterruptedException { Node own = current.get(); Node preNode = tail.getAndSet(own); if (Objects.nonNull(preNode)) { preNode.next = own; own.locked = true ; while (own.locked) { System.out.println(Thread.currentThread().getName() + "开始自旋...." ); TimeUnit.SECONDS.sleep(2 ); } } } public void unlock () { Node own = current.get(); if (Objects.isNull(own.next)) { if (tail.compareAndSet(own, null )) { return ; } while (own.next == null ) { } } own.next.locked = false ; own.next = null ; } class Node { volatile Node next; volatile boolean locked; } public static void main (String[] args) throws InterruptedException { _MCS_Lock lock = new _MCS_Lock (); Runnable runnable = () -> { try { lock.lock(); System.out.println(Thread.currentThread().getName() + "获取到了锁" ); TimeUnit.SECONDS.sleep(5 ); System.out.println(Thread.currentThread().getName() + "释放到了锁" ); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } }; new Thread (runnable, "线程 A" ).start(); new Thread (runnable, "线程 B" ).start(); TimeUnit.SECONDS.sleep(3 ); new Thread (runnable, "线程 C" ).start(); } }
MCS 是一个真正的链表通过 next
字段来关联下一个线程节点,但是相对于 CLH 它的 CAS 操作多了
总结 CLH 适用于SMP 系统架构,不适用于NUMA架构(内存分隔),如果前一个节点的内存过远会导致性能下降。
CLH 对比 MCS:
(1)从代码实现来看,CLH比MCS要简单得多。
(2)从自旋的条件来看,CLH依靠前驱节点自旋,而MCS是依靠自身自旋。
(3)从链表队列来看,CLH的队列是隐式的,MCS的队列是物理存在的,通过 next 字段。
(4)CLH 锁释放时只需要改变自己的属性,MCS锁释放则需要改变后继节点的属性。
(5)CLH 适合CPU个数不多的计算机硬件架构上,MCS则适合拥有很多CPU的硬件架构上
(6)CLH和MCS实现的自旋锁都是不可重入的
独占模式 获取 1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire
方法在独占模式下修改同步状态
, 会至少调用一次tryAcquire
方法, 如果tryAcquire
返回true
代表状态修改成功, 反之则尝试调用acquireQueued
方法入队.
addWaiter
是入队操作, 而acquireQueued
则是从队列中获取操作.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
队列初始化 在调用addWaiter
进行入队操作时, 可能会出现队列为初始化的情况, 即pred == null
.此时调用了 enq
来对队列进行初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
自旋获取 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
在自旋时, 前置节点的状态:
waitStatus=0: 说明当前节点可以继续自旋, 说不定下次自旋就能获取到锁
waitStatus>0: 说明前置节点已经取消, 更新当前节点的前置节点, 并进行下次自旋
waitStatus<0: 说明当前节点可以阻塞, 这个状态可能是当前节点在执行shouldParkAfterFailedAcquire
修改的.
获取失败后更新状态 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
当前节点的唤醒是保存在前置节点中的
释放 源代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
后继节点状态 在执行释放时, 后继节点可能处于以下几种状态:
已经阻塞.
还在acquireQueued
方法中执行自旋,还未阻塞.
已经取消.
优化点 在unparkSuccessor
中将当前节点的状态更新成了 0
1 2 if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 );
而后继节点在获取失败后可能在执行 spaf 将前置节点置为 SIGNAL
1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
为什么这是个优化操作呢?
假设队列 : A -> B -> C
假设 B 已经自旋了一次(获取失败), 调用spaf
方法将 A 的状态设置成了 SIGNAL, 进行下一次自旋, 此时 A 会有两种操作:
调用release方法, 此时 A 的状态会变成 0, B 在下一次自旋后发现 A 的状态变成了 0, 则会继续自旋
未调用release方法, 此时 A 的状态还是 SIGNAL(被 B 修改的), 此时 B 在下一次自旋后就可以被阻塞了
这样可以加速锁的获取, 避免了一次没必要的 park.
取消获取 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } }
总结 共享模式 共享模式下的获取
和释放
和独占模式有一些区别, 共享模式下,锁是可以被多个线程锁持有的.
下面的会涉及到源代码的分析, 再看每行代码时, 都要记住一点:所有的方法都有可能并发执行
获取 1 2 3 4 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); }
tryAcquireShared
返回值有以下几种情况:
<0
: 表示获取失败
=0
:表示获取成功,但是后继节点不能获取成功
>0
:表示获取成功
doAcquireShared
源代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
流程
调用addWaiter
进行入队操作, 只不过是共享模式
判断前置节点是否是头节点
是头节点: 再次尝试获取, 如果成功, 调用setHeadAndPropagate
方法传播, 反之执行步骤 3
非头节点: 执行步骤 3
获取失败后, 根据前置节点状态 判断是否应该阻塞当前节点
setHeadAndPropagate
我们知道只要当当前节点的前继节点为头节点且再次 tryAcquireShared
成功后再回执行 shp
方法
注意: shp 是setHeadAndPropagate
方法的简称
1 2 3 4 5 6 7 8 9 10 11 12 private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
调用 setHead
方法后, 此前的 head
已经不在队列中了
如果 node.next 不为空且处于共享模式, 调用 doReleaseShared()
方法, 此方法在 releaseShared
会一起讲解
释放 源代码 1 2 3 4 5 6 7 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
可以看到在调用了tryReleaseShared
方法后,如果成功则会调用doReleaseShared
, 这和上面共享获取
时setHeadAndPropagate
方法中调用的方法是一致的.
doReleaseShared
此时我们知道在 acquireShared
和 releaseShared
中都会调用此方法, 该方法至少会被一个节点调用两次.
在分析这个方法前, 假设现在有三个节点为别是: A, B, C
且假设入队顺序是 A -> B -> C
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) { continue ; } unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
unparkSuccessor
解锁后继线程, 该方法只有在 CAS 将 head.ws修改成 0 成功时才会执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
总结 至此 AQS 中独占和共享模式下的 acquire 和 release 操作的细节都已分析完毕, 了解 AQS 对学习JUC 包下的类非常有帮助, 如果看完本章还有疑惑, 可以查看一下其它博客.
CLH 锁
同步框架说明文档
AQS讲解
AQS讲解