在 Java 并发编程中,Condition 接口是一个非常重要的组件,它与 ReentrantLock 紧密结合,提供了比传统的 Object.wait/notify 机制更为灵活和强大的线程间协调功能。本文将深入探讨 Condition 接口的实现原理、使用方法以及与 ReentrantLock 的关系。
1 Condition 接口概述
Condition 接口提供了类似于 Object.wait/notify 的等待/通知机制,但它更加灵活和强大。Condition 接口定义了以下几个主要方法:
await():使当前线程等待,直到被通知或中断。类似于 Object.wait()。awaitUninterruptibly():使当前线程等待,直到被通知,即使在等待时被中断也不会返回。await(long time, TimeUnit unit):使当前线程等待指定的时间,或被通知,或被中断。类似于 Object.wait(long timeout)。awaitNanos(long nanosTimeout):使当前线程等待指定的纳秒时间,或被通知,或被中断。awaitUntil(Date deadline):使当前线程等待直到指定的截止日期,或被通知,或被中断。signal():唤醒一个等待的线程。类似于 Object.notify()。signalAll():唤醒所有等待的线程。类似于 Object.notifyAll()。
2 Condition 的实现原理
Condition 接口的实现类是 ConditionObject,它是 AbstractQueuedSynchronizer(AQS)的一个内部类。通过 lock.newCondition() 方法可以创建一个 Condition 对象。
以ReentrantLock为例说明:
public class ReentrantLock implements Lock, java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
public Condition newCondition() {
return sync.newCondition();
}
}
ConditionObject 内部维护了一个先进先出(FIFO)的单向等待队列,称为等待队列。所有调用 await 方法的线程都会加入到这个等待队列中,并且线程状态均为等待状态。等待队列的头节点由 firstWaiter 指向,尾节点由 lastWaiter 指向。
源码如下:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** condition队列的第一个节点. */
private transient Node firstWaiter;
/** condition队列的最后一个节点. */
private transient Node lastWaiter;
}
Node 中的 nextWaiter 指向队列中的下一个节点。并且进入到等待队列的 Node 节点状态都会被设置为 CONDITION。
3 Condition 的 await 方法
3.1 await 方法的实现原理
Condition 接口的实现类是 ConditionObject,它是 AbstractQueuedSynchronizer(AQS)的一个内部类。await 方法的主要作用是将当前获取锁的线程加入到等待队列中,并在特定条件下等待。
ConditionObject 的 await 方法源码如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 将当前线程包装成 Node,尾插入到等待队列中
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的 lock,在释放的过程中会唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 自旋等待获取到同步状态(即获取到 lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 5. 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
代码的主要逻辑如下:
将当前线程包装成 Node,并加入到等待队列中:通过 addConditionWaiter() 方法实现。释放当前线程所占用的锁:通过 fullyRelease(node) 方法实现,释放锁的过程中会唤醒同步队列中的下一个节点。当前线程进入等待状态:通过 LockSupport.park(this) 方法使线程进入等待状态。自旋等待获取同步状态:当线程被唤醒后,会自旋等待获取同步状态(即获取锁)。处理中断情况:如果线程在等待过程中被中断,会根据中断模式进行处理。
3.2 将当前线程添加到等待队列中
调用 addConditionWaiter 方法会将当前线程添加到等待队列中,源码如下:
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
// 将不处于等待状态的节点从等待队列中移除
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 尾节点为空
if (t == null)
// 将首节点指向 node
firstWaiter = node;
else
// 将尾节点的 nextWaiter 指向 node 节点
t.nextWaiter = node;
// 尾节点指向 node
lastWaiter = node;
return node;
}
这段代码的作用是通过尾插入的方式将当前线程封装的 Node 插入到等待队列中。等待队列是一个不带头节点的链式队列,与 AQS 的同步队列(带头节点)有所不同。
3.3 释放锁的过程
将当前节点插入到等待队列之后,会使当前线程释放锁,由 fullyRelease 方法实现,源码如下:
final int fullyRelease(Node node) {
// 释放锁失败为 true,释放锁成功为 false
boolean failed = true;
try {
// 获取当前锁的 state
int savedState = getState();
// 释放锁成功的话
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
// 释放锁失败的话将节点状态置为取消
node.waitStatus = Node.CANCELLED;
}
}
这段代码调用 AQS 的模板方法 release 释放 AQS 的同步状态,并且唤醒在同步队列中头节点的后继节点引用的线程。如果释放成功则正常返回,若失败则抛出异常。
3.4 从 await 方法中退出
怎样从 await 方法中退出呢?await 方法中有一段逻辑如下:
while (!isOnSyncQueue(node)) {
// 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
isOnSyncQueue 方法用于判断当前线程所在的 Node 是否在同步队列中。如果当前节点不在同步队列中,则线程会进入等待状态。退出 await 方法的前提条件是当前线程被中断或者调用 condition.signal 或者 condition.signalAll 使当前节点移动到同步队列后。
当退出 while 循环后,会调用 acquireQueued(node, savedState),该方法的作用是在自旋过程中线程不断尝试获取同步状态,直到成功(线程获取到锁)。这样也说明了退出 await 方法必须是已经获得了 condition 引用(关联)的锁。
3.5 超时机制的支持
Condition 还额外支持超时机制,使用者可调用 awaitNanos、awaitUntil 这两个方法,实现原理基本上与 AQS 中的 tryAcquire 方法如出一辙。
3.6 不响应中断的支持
要想不响应中断可以调用 condition.awaitUninterruptibly() 方法,该方法的源码如下:
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
这段方法与上面的 await 方法基本一致,只不过减少了对中断的处理。
4 Condition 的 signal/signalAll 方法
调用 condition 的 signal 或者 signalAll 方法可以将等待队列中等待时间最长的节点移动到同步队列中,使得该节点能够有机会获得 lock。
4.1 signal 方法的实现原理
signal 方法的主要作用是将等待队列中的头节点移动到同步队列中,使得该节点能够有机会获得锁。signal 方法的源码如下:
public final void signal() {
// 1. 先检测当前线程是否已经获取 lock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
signal 方法首先会检测当前线程是否已经获取了锁,如果没有获取锁会直接抛出 IllegalMonitorStateException 异常。如果获取了锁,再得到等待队列的头节点,之后的 doSignal 方法也是基于该节点。
4.2 doSignal 方法的实现
doSignal 方法的源码如下:
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 1. 将头节点从等待队列中移除
first.nextWaiter = null;
// 2. while 中 transferForSignal 方法对头节点做真正的处理
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
doSignal 方法的主要逻辑如下:
将头节点从等待队列中移除:将 firstWaiter 指向头节点的下一个节点,如果头节点的下一个节点为空,则将 lastWaiter 置为 null。调用 transferForSignal 方法对头节点进行处理:transferForSignal 方法会将头节点移动到同步队列中。
4.3 transferForSignal 方法的实现
transferForSignal 方法的源码如下:
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 1. 更新状态为 0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 2. 将该节点移入到同步队列中去
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
transferForSignal 方法的主要逻辑如下:
更新节点状态为 0:通过 compareAndSetWaitStatus 方法将节点的状态从 CONDITION 更新为 0。将节点移入同步队列:调用 enq 方法将节点插入到同步队列中。enq 方法会将节点插入到同步队列的尾部。唤醒节点:如果节点的前驱节点的状态为 CANCELLED 或者设置前驱节点的状态为 SIGNAL 失败,则唤醒该节点。
4.4 signal 方法的执行示意图
signal 方法的执行示意图如下:
调用 condition.signal 方法的前提条件是当前线程已经获取了锁,该方法会使等待队列中的头节点即等待时间最长的那个节点移入到同步队列,而移入到同步队列后才有机会被唤醒,即从 await 方法中的 LockSupport.park(this) 方法中返回,才有机会让调用 await 方法的线程成功退出。
4.5 signalAll 方法的实现原理
signalAll 方法与 signal 方法的区别体现在 doSignalAll 方法上。doSignalAll 方法会将等待队列中的每一个节点都移入到同步队列中,即“通知”当前调用 condition.await() 方法的每一个线程。
doSignalAll 方法的源码如下:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
doSignalAll 方法的主要逻辑如下:
清空等待队列的头尾节点:将 firstWaiter 和 lastWaiter 都置为 null。遍历等待队列中的每一个节点:将每个节点从等待队列中移除,并调用 transferForSignal 方法将节点移入同步队列中。
5 await 与 signal/signalAll 的关系
5.1 等待/通知机制
await 和 signal/signalAll 方法之间的关系可以用一个开关来比喻,控制着线程 A(等待方)和线程 B(通知方)。具体来说:
await:线程 A 调用 await 方法后,会释放当前持有的锁,并进入等待队列,直到被其他线程唤醒。signal:线程 B 调用 signal 方法后,会将等待队列中的头节点(即等待时间最长的节点)移动到同步队列中,使得该节点有机会获取锁。signalAll:线程 B 调用 signalAll 方法后,会将等待队列中的所有节点都移动到同步队列中,使得所有等待的线程都有机会获取锁。
5.2 生产者与消费者问题
生产者与消费者问题是并发编程中的经典问题,通常涉及两个角色:生产者和消费者。生产者负责生产数据并将其放入缓冲区,消费者负责从缓冲区中取出数据并进行处理。为了保证生产者和消费者之间的协调,可以使用 Condition 接口的 await 和 signal/signalAll 方法。
下面是一个简单的生产者与消费者问题的示例:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumer {
private final int capacity = 5;
private final Queue
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public void produce(int item) throws InterruptedException {
lock.lock();
try {
while (buffer.size() == capacity) {
notFull.await(); // 缓冲区满时等待
}
buffer.add(item);
System.out.println("Produced: " + item);
notEmpty.signal(); // 通知消费者缓冲区非空
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (buffer.isEmpty()) {
notEmpty.await(); // 缓冲区空时等待
}
int item = buffer.poll();
System.out.println("Consumed: " + item);
notFull.signal(); // 通知生产者缓冲区非满
return item;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();
Thread producerThread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
pc.produce(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumerThread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
pc.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producerThread.start();
consumerThread.start();
}
}
在这个示例中,生产者线程调用 produce 方法将数据放入缓冲区,如果缓冲区已满,则调用 notFull.await() 方法进入等待状态。消费者线程调用 consume 方法从缓冲区中取出数据,如果缓冲区为空,则调用 notEmpty.await() 方法进入等待状态。当生产者放入数据后,会调用 notEmpty.signal() 方法通知消费者缓冲区非空;当消费者取出数据后,会调用 notFull.signal() 方法通知生产者缓冲区非满。
5.3 await、signal 和 signalAll 的关系图
await、signal 和 signalAll 方法之间的关系可以用下图来说明: 线程 awaitThread 先通过 lock.lock() 方法获取锁,成功后调用 condition.await 方法进入等待队列,而另一个线程 signalThread 通过 lock.lock() 方法获取锁成功后调用了 condition.signal 或者 signalAll 方法,使得线程 awaitThread 能够有机会移入到同步队列中,当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取 lock,从而使得线程 awaitThread 能够从 await 方法中退出并执行后续操作。如果 awaitThread 获取 lock 失败会直接进入到同步队列。
6 Condition 的使用示例
下面是一个简单的示例,展示了如何使用 Condition 实现线程间的协调:
public class AwaitSignal {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
private static volatile boolean flag = false;
public static void main(String[] args) {
Thread waiter = new Thread(new Waiter());
waiter.start();
Thread signaler = new Thread(new Signaler());
signaler.start();
}
static class Waiter implements Runnable {
@Override
public void run() {
lock.lock();
try {
while (!flag) {
System.out.println(Thread.currentThread().getName() + "当前条件不满足等待");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "接收到通知条件满足");
} finally {
lock.unlock();
}
}
}
static class Signaler implements Runnable {
@Override
public void run() {
lock.lock();
try {
flag = true;
condition.signalAll();
} finally {
lock.unlock();
}
}
}
}
在这个示例中,Waiter 线程会在条件不满足时进入等待状态,Signaler 线程会在条件满足时通知所有等待的线程。
7 小结
Condition 接口为 Java 并发编程提供了强大的线程间协调机制。通过与 ReentrantLock 结合使用,Condition 可以实现更细粒度的线程控制,解决复杂的同步问题,如生产者-消费者问题。理解 Condition 的实现原理和使用方法,对于编写高效、可靠的并发程序至关重要。
8 思维导图
9 参考链接
详解 Java Condition 的 await 和 signal 等待通知机制