Java 并发编程之 AQS ReentrantLock 非公平锁源码解析

Java juc AQS About 6,141 words

说明

本文基于Java11

部分源码

// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;

private Node addWaiter(Node mode) {
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            // 将新节点的 prev 字段设置为老的 node 节点
            node.setPrevRelaxed(oldTail);
            // 将 tail 字段设置为 node 
            if (compareAndSetTail(oldTail, node)) {
                // CAS 设置成功后,将老的 node 节点的 next 字段设置为新的节点
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

private final void initializeSyncQueue() {
    Node h;
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        tail = h;
}

private final boolean compareAndSetTail(Node expect, Node update) {
    return TAIL.compareAndSet(this, expect, update);
}


/*
     +------+  prev +-----+       +-----+
head |      | <---- |     | <---- |     |  tail
     +------+       +-----+       +-----+
*/
static final class Node {
    static final Node EXCLUSIVE = null;
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;

    final void setPrevRelaxed(Node p) {
        // Unsafe/VarHandle set prev
        PREV.set(this, p);
    }
}

tryAcquire()尝试获取锁,返回了true说明上锁成功,失败后进行acquireQueued入队操作。

入队操作前先进行addWaiter初始化等待节点。

ReentrantLock中的非公平锁的实现。

// java.util.concurrent.locks.ReentrantLock.Sync
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

getState()获取当前锁的状态赋值给变量c

如果c0表示没有人上锁使用CAS修改状态值竞争锁并且设置独占锁线程为当前线程,返回trueCAS失败返回false准备入等待队列。

如果c不为0判断是否是当前线程持有了锁,如果是,则锁重入,设置新的状态值,ReentrantLock最大的锁重入次数是Integer的最大值。

加入等待队列时head节点为哨兵节点,哨兵节点的next字段指向需要入队的新节点,tail节点指向入队的新节点。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

正式加入队列前,还会将节点的前一个节点的waitStatus0改为-1

ReentrantLocklock()方法为不可能打断模式,外部调用Threadinterrupte()方法,只是会再次进行acquireQueued抢夺锁,但基本都是再次进入LockSupportpark()方法。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

释放锁时,将Syncstate改为0(暂时不考虑重入的情况),将头节点的waitStatue改为0,并unpark头节点的下一个节点。

当释放锁后,acquireQueued中原先停在了parkAndCheckInterrupt代码,被unpark后继续执行,for循环中已经是前驱节点是头节点了,在tryAcquire抢得锁后,将当前节点设置为头节点,意味着上锁成功,继续执行锁中的代码。

Views: 2,252 · Posted: 2021-10-04

————        END        ————

Give me a Star, Thanks:)

https://github.com/fendoudebb/LiteNote

扫描下方二维码关注公众号和小程序↓↓↓

扫描下方二维码关注公众号和小程序↓↓↓


Today On History
Browsing Refresh