Java FutureTask 源码解析
Java 面试 juc About 6,354 words说明
本文基于Java8
。
构造方法
传入Callable
对象,使用成员变量接收,并将状态改为NEW
。
private Callable<V> callable;
private volatile int state;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // 确保让 callable 可见
}
state 状态
state
可能的几种过渡状态:
NEW
->COMPLETING
->NORMAL
NEW
->COMPLETING
->EXCEPTIONAL
NEW
->CANCELLED
NEW
->INTERRUPTING
->INTERRUPTED
/**
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
run 方法
如果state
不为NEW
,或者CAS
赋值runner
不成功就直接return
。
调用callable
的call()
方法,正常运行则result
赋值为call()
方法的返回值、标识位ran
置为true
、用CAS
先将state
从NEW
设置为COMPLETING
,再将result
赋值给outcome
(outcome
成员变量是get()
方法调用时返回的结果),
若出现异常则将result
置为null
、标识位ran
置为false
、用CAS
先将state
从NEW
设置为COMPLETING
,再将异常结果赋值给outcome
,最后将state
设置为EXCEPTIONAL
。
finishCompletion()
方法再解析完get()
方法后再做分析。
// 运行 callable 的线程;在 run() 方法调用时使用 CAS 赋值
private volatile Thread runner;
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
// get() 方法调用时返回的结果
private Object outcome;// 由 state 维护
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
private static final long runnerOffset;
// 省略了部分代码
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
get() 方法
如果state
是NEW
或者COMPLETING
,则执行awaitDone()
方法并接收返回值state
。解除等待后执行report()
方法。
awaitDone() 方法
无限循环中有如下步骤:
- 判断线程是否被中断,中断则移出等待节点并抛出异常。
- 如果当前
state
大于COMPLETING
(即:state
可能已经完成,异常,取消,中断),将等待节点中的thread
引用置空,进行下一次循环。 - 如果当前
state
等于COMPLETING
(正在完成中),将线程从运行状态变为就绪状态,进行下一次循环。 - 如果等待节点为空,
new
一个WaitNode
,进行下一次循环。 - 如果
queued
没有排队的,将第4
步中new
的等待节点,从头步插入waiters
等待节点链表中,插入成功将queued
置为true
,表示有排队的节点了,将进行下一次循环。 - 判断是否设置了超时等待,进行下一次循环。
- 都没有命中
if
条件,则将当前线程阻塞。
awaitDone()
方法正常结束无限循环的条件只能是,unpark()
取消阻塞后state
大于COMPLETING
(正常完成中),即awaitDone()
方法正常返回的state
只能是:NORMAL
、EXCEPTIONAL
、CANCELLED
。
report() 方法
如果state
等于NORMAL
,则将outcome
成员变量的结果返回回去,如果state
大于等于CANCELLED
,则抛出CancellationException
取消异常,其他state
抛出ExecutionException
执行异常。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
finishCompletion() 方法
将等待节点waiters
中的元素遍历删除,并且将waiters
中park()
的线程都unpark()
释放。done()
方法为空实现,不做任何事情。将callable
置空。
private volatile WaitNode waiters;
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
protected void done() { }
private static final long waitersOffset;
// 省略了部分代码
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
————        END        ————
Give me a Star, Thanks:)
https://github.com/fendoudebb/LiteNote扫描下方二维码关注公众号和小程序↓↓↓