Java 并发编程之 AQS ReentrantLock await signal 源码解析
Java juc AQS About 4,316 words说明
将ConditionObject中维护的队列称为条件队列。
将AQS中维护的Node队列称为同步队列。
await
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
private Node addConditionWaiter() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
}
final int fullyRelease(Node node) {
try {
int savedState = getState();
if (release(savedState))
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
node.waitStatus = Node.CANCELLED;
throw t;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
addConditionWaiter
创建一个节点,加入到条件队列中。
isHeldExclusively:如果没有上锁就直接调用await方法,将直接抛出IllegalMonitorStateException异常。
新创建一个waitStatus等于-2(CONDITION)的节点,如果firstWaiter为空,则将新创建的节点赋值给firstWaiter,反之,将新创建的节点赋值给lastWaiter节点的nextWaiter,并将lastWaiter置为新创建的节点。
fullyRelease
可能有锁重入的情况,所以全部释放锁,将状态值改为0,并且唤醒同步队列中的下一个节点去竞争锁(自身已经是抢得锁正在执行代码的节点,已经不在同步队列中了,竞争锁是可能是非公平锁)。
isOnSyncQueue
如果已经被唤醒,进入了同步队列了则返回true,正常await生成的Node的waitStatus都等于CONDITION,所以返回false,取反,进入while循环,暂停了当前线程。
signal
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
(firstWaiter = first.nextWaiter) == null
将firstWaiter的下一个等待节点赋值给firstWaiter并且判空,如果为空,条件队列已经为空了,将lastWaiter也置为空,
firstWaiter = first.nextWaiter的作用是将原先的firstWaiter移除条件队列。
first.nextWaiter = null
将原先的等待节点的nextWaiter置为空,帮助GC。
transferForSignal
将这个从条件队列移除的节点,转移到同步队列中。
!node.compareAndSetWaitStatus(Node.CONDITION, 0):如果发生了中断等情况,CAS失败,继续while条件的后续判断,判断的是条件队列的下一个节点是否能唤醒。
如果CAS成功,则enq(node)将队列放入到同步队列中,返回的对象是入队前的队尾节点。即当前node变量的prev前驱节点。
如果前驱节点已经取消了或者CAS置为SIGNAL失败后,直接恢复线程的运行。
然后返回true,取反,结束while循环,唤醒流程结束。
注意
ReentrantReadWriteLock中的读锁不能使用newCondition()方法,直接抛出了异常。(写锁可使用)
————        END        ————
Give me a Star, Thanks:)
https://github.com/fendoudebb/LiteNote扫描下方二维码关注公众号和小程序↓↓↓