Java Concurrency: AQS ReentrantReadWriteLock Source Code Analysis


Read Lock

public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;

    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    /** Returns the number of shared holds represented in count. */
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    /** Returns the number of exclusive holds represented in count. */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

}

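The write lock occupies the low 16 bits of state, and the read lock occupies the high 16 bits.

sharedCount: returns the number of read-lock holds (state shifted right by 16 bits).

exclusiveCount: returns the number of write-lock holds (state masked with the low 16 bits).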
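To make the bit layout concrete, here is a minimal sketch that copies the constants and the two helper methods from Sync into a standalone class. The class name StateLayoutDemo and the pack helper are hypothetical and exist only for illustration; they are not part of the JDK.

```java
// Standalone sketch of the state layout; constants and helpers mirror Sync.
class StateLayoutDemo {
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);      // adding this adds one read hold
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  // 65535 holds at most per kind
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;  // selects the low 16 bits

    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    /** Hypothetical helper: builds a state value from a read count and a write count. */
    static int pack(int readHolds, int writeHolds) {
        return (readHolds << SHARED_SHIFT) | writeHolds;
    }

    public static void main(String[] args) {
        int state = pack(3, 0);                        // three read holds, no write hold
        System.out.println(sharedCount(state));        // 3
        System.out.println(exclusiveCount(state));     // 0

        state += SHARED_UNIT;                          // what a successful read acquire adds
        System.out.println(sharedCount(state));        // 4

        int downgrading = pack(1, 2);                  // writer holds twice and also holds a read lock
        System.out.println(sharedCount(downgrading));    // 1
        System.out.println(exclusiveCount(downgrading)); // 2
    }
}
```

Because both counts share one int, a single CAS on state can update either count atomically, which is why the read path adds SHARED_UNIT rather than 1.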

// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#lock
public void lock() {
    sync.acquireShared(1);
}

@ReservedStackAccess
protected final int tryAcquireShared(int unused) {

    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // hold-count bookkeeping omitted
        return 1;
    }
    return fullTryAcquireShared(current);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

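tryAcquireShared: if the write-lock count is nonzero and the thread holding the write lock is not the current thread, it returns -1 and the caller enqueues and waits. Otherwise, if the read-lock count is below MAX_COUNT and the CAS that adds SHARED_UNIT succeeds (ignoring readerShouldBlock for now), it returns 1: the read lock is acquired and the caller continues. If this fast path fails, fullTryAcquireShared retries in a loop.

Setting reader blocking aside, a read acquisition has only two outcomes: -1 when another thread holds the write lock, and 1 when the read lock is acquired.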
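The owner check above can be observed from outside with a small sketch. It uses readLock().tryLock(), which goes through tryReadLock rather than tryAcquireShared but applies the same "write lock held by another thread" test. The class and method names (ReadAcquireDemo, otherThreadCanRead, run) are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadAcquireDemo {
    /** Tries the read lock from a freshly started thread and reports whether it succeeded. */
    static boolean otherThreadCanRead(ReentrantReadWriteLock lock) {
        AtomicBoolean acquired = new AtomicBoolean();
        Thread t = new Thread(() -> {
            if (lock.readLock().tryLock()) {
                acquired.set(true);
                lock.readLock().unlock();
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acquired.get();
    }

    /** Returns {other thread while write-locked, owner while write-locked, other thread after release}. */
    static boolean[] run() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        boolean[] results = new boolean[3];
        lock.writeLock().lock();
        try {
            results[0] = otherThreadCanRead(lock);    // another thread: write lock blocks it
            results[1] = lock.readLock().tryLock();   // owner thread: allowed to take the read lock
            if (results[1]) lock.readLock().unlock();
        } finally {
            lock.writeLock().unlock();
        }
        results[2] = otherThreadCanRead(lock);        // write lock free: read succeeds
        return results;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

The second result is what makes lock downgrading possible: the thread that holds the write lock can take the read lock before releasing the write lock.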

// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);

    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

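If tryAcquireShared returns -1, the thread is enqueued as a shared-mode node. As with ReentrantLock and other AQS-based classes, shouldParkAfterFailedAcquire sets the predecessor's waitStatus to -1 (Node.SIGNAL); for the first waiter the predecessor is the head node. This marks that, on release, the predecessor must unpark the thread of its next node.

parkAndCheckInterrupt then parks the thread.

setHeadAndPropagate: after the thread resumes and acquires the lock, it also checks whether the next node is in shared mode. If it is, that node's thread is woken as well, so a run of consecutive waiting readers resumes one after another.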
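The propagation behavior can be seen with a rough experiment: several readers queue up behind a writer, and a single write unlock lets all of them hold the read lock at the same time. This is only a sketch under stated assumptions: SharedPropagationDemo and run are hypothetical names, getQueueLength() is documented as an estimate, and the 5-second timeouts are arbitrary safety limits.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SharedPropagationDemo {
    /** Returns how many readers held the read lock simultaneously after one write unlock. */
    static int run(int readers) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch acquired = new CountDownLatch(readers); // counts readers that got the lock
        CountDownLatch done = new CountDownLatch(1);           // keeps readers inside the lock
        try {
            lock.writeLock().lock();
            for (int i = 0; i < readers; i++) {
                Thread t = new Thread(() -> {
                    lock.readLock().lock();                    // queues behind the writer
                    try {
                        acquired.countDown();
                        done.await(5, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        lock.readLock().unlock();
                    }
                });
                t.setDaemon(true);
                t.start();
            }
            // Wait until all readers appear in the AQS queue (or the time limit passes).
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (lock.getQueueLength() < readers && System.nanoTime() < deadline) {
                Thread.sleep(1);
            }
            lock.writeLock().unlock();                         // one release wakes the first reader
            acquired.await(5, TimeUnit.SECONDS);
            int concurrent = lock.getReadLockCount();          // read holds at this moment
            done.countDown();
            return concurrent;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("concurrent readers: " + run(3));
    }
}
```

The writer's release unparks only the first queued node; the remaining readers are woken by the chain of setHeadAndPropagate calls made by the readers before them.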

Read Unlock

// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#unlock
public void unlock() {
    sync.releaseShared(1);
}

@ReservedStackAccess
protected final boolean tryReleaseShared(int unused) {
    // hold-count update omitted
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

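tryReleaseShared: in a CAS loop, subtracts SHARED_UNIT from state (that is, decrements the read count by one) and returns whether the resulting state equals 0, meaning that neither the read lock nor the write lock is held any longer.

doReleaseShared: wakes the thread of the successor node.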
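The release arithmetic can be sketched in isolation. The class below is a hypothetical stand-in, not JDK code: it replaces the AQS state field with an AtomicInteger and keeps only the CAS loop from tryReleaseShared, so that the return value can be checked for each release.

```java
import java.util.concurrent.atomic.AtomicInteger;

class ReadReleaseDemo {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT  = (1 << SHARED_SHIFT);

    // Assumption: an AtomicInteger stands in for getState()/compareAndSetState().
    final AtomicInteger state;

    ReadReleaseDemo(int readHolds) {
        state = new AtomicInteger(readHolds << SHARED_SHIFT);
    }

    /** Same loop as Sync.tryReleaseShared, without the per-thread hold counting. */
    boolean tryReleaseShared() {
        for (;;) {
            int c = state.get();
            int nextc = c - SHARED_UNIT;          // drop one read hold, not 1
            if (state.compareAndSet(c, nextc))
                return nextc == 0;                // true only when the lock is completely free
        }
    }

    public static void main(String[] args) {
        ReadReleaseDemo demo = new ReadReleaseDemo(2);
        System.out.println(demo.tryReleaseShared()); // false: one reader still holds the lock
        System.out.println(demo.tryReleaseShared()); // true: last reader gone, waiters may be woken
    }
}
```

Only the release that returns true leads releaseShared to call doReleaseShared, so waiting writers are signalled once the last reader leaves.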

Write Lock

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock
public void lock() {
    sync.acquire(1);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

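The write lock's acquisition flow through AQS is the same as ReentrantLock's (an exclusive lock), so it is not repeated here. The difference lies in Sync.tryAcquire, which also fails while any read lock is held.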
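A short sketch of the write lock's exclusive and reentrant behavior, using only the public API. It relies on writeLock().tryLock(), which goes through tryWriteLock and performs the same state checks as tryAcquire apart from the fairness check. WriteAcquireDemo and run are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class WriteAcquireDemo {
    /** Returns the observed results joined by commas. */
    static String run() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        StringBuilder out = new StringBuilder();

        lock.readLock().lock();
        // A read hold exists, so the write lock cannot be taken, even by the
        // same thread: upgrading from read to write is not supported.
        out.append(lock.writeLock().tryLock());
        lock.readLock().unlock();

        out.append(',').append(lock.writeLock().tryLock());  // lock is free now
        out.append(',').append(lock.writeLock().tryLock());  // reentrant acquire by the owner
        out.append(',').append(lock.getWriteHoldCount());    // holds by the current thread

        lock.writeLock().unlock();
        lock.writeLock().unlock();
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // false,true,true,2
    }
}
```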

Write Unlock

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#unlock
public void unlock() {
    sync.release(1);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

The write lock's release flow is likewise the same as ReentrantLock's.
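As with ReentrantLock, the write lock is only truly released when its hold count drops to zero, and unlocking without holding it is rejected. A minimal sketch using the public API (WriteReleaseDemo and run are hypothetical names):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class WriteReleaseDemo {
    /** Returns {held after first unlock, held after second unlock, extra unlock rejected}. */
    static boolean[] run() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.writeLock().lock();
        lock.writeLock().lock();                       // reentrant: hold count is now 2

        lock.writeLock().unlock();
        boolean heldAfterFirst = lock.isWriteLocked(); // still held, one hold remains

        lock.writeLock().unlock();
        boolean heldAfterSecond = lock.isWriteLocked(); // fully released

        boolean extraUnlockRejected;
        try {
            lock.writeLock().unlock();                 // current thread no longer owns the lock
            extraUnlockRejected = false;
        } catch (IllegalMonitorStateException e) {
            extraUnlockRejected = true;
        }
        return new boolean[] { heldAfterFirst, heldAfterSecond, extraUnlockRejected };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // true false true
    }
}
```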

Posted: 2021-10-08
