Java 并发编程之 LongAdder 源码解析

Java juc About 9,061 words

基本用法

public class LongAdderDemo {

    public static void main(String[] args) {
        LongAdder adder = new LongAdder();
        adder.increment();
        System.out.println(adder.sum());
    }

}

源码解析

cellsbaseLongAdder的父类Striped64中的原子成员变量。

cells 未初始化情况

// java.util.concurrent.atomic.LongAdder
public void increment() {
    add(1L);
}

public void add(long x) {
    Cell[] cs; long b, v; int m; Cell c;
    if ((cs = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (cs == null || (m = cs.length - 1) < 0 ||
            (c = cs[getProbe() & m]) == null ||
            !(uncontended = c.cas(v = c.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

(cs = cells) != null:将cells赋值给cs局部变量并判断是否为空。首次进行add操作cells未初始化,为空,进行下一个判断。

!casBase(b = base, b + x):将base赋值给b局部变量(baselong类型的成员变量初始值为0),与b+x进行cas比较并交换,如果成功则add方法执行结束,没有交换成功则进入if方法体。

cs == null:首次add肯定为空,进入if方法体。

longAccumulate:执行Striped64中的长整形累加方法。

// java.util.concurrent.atomic.Striped64
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }

    // ...
}

static final int getProbe() {
    return (int) THREAD_PROBE.get(Thread.currentThread());
}

private static final VarHandle THREAD_PROBE;

static {
    THREAD_PROBE = l.findVarHandle(Thread.class, "threadLocalRandomProbe", int.class);
}

// java.lang.Thread
@jdk.internal.vm.annotation.Contended("tlr")
int threadLocalRandomProbe;

getProbe():获取当前线程的Probe哈希值,如果等于0,则用ThreadLocalRandom.current()方法初始化,并再次获取Probe的值赋值给h

// java.util.concurrent.atomic.Striped64
transient volatile int cellsBusy;

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    // ...
    boolean collide = false;                // True if last slot nonempty
    done: for (;;) {
        Cell[] cs; Cell c; int n; long v;
        if ((cs = cells) != null && (n = cs.length) > 0) {//...}
        else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
            try {                           // Initialize table
                if (cells == cs) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    break done;
                }
            } finally {
                cellsBusy = 0;
            }
        }
        // Fall back on using base
        else if (casBase(v = base, (fn == null) ? v + x : fn.applyAsLong(v, x)))
            break done;        
    }
}

final boolean casCellsBusy() {
    return CELLSBUSY.compareAndSet(this, 0, 1);
}

第一次进入longAccumulate方法时,cells还没创建,所以不走第一个if判断。

cellsBusyCAS自旋锁的标志位,用于Cells数组创建或扩容。初始为0

cells == cs:在第一个if判断时已经赋值,此处再次判断是为了防止其他线程已经初始化完成了。

casCellsBusy():自旋锁上锁,成功就进行cells数组初始化。

cells == cs:再次判断是否已经被初始化了,防止在CAS期间有改动。

rs[h & 1] = new Cell(x):默认创建的cells数组长度为2,第一个Cell元素放置在线程的Probe&1的位置(&1等于%2,对2取模)

finally中解锁。

casBase(v = base, (fn == null):如果cellsBusy已经被抢占或者cells == cs被改动或者casCellsBusy()抢占失败则进行对base成员变量的cas赋值,失败则再次循环,成功则方法longAccumulate执行结束(意味着LongAdder中的add方法也执行结束)

cells 初始化完成情况

// java.util.concurrent.atomic.LongAdder
public void increment() {
    add(1L);
}

public void add(long x) {
    Cell[] cs; long b, v; int m; Cell c;
    if ((cs = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (cs == null || (m = cs.length - 1) < 0 ||
            (c = cs[getProbe() & m]) == null ||
            !(uncontended = c.cas(v = c.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

((cs = cells) != nullcells数据已经初始化完毕了,所以每次都会进入if语句体。

(m = cs.length - 1) < 0:初始化的cells数组长度为2,此时m=1,所以此条件也不成立。

(c = cs[getProbe() & m]) == null:判断Probe&m索引上的Cell是否为空,为空则进行longAccumulate。(此处m=1等于判断01索引的Cell

!(uncontended = c.cas(v = c.value, v + x)):如果上一步取到的Cell对象不为空,则在Cell上进行CAS赋值,赋值失败则将uncontended置为false,并进行longAccumulate

// java.util.concurrent.atomic.Striped64
transient volatile int cellsBusy;

static final int NCPU = Runtime.getRuntime().availableProcessors();

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    // ...
    boolean collide = false;                // True if last slot nonempty
    done: for (;;) {
        Cell[] cs; Cell c; int n; long v;
        if ((cs = cells) != null && (n = cs.length) > 0) {
            if ((c = cs[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                break done;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (c.cas(v = c.value,
                           (fn == null) ? v + x : fn.applyAsLong(v, x)))
                break;
            else if (n >= NCPU || cells != cs)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == cs)        // Expand table unless stale
                        cells = Arrays.copyOf(cs, n << 1);
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        // ...
}

collide:为真表示最后一个槽位都不为空了,即数组所有索引位置上的元素都不为空了。

(c = cs[(n - 1) & h]) == null:此时n等于数组长度等于2,同样是取01索引位置的Cell元素,如果为空,则上自旋锁,new出一个Cell对象赋值到该索引上。

!wasUncontendedLongAdder中进行的CAS失败了的所以contendedfalse,改为true并且调用advanceProbe更换计算索引位置的h进行下一次循环。

c.cas(v = c.value...:对索引位置上的Cell元素进行CAS赋值,成功则方法整体运行完成,失败则进入下一个if判断。

n >= NCPU || cells != cs:数组的长度超出了CPU个数,或者发生了扩容,则将collide该为false。调用advanceProbe更换计算索引位置的h进行下一次循环。

!collide:上述条件都不成立(已经循环了几次了),则说明数组位置都有元素了,将标志位置为真。调用advanceProbe更换计算索引位置的h进行下一次循环。

cellsBusy == 0 && casCellsBusy():上自旋锁,对Cell数组进行扩容,每次扩容的大小为2n次方。扩容完成将collide置为false,继续循环。

sum 方法

sum计算的是当前cellsbase的快照的总和,在并发环境下调用此方法,等到的值不一定准确。改方法适合在所有竞争线程都执行完成后的获取总数。

// java.util.concurrent.atomic.LongAdder
public long sum() {
    Cell[] cs = cells;
    long sum = base;
    if (cs != null) {
        for (Cell c : cs)
            if (c != null)
                sum += c.value;
    }
    return sum;
}

高并发场景

在高并发场景下,Cell数组已经扩容到CPU个数的长度后。更多的执行的情况如下,删去了不会执行到的代码和永远不成立的代码。

// java.util.concurrent.atomic.LongAdder
public void increment() {
    add(1L);
}

public void add(long x) {
    Cell[] cs; long b, v; int m; Cell c;
    if ((cs = cells) != null) {
        boolean uncontended = true;
        // 随机获取一个 Cell 并赋值给 c ,c 肯定不为空,进行 || 运算
        // 对 c 进行 CAS 赋值,失败进入 longAccumulate,uncontended = false
        if ((c = cs[getProbe() & m]) == null || !(uncontended = c.cas(v = c.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

// java.util.concurrent.atomic.Striped64
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    int h; // 随机数,取索引用

    // 高并发场景下该字段已经没有用处
    boolean collide = false;                // True if last slot nonempty
    done: for (;;) {
        Cell[] cs; Cell c; int n; long v;
        // 数组一定不为空,所以一直进这个 if 条件
        if ((cs = cells) != null && (n = cs.length) > 0) {
            // 随机获取一个 Cell 并赋值给 c ,c 肯定不为空,进入 else if 判断
            if ((c = cs[(n - 1) & h]) == null) {}
            // 上面 LongAdder 的 add 方法中执行中带入的 uncontended = false,取反,赋值为 true
            else if (!wasUncontended)        // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // 再次 CAS 赋值 cell 的值,成功就退出循环,结束方法
            else if (c.cas(v = c.value, (fn == null) ? v + x : fn.applyAsLong(v, x)))
                break;
            // 上面 CAS 不成功后,每次都会进入此 if 条件中
            else if (n >= NCPU || cells != cs)
                collide = false;            // At max size or stale

            // 每次进入 if 条件没有 break 掉的都要执行更换随机数的操作
            h = advanceProbe(h);
        }
    }
}

备注

LongAdderresetsumThenReset方法都不应该用在并发场景下。

Views: 2,079 · Posted: 2021-09-26

————        END        ————

Give me a Star, Thanks:)

https://github.com/fendoudebb/LiteNote

扫描下方二维码关注公众号和小程序↓↓↓

扫描下方二维码关注公众号和小程序↓↓↓


Today On History
Browsing Refresh