该文章主要讲解在向线程池提交任务时, 返回的Future的源码分析.
Future 简介 一个Future代表一个异步计算的结果, 提供了检查计算是否完成, 等待其完成以及检索计算结果的方法.
在向ThreadPoolExecutor中提交任务时, 会返回一个Future.
接口方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public interface Future <V> { boolean cancel (boolean mayInterruptIfRunning) ; boolean isCancelled () ; boolean isDone () ; V get () throws InterruptedException, ExecutionException; V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
FutureTask 来看看Future的实现类FutureTask
, 该类实现了 RunnableFuture
,而 RunnableFuture 继承了 Future 了.
简介 FutureTask提供了对Future的基础实现. 一个可取消的异步计算. 且可以使用FutureTask包装Runnable和Callable.
状态变量 1 2 3 4 5 6 7 8 9 10 11 12 13 public class FutureTask <V> implements RunnableFuture <V> { private volatile int state; private static final int NEW = 0 ; private static final int COMPLETING = 1 ; private static final int NORMAL = 2 ; private static final int EXCEPTIONAL = 3 ; private static final int CANCELLED = 4 ; private static final int INTERRUPTING = 5 ; private static final int INTERRUPTED = 6 ; }
使用state来控制FutureTask的状态变化, 可行的状态变化
1 2 3 4 NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> INTERRUPTING -> INTERRUPTED
核心变量 1 2 3 4 5 6 7 8 9 10 public class FutureTask <V> implements RunnableFuture <V> { private Callable<V> callable; private Object outcome; private volatile Thread runner; private volatile WaitNode waiters; }
构造函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class FutureTask <V> implements RunnableFuture <V> { public FutureTask (Callable<V> callable) { if (callable == null ) throw new NullPointerException (); this .callable = callable; this .state = NEW; } public FutureTask (Runnable runnable, V result) { this .callable = Executors.callable(runnable, result); this .state = NEW; } }
核心方法 由于 FutureTask 实现了 Runnable 接口, 所以优先查看run
方法
run方法原理 run方法继承自Runnable, 当FutureTask执行时会调用其run方法.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public void run () { if (state != NEW || !UNSAFE.compareAndSwapObject(this , runnerOffset, null , Thread.currentThread())) return ; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true ; } catch (Throwable ex) { result = null ; ran = false ; setException(ex); } if (ran) set(result); } } finally { runner = null ; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }protected void setException (Throwable t) { if (UNSAFE.compareAndSwapInt(this , stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this , stateOffset, EXCEPTIONAL); finishCompletion(); } }protected void set (V v) { if (UNSAFE.compareAndSwapInt(this , stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this , stateOffset, NORMAL); finishCompletion(); } }
在setException
和set
方法中唯一不同的就是修改的状态不同. 大致如下:
setException
状态变化: NEW -> COMPLETING -> EXCEPTIONAL
set状态
变化: NEW -> COMPLETING -> NORMAL
finishCompletion方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private void finishCompletion () { for (WaitNode q; (q = waiters) != null ;) { if (UNSAFE.compareAndSwapObject(this , waitersOffset, q, null )) { for (;;) { Thread t = q.thread; if (t != null ) { q.thread = null ; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null ) break ; q.next = null ; q = next; } break ; } } done(); callable = null ; }
handlePossibleCancellationInterrupt方法原理 该方法只有当FutureTask的状态为INTERRUPTING
或INTERRUPTED
才会被调用.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private void handlePossibleCancellationInterrupt (int s) { if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); }
注意:
只有当调用cancel(true)方法并传递true时, FutureTask的状态才会被更改为INTERRUPTING.
调用此方法的场景:
FutureTask还未被执行, 调用了cancel方法, 当再次执行run方法时则无法执行.
FutureTask正在被执行, 调用了cancel方法.
如果执行此Future的线程阻塞, 则会响应中断异常, 此方法会被调用
如果执行此Future的线程未阻塞, 此方法不会被调用
cancel方法原理 该方法用于取消FutureTask
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public boolean cancel (boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this , stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false ; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null ) t.interrupt(); } finally { UNSAFE.putOrderedInt(this , stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true ; }
注意:
该方法只有mayInterruptIfRunning
参数为true时,才会设置执行线程的中断标志位, 这就意味着如果执行该FutureTask的线程被阻塞, 就会响应中断. 反之如果不阻塞,则不会响应中断. FutureTask正常执行.
get方法原理 此方法用于获取FutureTask
异步计算的结果, 当FutureTask未计算完成时, 此方法会阻塞调用线程.
1 2 3 4 5 6 public V get () throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false , 0L ); return report(s); }
awaitDone方法原理 该方法用于等待任务完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private int awaitDone (boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L ; WaitNode q = null ; boolean queued = false ; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException (); } int s = state; if (s > COMPLETING) { if (q != null ) q.thread = null ; return s; } else if (s == COMPLETING) Thread.yield(); else if (q == null ) q = new WaitNode (); else if (!queued) queued = UNSAFE.compareAndSwapObject(this , waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L ) { removeWaiter(q); return state; } LockSupport.parkNanos(this , nanos); } else LockSupport.park(this ); } }
总结 FutureTask由于实现了Runnable接口, 让其自身具备了被执行的能力. 且本身缓存了执行当前FutureTask的线程. 同时又实现了Future接口, 具备了操作异步计算的能力.
其通过一个state变量来控制FutureTask的状态变化.