Java并发工具之AQS:共享模式实现细节

AQS(AbstractQueuedSynchronizer)是Java并发编程中最重要的工具之一,基于其实现了ReentrantLock、CountDownLatch、Semaphore和ReentrantReadWriteLock等,可以说是锁的基石。

在深入AQS细节之前,先介绍另外一个基础工具,AQS使用其来控制线程。

LockSupport

顾名思义,是实现锁的辅助工具,其本身与锁无关,只是用来挂起和唤醒线程,主要实现了两个方法。

  • park
    挂起当前线程。

  • unpark
    唤醒指定线程。

采用许可(permit)模型,park消费许可,unpark颁发许可,所以两个方法调用顺序没有影响,unpark也可以重复调用,许可只会有一个。

AQS

AQS本身也与锁无关,只是定义了一个资源和针对该资源的一组操作,比如获取资源、释放资源、资源获取失败线程进入等待队列并挂起,资源释放时唤醒等待线程使重新竞争资源等。

等待队列采用先入先出(FIFO)队列,一旦进入队列,线程获取资源顺序保持严格有序,即从头结点后继节点依次向后。由于共享模式下多个线程可同时获取资源,为避免惊群效应,节点只关注其前驱节点,前驱节点获得资源成为头结点后,当前节点成为头结点后继节点,才有机会被唤醒并尝试获取资源。

有两种模式,可以单独或组合使用。

独占模式(Exclusive)

资源同时只能被单个线程占有,获取和释放资源只能是一个线程,对应方法 acquirerelease,模板方法 tryAcquiretryRelease

独占模式相对比较简单,所以不费太多笔墨。

共享模式(Share)

资源可以同时被多个线程占有,可以有多个线程获取和释放资源,对应方法 acquireSharedreleaseShared,模板方法 tryAcquireSharedtryReleaseShared

先看一下方法源码,基于 OpenJDK 9

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

模板方法 tryAcquireSharedtryReleaseShared 需要派生类实现,用来表达自己逻辑意义上的获取资源和释放资源行为。

再看一下三个关键方法源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled. */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking. */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition. */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate.
*/
static final int PROPAGATE = -3;

shared

上面提到过,共享模式下,可以有多个线程占有资源,采用从头结点依次唤醒后继节点的方式,所以 acquireSharedreleaseShared 都会调用到 doReleaseShared,该方法即用来执行唤醒后继节点的操作。

根据源码,两个模板方法有以下含义。

  • tryAcquireShared
    返回值类型为int,<0表示获取资源失败,线程需要进入等待队列并挂起,>=0表示获取资源成功,如果线程在等待队列,所在节点变为头结点,其中>0表明还有剩余资源,需要唤醒后继节点线程去尝试获取资源。此种情况下只能唤醒后继共享节点,因为前面是共享节点线程获取到资源,只有同类的共享模式线程有资格”分享”剩余资源。

  • tryReleaseShared
    返回值类型为boolean,false为释放资源失败,true为释放资源成功,需要尝试唤醒后继节点线程去获取资源。此种情况下可以唤醒后继独占或共享节点,因为是资源被释放,独占或共享模式线程都有资格竞争资源。

这种从头结点后继节点依次唤醒的设计,称作 propagate,意为传播。纵观整个设计逻辑,可以得到一个规则:一旦释放资源成功,在等待队列中的节点应该有机会被传播到,从而节点线程被唤醒重新尝试获取资源,即传播链应该始终存在,不能中断,因为一旦中断,等待队列中的节点就可能会永远hang住无法被唤醒,整个AQS就失效了。这个规则是后续讨论的基础。

整体实现逻辑可以自行阅读理解,不再赘述,下面探讨部分重要的实现细节。

图上红字提出了4个问题:

  1. doReleaseShared 为什么使用for循环?

  2. doReleaseShared 中的 PROPAGATE 状态有什么意义?

  3. doReleaseShared 为什么在 h == head 时才break,即为什么head改变要重试?

  4. setHeadAndPropagate 为什么要同时判断新老两个head结点的waitStatus?

我们构造若干并发场景,尝试解答。

场景一

假定此时,无问题1-4中的所有元素。

此时等待队列如图所示:

scene1

  1. TA releaseShared 调用 tryReleaseShared 释放资源成功,执行 doReleaseShared,此时 h = head, h.waitStatus = SIGNAL,CAS成功,h.waitStatus = 0,调用unparkSuccessor 唤醒后继节点。

  2. T1 被唤醒,tryAcquireShared 获取资源成功,r = 0,执行 setHeadAndPropagatesetHead 前,此时 propagate = r = 0

  3. TB releaseShared 调用 tryReleaseShared 释放资源成功,执行 doReleaseShared,由于此时 h = head, h.waitStatus = 0,CAS失败,无法唤醒后继节点。

  4. T1 继续执行,由于 propagate = 0, h.waitStatus = 0,无法调用 doReleaseShared唤醒后继节点。

此场景下,TB 的资源释放丢失了,传播链中断,违背了上面提到的规则,AQS失效。

场景二

环境同场景一,但加入问题2的 PROPAGATE 状态。

  1. TA releaseShared 调用 tryReleaseShared 释放资源成功,执行 doReleaseShared,此时 h = head, h.waitStatus = SIGNAL,CAS成功,h.waitStatus = 0,调用unparkSuccessor 唤醒后继节点。

  2. T1 被唤醒,tryAcquireShared 获取资源成功,r = 0,执行 setHeadAndPropagatesetHead 前,此时 propagate = r = 0

  3. TB releaseShared 调用 tryReleaseShared 释放资源成功,执行 doReleaseShared,由于此时 h = head, h.waitStatus = 0,走 else if 逻辑,CAS成功,h.waitStatus = PROPAGATE

  4. T1 继续执行,由于 propagate = 0, h.waitStatus = PROPAGATE < 0,调用 doReleaseShared唤醒后继节点。

加入 PROPAGATE 状态后,传播链得以延续。

但仅仅加入 PROPAGATE 状态是否就可以了呢?

场景三

环境同场景二。

  1. TA releaseShared 调用 tryReleaseShared 释放资源成功,执行 doReleaseShared,此时 h = head, h.waitStatus = SIGNAL,CAS成功,h.waitStatus = 0,调用unparkSuccessor 唤醒后继节点。

  2. T1 被唤醒,tryAcquireShared 获取资源成功,r = 0,执行 setHeadAndPropagatesetHead 前,此时 propagate = r = 0

  3. TB releaseShared 调用 tryReleaseShared 释放资源成功,执行 doReleaseSharedNode h = head 后。

  4. T1 继续执行,由于 propagate = 0, h.waitStatus = 0,无法调用 doReleaseShared唤醒后继节点。

  5. TB 继续执行,此时 h.waitStatus = 0,走 else if 逻辑,CAS成功,h.waitStatus = PROPAGATE

此场景下,操作5已经没有意义,传播链已经中断。

可见,仅仅加入 PROPAGATE 是不够的。场景三的问题出在,进行操作5时,head已经变成新的(即原head的后继节点),再把旧的 head.waitStatus 设置为 PROPAGATE 状态起不到任何作用。

场景四

环境同场景三,但加入问题3的 h == head,这样必然可能要重试,所以也要加入问题1的 for

  1. TA releaseShared 调用 tryReleaseShared 释放资源成功,执行 doReleaseShared,此时 h = head, h.waitStatus = SIGNAL,CAS成功,h.waitStatus = 0,调用unparkSuccessor 唤醒后继节点。

  2. T1 被唤醒,tryAcquireShared 获取资源成功,r = 0,执行 setHeadAndPropagatesetHead 前,此时 propagate = r = 0

  3. TB releaseShared 调用 tryReleaseShared 释放资源成功,执行 doReleaseSharedNode h = head 后。

  4. T1 继续执行,由于 propagate = 0, h.waitStatus = 0,无法调用 doReleaseShared唤醒后继节点。

  5. TB 继续执行,此时 h.waitStatus = 0,走 else if 逻辑,CAS成功,h.waitStatus = PROPAGATE;继续执行,由于head已经在操作4中改变, h == head 返回false,无法退出for循环,将进行重试,h = head, h.waitStatus = SIGNAL,CAS成功,h.waitStatus = 0,调用unparkSuccessor 唤醒后继节点。

加入 forh == head 后,传播链得以延续。

这样就毫无破绽万事大吉了吗?

场景五

假定此时,有问题1-3中的元素,无问题4中的元素,即不会获取新head并检查其waiStatus,对应代码中的 (h = head) == null || h.waitStatus < 0(旧head的waitStatus检查的必要性已经在上述场景中证明过了)。

此时等待队列如图所示:

scene2

  1. TA releaseShared 调用 tryReleaseShared 释放资源成功,执行 doReleaseShared,此时 h = head, h.waitStatus = SIGNAL,CAS成功,h.waitStatus = 0,调用unparkSuccessor 唤醒后继节点。

  2. T1 被唤醒,tryAcquireShared 获取资源成功,r = 0,执行 setHeadAndPropagatesetHead 后,head改变为T1节点记为h1,h仍为旧head,此时 h.waitStatus = 0, h1.waitStatus = 0(因为T1 没有后继节点,其waitStatus默认为0)。

  3. T2 acquireShared,调用 tryAcquireShared 获取资源,由于此时没有多余资源,获取资源失败,进入等待队列,执行至 shouldParkAfterFailedAcquire 前,此时 T2 已成为 T1
    也即head的后继节点,但因为尚未park,仍然 h1 = head, h1.waitStatus = 0

  4. TB releaseShared 调用 tryReleaseShared 释放资源成功,执行 doReleaseSharedh = head, h.waitStatus = 0,走 else if 逻辑,CAS成功,h.waitStatus = PROPAGATE

  5. T1 继续执行,由于 propagate = 0, h.waitStatus = 0,无法调用 doReleaseShared唤醒后继节点。

  6. T2 继续执行,park至head后继节点,head.waitStatus = SIGNAL

此场景下,操作6已经没有意义,传播链已经中断。

可见,这样是不够的,因为操作4修改的是新head,但操作5只检查了旧head,导致没有起到效果。

加入问题4的元素 (h = head) == null || h.waitStatus < 0 后,显而易见,操作5中会检查新head的waitStatus,因为其在操作4中已经被置为 PROPAGATE,所以检查结果为true,进而调用到doReleaseShared 唤醒后继节点,无论T2 是否park,都可以被唤醒,传播链得以延续。

结语

以上通过构建若干并发场景,验证了AQS部分实现细节的必要性,也见识了设计的精巧细致。理解了AQS共享模式,再去看独占模式就相对容易了。及至基于此实现的各种锁,也就不难理解和掌握了。