Java 并发编程之 AQS ReentrantLock await signal 源码解析
Java juc AQS About 4,316 words说明
将ConditionObject
中维护的队列称为条件队列。
将AQS
中维护的Node
队列称为同步队列。
await
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
private Node addConditionWaiter() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
}
final int fullyRelease(Node node) {
try {
int savedState = getState();
if (release(savedState))
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
node.waitStatus = Node.CANCELLED;
throw t;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
addConditionWaiter
创建一个节点,加入到条件队列中。
isHeldExclusively
:如果没有上锁就直接调用await
方法,将直接抛出IllegalMonitorStateException
异常。
新创建一个waitStatus
等于-2
(CONDITION
)的节点,如果firstWaiter
为空,则将新创建的节点赋值给firstWaiter
,反之,将新创建的节点赋值给lastWaiter
节点的nextWaiter
,并将lastWaiter
置为新创建的节点。
fullyRelease
可能有锁重入的情况,所以全部释放锁,将状态值改为0
,并且唤醒同步队列中的下一个节点去竞争锁(自身已经是抢得锁正在执行代码的节点,已经不在同步队列中了,竞争锁是可能是非公平锁)。
isOnSyncQueue
如果已经被唤醒,进入了同步队列了则返回true
,正常await
生成的Node
的waitStatus
都等于CONDITION
,所以返回false
,取反,进入while
循环,暂停了当前线程。
signal
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
(firstWaiter = first.nextWaiter) == null
将firstWaiter
的下一个等待节点赋值给firstWaiter
并且判空,如果为空,条件队列已经为空了,将lastWaiter
也置为空,
firstWaiter = first.nextWaiter
的作用是将原先的firstWaiter
移除条件队列。
first.nextWaiter = null
将原先的等待节点的nextWaiter
置为空,帮助GC
。
transferForSignal
将这个从条件队列移除的节点,转移到同步队列中。
!node.compareAndSetWaitStatus(Node.CONDITION, 0)
:如果发生了中断等情况,CAS
失败,继续while
条件的后续判断,判断的是条件队列的下一个节点是否能唤醒。
如果CAS
成功,则enq(node)
将队列放入到同步队列中,返回的对象是入队前的队尾节点。即当前node
变量的prev
前驱节点。
如果前驱节点已经取消了或者CAS
置为SIGNAL
失败后,直接恢复线程的运行。
然后返回true
,取反,结束while
循环,唤醒流程结束。
注意
ReentrantReadWriteLock
中的读锁不能使用newCondition()
方法,直接抛出了异常。(写锁可使用)
————        END        ————
Give me a Star, Thanks:)
https://github.com/fendoudebb/LiteNote扫描下方二维码关注公众号和小程序↓↓↓