/**
 * A cancellable, run-once asynchronous computation wrapping a {@link Callable}
 * (or a {@link Runnable} plus a fixed result).
 *
 * <p>The result can be retrieved with {@code get} once the computation has
 * finished; callers that arrive earlier are parked on an internal stack of
 * {@link WaitNode}s and unparked by {@link #finishCompletion()}.
 *
 * <p>All state transitions are performed with compare-and-set operations on
 * the {@code volatile} fields {@code state}, {@code runner} and
 * {@code waiters}; no locks are used. The atomic operations are provided by
 * the standard {@code java.util.concurrent.atomic} field updaters, so the
 * class does not depend on {@code sun.misc.Unsafe} or on externally computed
 * field offsets.
 *
 * @param <V> the result type returned by {@code get}
 */
public class FutureTask<V> implements RunnableFuture<V> {
    /**
     * Run state of this task. Transitions performed by this class:
     * <pre>
     * NEW -> COMPLETING -> NORMAL          (set)
     * NEW -> COMPLETING -> EXCEPTIONAL     (setException)
     * NEW -> CANCELLED                     (cancel(false))
     * NEW -> INTERRUPTING -> INTERRUPTED   (cancel(true))
     * </pre>
     * The numeric ordering matters: the code relies on range comparisons such
     * as {@code state >= CANCELLED} and {@code state <= COMPLETING}.
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying computation; cleared by {@link #finishCompletion()}. */
    private Callable<V> callable;
    /**
     * The value to return or the throwable to wrap. Not volatile: it is
     * written between the CAS to COMPLETING and the write of the final state,
     * and read only after the final state has been observed.
     */
    private Object outcome;
    /** The thread executing the callable; claimed by CAS in run()/runAndReset(). */
    private volatile Thread runner;
    /** Head of the stack of threads waiting in awaitDone(). */
    private volatile WaitNode waiters;

    // Atomic access to the volatile fields above. Fully-qualified names are
    // used so that no additional import is required by this file.
    @SuppressWarnings("rawtypes")
    private static final java.util.concurrent.atomic.AtomicIntegerFieldUpdater<FutureTask> STATE =
        java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater(
            FutureTask.class, "state");
    @SuppressWarnings("rawtypes")
    private static final java.util.concurrent.atomic.AtomicReferenceFieldUpdater<FutureTask, Thread> RUNNER =
        java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater(
            FutureTask.class, Thread.class, "runner");
    @SuppressWarnings("rawtypes")
    private static final java.util.concurrent.atomic.AtomicReferenceFieldUpdater<FutureTask, WaitNode> WAITERS =
        java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater(
            FutureTask.class, WaitNode.class, "waiters");

    /**
     * Translates a completed state into a return value or an exception.
     *
     * @param s a completed state value
     * @return the stored outcome when {@code s == NORMAL}
     * @throws CancellationException if {@code s >= CANCELLED}
     * @throws ExecutionException wrapping the stored throwable otherwise
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL) {
            return (V) x;
        }
        if (s >= CANCELLED) {
            throw new CancellationException();
        }
        throw new ExecutionException((Throwable) x);
    }

    /**
     * Creates a task that will, upon running, execute the given callable.
     *
     * @param callable the computation to run
     * @throws NullPointerException if {@code callable} is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null) {
            throw new NullPointerException();
        }
        this.callable = callable;
        this.state = NEW;
    }

    /**
     * Creates a task that will, upon running, execute the given runnable and
     * make {@code get} return the given result on successful completion.
     * The runnable is adapted through {@link Executors#callable(Runnable, Object)}.
     *
     * @param runnable the work to run
     * @param result the value to return on successful completion
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;
    }

    /** Returns {@code true} if the state is CANCELLED, INTERRUPTING or INTERRUPTED. */
    @Override
    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    /**
     * Returns {@code true} once the state has left NEW. Note that this
     * includes the transient COMPLETING state.
     */
    @Override
    public boolean isDone() {
        return state != NEW;
    }

    /**
     * Attempts to cancel this task. Succeeds only if the state is still NEW.
     *
     * @param mayInterruptIfRunning if {@code true}, the thread currently
     *        recorded in {@code runner} (if any) is interrupted
     * @return {@code false} if the task had already left the NEW state,
     *         {@code true} otherwise
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        int target = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;
        if (!(state == NEW && STATE.compareAndSet(this, NEW, target))) {
            return false;
        }
        try {   // waiters must be released even if interrupt() throws
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null) {
                        t.interrupt();
                    }
                } finally {
                    // Final state; an ordered (lazy) write is sufficient here.
                    STATE.lazySet(this, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    /**
     * Waits if necessary for the computation to complete, then returns its
     * result.
     *
     * @throws CancellationException if the task was cancelled
     * @throws ExecutionException if the computation threw an exception
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     */
    @Override
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING) {
            s = awaitDone(false, 0L);
        }
        return report(s);
    }

    /**
     * Waits for at most the given time for the computation to complete, then
     * returns its result.
     *
     * @param timeout the maximum time to wait
     * @param unit the unit of {@code timeout}
     * @throws NullPointerException if {@code unit} is null
     * @throws CancellationException if the task was cancelled
     * @throws ExecutionException if the computation threw an exception
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws TimeoutException if the task is still not completed after waiting
     */
    @Override
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null) {
            throw new NullPointerException();
        }
        int s = state;
        if (s <= COMPLETING
            && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) {
            throw new TimeoutException();
        }
        return report(s);
    }

    /**
     * Hook invoked from {@link #finishCompletion()} after waiting threads have
     * been released. The default implementation does nothing.
     */
    protected void done() { }

    /**
     * Records {@code v} as the result unless the task has already left the
     * NEW state (completed or cancelled).
     *
     * @param v the value
     */
    protected void set(V v) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = v;
            STATE.lazySet(this, NORMAL); // final state
            finishCompletion();
        }
    }

    /**
     * Records {@code t} as the failure cause unless the task has already left
     * the NEW state (completed or cancelled).
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = t;
            STATE.lazySet(this, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    /**
     * Runs the callable and records its result or the throwable it raised.
     * Does nothing if the task is not NEW or another thread has already
     * claimed the {@code runner} slot.
     */
    @Override
    public void run() {
        if (state != NEW
            || !RUNNER.compareAndSet(this, null, Thread.currentThread())) {
            return;
        }
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran) {
                    set(result);
                }
            }
        } finally {
            // runner stays non-null until the outcome is settled so that
            // concurrent calls to run() are rejected by the CAS above.
            runner = null;
            // Re-read state after clearing runner to catch a late cancel(true).
            int s = state;
            if (s >= INTERRUPTING) {
                handlePossibleCancellationInterrupt(s);
            }
        }
    }

    /**
     * Runs the callable without recording its result and leaves the task in
     * the NEW state, so it can be run again. A thrown exception is recorded
     * through {@link #setException(Throwable)}.
     *
     * @return {@code true} if the callable returned normally and the state is
     *         still NEW afterwards
     */
    protected boolean runAndReset() {
        if (state != NEW
            || !RUNNER.compareAndSet(this, null, Thread.currentThread())) {
            return false;
        }
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // result deliberately discarded
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            runner = null;
            s = state;
            if (s >= INTERRUPTING) {
                handlePossibleCancellationInterrupt(s);
            }
        }
        return ran && s == NEW;
    }

    /**
     * If a {@code cancel(true)} is mid-flight (state INTERRUPTING), spins with
     * {@link Thread#yield()} until the canceller has written INTERRUPTED, so
     * that the interrupt is delivered before run()/runAndReset() returns.
     *
     * @param s the state observed by the caller
     */
    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING) {
            while (state == INTERRUPTING) {
                Thread.yield();
            }
        }
    }

    /**
     * Node of the singly linked stack of waiting threads. The constructor
     * records the creating thread.
     */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

    /**
     * Detaches the whole waiter stack, unparks every recorded thread, invokes
     * {@link #done()} and clears {@code callable}.
     */
    private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null;) {
            // Detach the entire stack in one CAS; retry if a waiter was pushed
            // (or removed) concurrently.
            if (WAITERS.compareAndSet(this, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null) {
                        break;
                    }
                    q.next = null; // unlink the visited node
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;
    }

    /**
     * Waits for the task to leave the NEW/COMPLETING states, or gives up on
     * interrupt or timeout.
     *
     * @param timed {@code true} to bound the wait by {@code nanos}
     * @param nanos maximum time to wait, in nanoseconds, when {@code timed}
     * @return the state on completion, or the current state on timeout
     * @throws InterruptedException if the current thread is interrupted; the
     *         thread's wait node (if any) is removed first
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        // Each pass performs exactly one step: check interrupt, observe state,
        // allocate the node, push the node, or park.
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null) {
                    q.thread = null;
                }
                return s;
            } else if (s == COMPLETING) {
                // The final state is about to be written; do not park.
                Thread.yield();
            } else if (q == null) {
                q = new WaitNode();
            } else if (!queued) {
                // Push q onto the stack: link to the current head, then CAS.
                queued = WAITERS.compareAndSet(this, q.next = waiters, q);
            } else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            } else {
                LockSupport.park(this);
            }
        }
    }

    /**
     * Marks {@code node} as abandoned by clearing its thread and unlinks every
     * node with a null thread from the waiter stack. The traversal restarts
     * whenever a concurrent modification is detected.
     *
     * @param node the node of a timed-out or interrupted waiter; may be null,
     *        in which case nothing is done
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) { // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null) {
                        pred = q;
                    } else if (pred != null) {
                        pred.next = s;
                        // The predecessor was itself abandoned meanwhile, so
                        // the splice may have been lost; start over.
                        if (pred.thread == null) {
                            continue retry;
                        }
                    } else if (!WAITERS.compareAndSet(this, q, s)) {
                        continue retry;
                    }
                }
                break;
            }
        }
    }
}