Java 并发编程之 LongAdder 源码解析
Java juc About 9,061 words基本用法
public class LongAdderDemo {
public static void main(String[] args) {
LongAdder adder = new LongAdder();
adder.increment();
System.out.println(adder.sum());
}
}
源码解析
cells
和base
是LongAdder
的父类Striped64
中的原子成员变量。
cells 未初始化情况
// java.util.concurrent.atomic.LongAdder
public void increment() {
add(1L);
}
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[getProbe() & m]) == null ||
!(uncontended = c.cas(v = c.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
(cs = cells) != null
:将cells
赋值给cs
局部变量并判断是否为空。首次进行add
操作cells
未初始化,为空,进行下一个判断。
!casBase(b = base, b + x)
:将base
赋值给b
局部变量(base
是long
类型的成员变量初始值为0
),与b+x
进行cas
比较并交换,如果成功则add
方法执行结束,没有交换成功则进入if
方法体。
cs == null
:首次add
肯定为空,进入if
方法体。
longAccumulate
:执行Striped64
中的长整形累加方法。
// java.util.concurrent.atomic.Striped64
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
// ...
}
static final int getProbe() {
return (int) THREAD_PROBE.get(Thread.currentThread());
}
private static final VarHandle THREAD_PROBE;
static {
THREAD_PROBE = l.findVarHandle(Thread.class, "threadLocalRandomProbe", int.class);
}
// java.lang.Thread
@jdk.internal.vm.annotation.Contended("tlr")
int threadLocalRandomProbe;
getProbe()
:获取当前线程的Probe
哈希值,如果等于0
,则用ThreadLocalRandom.current()
方法初始化,并再次获取Probe
的值赋值给h
。
// java.util.concurrent.atomic.Striped64
transient volatile int cellsBusy;
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
// ...
boolean collide = false; // True if last slot nonempty
done: for (;;) {
Cell[] cs; Cell c; int n; long v;
if ((cs = cells) != null && (n = cs.length) > 0) {//...}
else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
try { // Initialize table
if (cells == cs) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
break done;
}
} finally {
cellsBusy = 0;
}
}
// Fall back on using base
else if (casBase(v = base, (fn == null) ? v + x : fn.applyAsLong(v, x)))
break done;
}
}
final boolean casCellsBusy() {
return CELLSBUSY.compareAndSet(this, 0, 1);
}
第一次进入longAccumulate
方法时,cells
还没创建,所以不走第一个if
判断。
cellsBusy
:CAS
自旋锁的标志位,用于Cells
数组创建或扩容。初始为0
。
cells == cs
:在第一个if
判断时已经赋值,此处再次判断是为了防止其他线程已经初始化完成了。
casCellsBusy()
:自旋锁上锁,成功就进行cells
数组初始化。
cells == cs
:再次判断是否已经被初始化了,防止在CAS
期间有改动。
rs[h & 1] = new Cell(x)
:默认创建的cells
数组长度为2
,第一个Cell
元素放置在线程的Probe
值&1
的位置(&1
等于%2
,对2
取模)
finally
中解锁。
casBase(v = base, (fn == null)
:如果cellsBusy
已经被抢占或者cells == cs
被改动或者casCellsBusy()
抢占失败则进行对base
成员变量的cas
赋值,失败则再次循环,成功则方法longAccumulate
执行结束(意味着LongAdder
中的add
方法也执行结束)
cells 初始化完成情况
// java.util.concurrent.atomic.LongAdder
public void increment() {
add(1L);
}
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[getProbe() & m]) == null ||
!(uncontended = c.cas(v = c.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
((cs = cells) != null
:cells
数据已经初始化完毕了,所以每次都会进入if
语句体。
(m = cs.length - 1) < 0
:初始化的cells
数组长度为2
,此时m=1
,所以此条件也不成立。
(c = cs[getProbe() & m]) == null
:判断Probe&m
索引上的Cell
是否为空,为空则进行longAccumulate
。(此处m=1
等于判断0
或1
索引的Cell
)
!(uncontended = c.cas(v = c.value, v + x))
:如果上一步取到的Cell
对象不为空,则在Cell
上进行CAS
赋值,赋值失败则将uncontended
置为false
,并进行longAccumulate
。
// java.util.concurrent.atomic.Striped64
transient volatile int cellsBusy;
static final int NCPU = Runtime.getRuntime().availableProcessors();
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
// ...
boolean collide = false; // True if last slot nonempty
done: for (;;) {
Cell[] cs; Cell c; int n; long v;
if ((cs = cells) != null && (n = cs.length) > 0) {
if ((c = cs[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
break done;
}
} finally {
cellsBusy = 0;
}
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (c.cas(v = c.value,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
else if (n >= NCPU || cells != cs)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == cs) // Expand table unless stale
cells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
// ...
}
collide
:为真表示最后一个槽位都不为空了,即数组所有索引位置上的元素都不为空了。
(c = cs[(n - 1) & h]) == null
:此时n
等于数组长度等于2
,同样是取0
或1
索引位置的Cell
元素,如果为空,则上自旋锁,new
出一个Cell
对象赋值到该索引上。
!wasUncontended
:LongAdder
中进行的CAS
失败了的所以contended
是false
,改为true
并且调用advanceProbe
更换计算索引位置的h
进行下一次循环。
c.cas(v = c.value...
:对索引位置上的Cell
元素进行CAS
赋值,成功则方法整体运行完成,失败则进入下一个if
判断。
n >= NCPU || cells != cs
:数组的长度超出了CPU
个数,或者发生了扩容,则将collide
该为false
。调用advanceProbe
更换计算索引位置的h
进行下一次循环。
!collide
:上述条件都不成立(已经循环了几次了),则说明数组位置都有元素了,将标志位置为真。调用advanceProbe
更换计算索引位置的h
进行下一次循环。
cellsBusy == 0 && casCellsBusy()
:上自旋锁,对Cell
数组进行扩容,每次扩容的大小为2
的n
次方。扩容完成将collide
置为false
,继续循环。
sum 方法
sum
计算的是当前cells
和base
的快照的总和,在并发环境下调用此方法,等到的值不一定准确。改方法适合在所有竞争线程都执行完成后的获取总数。
// java.util.concurrent.atomic.LongAdder
public long sum() {
Cell[] cs = cells;
long sum = base;
if (cs != null) {
for (Cell c : cs)
if (c != null)
sum += c.value;
}
return sum;
}
高并发场景
在高并发场景下,Cell
数组已经扩容到CPU
个数的长度后。更多的执行的情况如下,删去了不会执行到的代码和永远不成立的代码。
// java.util.concurrent.atomic.LongAdder
public void increment() {
add(1L);
}
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null) {
boolean uncontended = true;
// 随机获取一个 Cell 并赋值给 c ,c 肯定不为空,进行 || 运算
// 对 c 进行 CAS 赋值,失败进入 longAccumulate,uncontended = false
if ((c = cs[getProbe() & m]) == null || !(uncontended = c.cas(v = c.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
// java.util.concurrent.atomic.Striped64
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h; // 随机数,取索引用
// 高并发场景下该字段已经没有用处
boolean collide = false; // True if last slot nonempty
done: for (;;) {
Cell[] cs; Cell c; int n; long v;
// 数组一定不为空,所以一直进这个 if 条件
if ((cs = cells) != null && (n = cs.length) > 0) {
// 随机获取一个 Cell 并赋值给 c ,c 肯定不为空,进入 else if 判断
if ((c = cs[(n - 1) & h]) == null) {}
// 上面 LongAdder 的 add 方法中执行中带入的 uncontended = false,取反,赋值为 true
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 再次 CAS 赋值 cell 的值,成功就退出循环,结束方法
else if (c.cas(v = c.value, (fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
// 上面 CAS 不成功后,每次都会进入此 if 条件中
else if (n >= NCPU || cells != cs)
collide = false; // At max size or stale
// 每次进入 if 条件没有 break 掉的都要执行更换随机数的操作
h = advanceProbe(h);
}
}
}
备注
LongAdder
的reset
和sumThenReset
方法都不应该用在并发场景下。
————        END        ————
Give me a Star, Thanks:)
https://github.com/fendoudebb/LiteNote扫描下方二维码关注公众号和小程序↓↓↓