Java并发工具之AQS:共享模式实现细节
AQS(AbstractQueuedSynchronizer)是Java并发编程中最重要的工具之一,基于其实现了ReentrantLock、CountDownLatch、Semaphore和ReentrantReadWriteLock等,可以说是锁的基石。
在深入AQS细节之前,先介绍另外一个基础工具,AQS使用其来控制线程。
LockSupport
顾名思义,是实现锁的辅助工具,其本身与锁无关,只是用来挂起和唤醒线程,主要实现了两个方法。
park
挂起当前线程。unpark
唤醒指定线程。
采用许可(permit)模型,park消费许可,unpark颁发许可,所以两个方法调用顺序没有影响,unpark也可以重复调用,许可只会有一个。
AQS
AQS本身也与锁无关,只是定义了一个资源和针对该资源的一组操作,比如获取资源、释放资源、资源获取失败线程进入等待队列并挂起,资源释放时唤醒等待线程使重新竞争资源等。
等待队列采用先入先出(FIFO)队列,一旦进入队列,线程获取资源顺序保持严格有序,即从头结点后继节点依次向后。由于共享模式下多个线程可同时获取资源,为避免惊群效应,节点只关注其前驱节点,前驱节点获得资源成为头结点后,当前节点成为头结点后继节点,才有机会被唤醒并尝试获取资源。
有两种模式,可以单独或组合使用。
独占模式(Exclusive)
资源同时只能被单个线程占有,获取和释放资源只能是一个线程,对应方法 acquire
和 release
,模板方法 tryAcquire
和 tryRelease
。
独占模式相对比较简单,所以不费太多笔墨。
共享模式(Share)
资源可以同时被多个线程占有,可以有多个线程获取和释放资源,对应方法 acquireShared
和 releaseShared
,模板方法 tryAcquireShared
和 tryReleaseShared
。
先看一下方法源码,基于 OpenJDK 9。
1 | public final void acquireShared(int arg) { |
1 | public final boolean releaseShared(int arg) { |
模板方法 tryAcquireShared
和 tryReleaseShared
需要派生类实现,用来表达自己逻辑意义上的获取资源和释放资源行为。
再看一下三个关键方法源码。
1 | static final class Node { |
上面提到过,共享模式下,可以有多个线程占有资源,采用从头结点依次唤醒后继节点的方式,所以 acquireShared
和 releaseShared
都会调用到 doReleaseShared
,该方法即用来执行唤醒后继节点的操作。
根据源码,两个模板方法有以下含义。
tryAcquireShared
返回值类型为int,<0表示获取资源失败,线程需要进入等待队列并挂起,>=0表示获取资源成功,如果线程在等待队列,所在节点变为头结点,其中>0表明还有剩余资源,需要唤醒后继节点线程去尝试获取资源。此种情况下只能唤醒后继共享节点,因为前面是共享节点线程获取到资源,只有同类的共享模式线程有资格”分享”剩余资源。tryReleaseShared
返回值类型为boolean,false为释放资源失败,true为释放资源成功,需要尝试唤醒后继节点线程去获取资源。此种情况下可以唤醒后继独占或共享节点,因为是资源被释放,独占或共享模式线程都有资格竞争资源。
这种从头结点后继节点依次唤醒的设计,称作 propagate,意为传播。纵观整个设计逻辑,可以得到一个规则:一旦释放资源成功,在等待队列中的节点应该有机会被传播到,从而节点线程被唤醒重新尝试获取资源,即传播链应该始终存在,不能中断,因为一旦中断,等待队列中的节点就可能会永远hang住无法被唤醒,整个AQS就失效了。这个规则是后续讨论的基础。
整体实现逻辑可以自行阅读理解,不再赘述,下面探讨部分重要的实现细节。
图上红字提出了4个问题:
doReleaseShared
为什么使用for循环?doReleaseShared
中的 PROPAGATE 状态有什么意义?doReleaseShared
为什么在h == head
时才break,即为什么head改变要重试?setHeadAndPropagate
为什么要同时判断新老两个head结点的waitStatus?
我们构造若干并发场景,尝试解答。
场景一
假定此时,无问题1-4中的所有元素。
此时等待队列如图所示:
TA
releaseShared
调用tryReleaseShared
释放资源成功,执行doReleaseShared
,此时h = head, h.waitStatus = SIGNAL
,CAS成功,h.waitStatus = 0
,调用unparkSuccessor
唤醒后继节点。T1 被唤醒,
tryAcquireShared
获取资源成功,r = 0
,执行setHeadAndPropagate
至setHead
前,此时propagate = r = 0
。TB
releaseShared
调用tryReleaseShared
释放资源成功,执行doReleaseShared
,由于此时h = head, h.waitStatus = 0
,CAS失败,无法唤醒后继节点。T1 继续执行,由于
propagate = 0, h.waitStatus = 0
,无法调用doReleaseShared
唤醒后继节点。
此场景下,TB 的资源释放丢失了,传播链中断,违背了上面提到的规则,AQS失效。
场景二
环境同场景一,但加入问题2的 PROPAGATE 状态。
TA
releaseShared
调用tryReleaseShared
释放资源成功,执行doReleaseShared
,此时h = head, h.waitStatus = SIGNAL
,CAS成功,h.waitStatus = 0
,调用unparkSuccessor
唤醒后继节点。T1 被唤醒,
tryAcquireShared
获取资源成功,r = 0
,执行setHeadAndPropagate
至setHead
前,此时propagate = r = 0
。TB
releaseShared
调用tryReleaseShared
释放资源成功,执行doReleaseShared
,由于此时h = head, h.waitStatus = 0
,走else if
逻辑,CAS成功,h.waitStatus = PROPAGATE
。T1 继续执行,由于
propagate = 0, h.waitStatus = PROPAGATE < 0
,调用doReleaseShared
唤醒后继节点。
加入 PROPAGATE 状态后,传播链得以延续。
但仅仅加入 PROPAGATE 状态是否就可以了呢?
场景三
环境同场景二。
TA
releaseShared
调用tryReleaseShared
释放资源成功,执行doReleaseShared
,此时h = head, h.waitStatus = SIGNAL
,CAS成功,h.waitStatus = 0
,调用unparkSuccessor
唤醒后继节点。T1 被唤醒,
tryAcquireShared
获取资源成功,r = 0
,执行setHeadAndPropagate
至setHead
前,此时propagate = r = 0
。TB
releaseShared
调用tryReleaseShared
释放资源成功,执行doReleaseShared
至Node h = head
后。T1 继续执行,由于
propagate = 0, h.waitStatus = 0
,无法调用doReleaseShared
唤醒后继节点。TB 继续执行,此时
h.waitStatus = 0
,走else if
逻辑,CAS成功,h.waitStatus = PROPAGATE
。
此场景下,操作5已经没有意义,传播链已经中断。
可见,仅仅加入 PROPAGATE 是不够的。场景三的问题出在,进行操作5时,head已经变成新的(即原head的后继节点),再把旧的 head.waitStatus
设置为 PROPAGATE 状态起不到任何作用。
场景四
环境同场景三,但加入问题3的 h == head
,这样必然可能要重试,所以也要加入问题1的 for
。
TA
releaseShared
调用tryReleaseShared
释放资源成功,执行doReleaseShared
,此时h = head, h.waitStatus = SIGNAL
,CAS成功,h.waitStatus = 0
,调用unparkSuccessor
唤醒后继节点。T1 被唤醒,
tryAcquireShared
获取资源成功,r = 0
,执行setHeadAndPropagate
至setHead
前,此时propagate = r = 0
。TB
releaseShared
调用tryReleaseShared
释放资源成功,执行doReleaseShared
至Node h = head
后。T1 继续执行,由于
propagate = 0, h.waitStatus = 0
,无法调用doReleaseShared
唤醒后继节点。TB 继续执行,此时
h.waitStatus = 0
,走else if
逻辑,CAS成功,h.waitStatus = PROPAGATE
;继续执行,由于head已经在操作4中改变,h == head
返回false,无法退出for循环,将进行重试,h = head, h.waitStatus = SIGNAL
,CAS成功,h.waitStatus = 0
,调用unparkSuccessor
唤醒后继节点。
加入 for
和 h == head
后,传播链得以延续。
这样就毫无破绽万事大吉了吗?
场景五
假定此时,有问题1-3中的元素,无问题4中的元素,即不会获取新head并检查其waiStatus,对应代码中的 (h = head) == null || h.waitStatus < 0
(旧head的waitStatus检查的必要性已经在上述场景中证明过了)。
此时等待队列如图所示:
TA
releaseShared
调用tryReleaseShared
释放资源成功,执行doReleaseShared
,此时h = head, h.waitStatus = SIGNAL
,CAS成功,h.waitStatus = 0
,调用unparkSuccessor
唤醒后继节点。T1 被唤醒,
tryAcquireShared
获取资源成功,r = 0
,执行setHeadAndPropagate
至setHead
后,head改变为T1节点记为h1,h仍为旧head,此时h.waitStatus = 0, h1.waitStatus = 0
(因为T1 没有后继节点,其waitStatus默认为0)。T2
acquireShared
,调用tryAcquireShared
获取资源,由于此时没有多余资源,获取资源失败,进入等待队列,执行至shouldParkAfterFailedAcquire
前,此时 T2 已成为 T1
也即head的后继节点,但因为尚未park,仍然h1 = head, h1.waitStatus = 0
。TB
releaseShared
调用tryReleaseShared
释放资源成功,执行doReleaseShared
,h = head, h.waitStatus = 0
,走else if
逻辑,CAS成功,h.waitStatus = PROPAGATE
。T1 继续执行,由于
propagate = 0, h.waitStatus = 0
,无法调用doReleaseShared
唤醒后继节点。T2 继续执行,park至head后继节点,
head.waitStatus = SIGNAL
。
此场景下,操作6已经没有意义,传播链已经中断。
可见,这样是不够的,因为操作4修改的是新head,但操作5只检查了旧head,导致没有起到效果。
加入问题4的元素 (h = head) == null || h.waitStatus < 0
后,显而易见,操作5中会检查新head的waitStatus,因为其在操作4中已经被置为 PROPAGATE,所以检查结果为true,进而调用到doReleaseShared
唤醒后继节点,无论T2 是否park,都可以被唤醒,传播链得以延续。
结语
以上通过构建若干并发场景,验证了AQS部分实现细节的必要性,也见识了设计的精巧细致。理解了AQS共享模式,再去看独占模式就相对容易了。及至基于此实现的各种锁,也就不难理解和掌握了。