Java Concurrent - AQS & CountDownLatch

Overview

The implementation of CountDownLatch itself is very simple: almost every method in it delegates directly to AQS. Let's first look at a typical usage:

CountDownLatch latch = new CountDownLatch(2);

Runnable wait = () -> {
    try {
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread1 = new Thread(wait);
thread1.start();

Thread thread2 = new Thread(wait);
thread2.start();

Thread.sleep(100); // ensure thread1, thread2 start first

Runnable countDown = () -> {
    latch.countDown();
};

Thread thread3 = new Thread(countDown);
thread3.start();

Thread thread4 = new Thread(countDown);
thread4.start();

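Comparing CountDownLatch with Worker, the former's await is somewhat like the latter's lock, and countDown is like unlock. However, Worker is an exclusive synchronizer, whereas CountDownLatch is not. In the example above, thread1 and thread2 both stay in the waiting state until thread3 and thread4 have each called countDown.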

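From the code above we can identify three entry points: the constructor, await, and countDown. Looking at the implementation of CountDownLatch, it contains a class Sync that extends AQS. The constructor only sets the state value: the count argument becomes the AQS state. The countDown method decrements state by one, and when state reaches 0 the threads blocked in await stop waiting.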
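To make the three entry points concrete, here is a minimal sketch of a latch built on AQS. The class name SimpleLatch and the count() helper are made up for illustration; the structure only approximates CountDownLatch.Sync and is not a copy of the JDK source.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class for illustration; it approximates the shape of CountDownLatch.
public class SimpleLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count); // the constructor argument becomes the AQS state
        }

        int count() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Non-negative means "acquired": only possible once the count is 0.
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false; // latch already open, nothing to release
                }
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0; // only the final decrement wakes waiters
                }
            }
        }
    }

    private final Sync sync;

    public SimpleLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1); // blocks while state != 0
    }

    public void countDown() {
        sync.releaseShared(1); // decrements state, wakes waiters at 0
    }

    public int getCount() {
        return sync.count();
    }
}
```

All of the queueing and parking lives in AQS; the subclass only decides what "acquired" and "released" mean in terms of state.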

await

As shown in the figure:

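Compared with the flowchart of Worker#lock, the two structures are very similar. In fact, many of the methods correspond one to one; a table listing them comes later. The tryAcquireShared call in the figure is implemented by the subclass, as follows: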

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
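  • At the entry point, if getState != 0 (in practice, greater than 0), the thread goes straight into the wait queue. Otherwise getState == 0, so there is nothing to wait for and the method returns immediately.
  • Inside the loop, once getState == 0 the thread can leave the loop.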

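In short, a non-negative result from tryAcquireShared means the acquire succeeded; the CountDownLatch implementation only ever returns 1 or -1.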

The doReleaseShared method is explained below, in the countDown section.

await(long timeout, TimeUnit unit)

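This overload differs little from await. Internally it calls AQS#doAcquireSharedNanos, whose code is almost identical apart from the added timeout handling. One detail worth noting: when the remaining time is below spinForTimeoutThreshold, the thread spins (that is, loops); otherwise it parks via LockSupport.parkNanos. This is done for performance reasons, as the comments in the source also explain.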
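From the caller's side, the timed overload reports through its boolean result whether the count reached zero in time. A small sketch using only the public CountDownLatch API; the class and method names are invented for this example, and the timeouts are arbitrary:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only CountDownLatch and TimeUnit come from the JDK.
public class TimedAwaitDemo {

    /** Waits on a latch that nobody counts down, so the wait times out. */
    public static boolean awaitWithoutCountDown() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        return latch.await(50, TimeUnit.MILLISECONDS); // false on timeout
    }

    /** Waits on a latch that another thread opens well before the timeout. */
    public static boolean awaitWithCountDown() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Thread opener = new Thread(latch::countDown);
        opener.start();
        boolean opened = latch.await(5, TimeUnit.SECONDS); // true once state == 0
        opener.join();
        return opened;
    }
}
```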

countDown

CountDownLatch#countDown simply calls Sync#releaseShared(1), whose code is very short:

if (tryReleaseShared(1)) {
    doReleaseShared();
    return true;
}
return false;

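tryReleaseShared can be summarized as follows: it tries to decrement state by one with CAS and returns true only when the new value is 0. If state is already 0, it returns false without changing anything.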
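The reason for a CAS loop is that several threads may call countDown at once, yet exactly one of them should observe the transition to 0 and go on to wake the waiters. The sketch below models this on a plain AtomicInteger rather than the real AQS state; ReleaseRace, tryRelease and winners are hypothetical names used only here.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the release logic; it does not touch AQS itself.
public class ReleaseRace {

    /** Decrements the counter; true only for the call that moves it from 1 to 0. */
    static boolean tryRelease(AtomicInteger state) {
        for (;;) {
            int c = state.get();
            if (c == 0) {
                return false; // already at zero, nothing to do
            }
            int next = c - 1;
            if (state.compareAndSet(c, next)) {
                return next == 0;
            }
            // CAS lost a race with another thread: re-read and retry
        }
    }

    /** Starts `threads` threads that each release once; returns how many saw true. */
    public static int winners(int count, int threads) throws InterruptedException {
        AtomicInteger state = new AtomicInteger(count);
        AtomicInteger trueResults = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                if (tryRelease(state)) {
                    trueResults.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return trueResults.get();
    }
}
```

With a count of 4 and 8 releasing threads, only one call returns true; with fewer releases than the count, none does.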

If tryReleaseShared returns true, doReleaseShared is called. This method was also mentioned above; its code is:

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases. This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
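  • The loop exits only when h == head. In other words, if head was modified while the body ran, the operation has to be repeated.
  • If head.waitStatus is SIGNAL, it is reset to 0 via CAS and the successor node is unparked.
  • If head.waitStatus is 0, it is set to PROPAGATE via CAS; a failed CAS retries the loop. This case deserves a note: in the ideal scenario we just imagined, head.waitStatus should be SIGNAL. Since h != tail already guarantees that a successor exists, a value of 0 typically means that the successor has been enqueued but has not yet marked the head as SIGNAL. TODO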

Review of the Whole Process

Let's follow a few key variables to see how the data changes over the whole process:

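  • Before Thread3 runs, state == 2. Thread3's countDown only brings state down to 1, so tryReleaseShared returns false.
  • Only when Thread4 calls countDown does tryReleaseShared return true, and doReleaseShared is then called. Thread1 is unparked and leaves its loop. Back in doAcquireSharedInterruptibly, it then executes setHeadAndPropagate, which calls doReleaseShared again, so Thread2 is woken up as well.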
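The state changes in this walkthrough can be observed from outside through getCount. The sketch below is a simplified variant of the opening example: to keep the trace deterministic it calls countDown from the main thread instead of from thread3 and thread4, and the name LatchTrace is invented for illustration.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical trace helper; only CountDownLatch comes from the JDK.
public class LatchTrace {

    /** Returns "before,afterFirst,afterSecond,waitersBlockedAfterFirst". */
    public static String run() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        Runnable waiter = () -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
            }
        };
        Thread t1 = new Thread(waiter);
        Thread t2 = new Thread(waiter);
        t1.start();
        t2.start();

        long before = latch.getCount();      // state == 2
        latch.countDown();                   // tryReleaseShared returns false
        long afterFirst = latch.getCount();  // state == 1
        // Neither waiter can finish while state is still 1.
        boolean blocked = t1.isAlive() && t2.isAlive();

        latch.countDown();                   // tryReleaseShared returns true
        long afterSecond = latch.getCount(); // state == 0
        t1.join();                           // both waiters are released
        t2.join();
        return before + "," + afterFirst + "," + afterSecond + "," + blocked;
    }
}
```

That both join calls return is what shows the wake-up propagated from the first waiter to the second.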

Comparison with Worker


See Also