该文章主要讲解CompletableFuture的设计以及源码分析
CompletableFuture源码分析
本文主要讲解CompletableFuture的设计以及源码分析
CompletionStage 一个CompletionStage
代表了一个可异步计算的阶段
. 在另一个CompletionStage
完成时执行一个操作或者计算一个值.
一个CompletionStage
的执行可以由另一个单独阶段完成后触发, 也可以由两个阶段都完成触发, 或者两个阶段中任意一个完成后触发.
CompletableFuture整体设计
简易版
CompletableFuture
一个CompletableFuture实现了Future
和CompletionStage
两个接口.
所以每个CompletableFuture都是一个阶段
核心变量
1 2 3 4 volatile Object result;volatile Completion stack;
Completion 一个Completion是一个特殊的FutureTask, 可以由线程池提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 abstract static class Completion extends ForkJoinTask <Void> implements Runnable , AsynchronousCompletionTask { volatile Completion next; abstract CompletableFuture<?> tryFire(int mode); abstract boolean isLive () ; public final void run () { tryFire(ASYNC); } public final boolean exec () { tryFire(ASYNC); return true ; } }
UniCompletion 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 abstract static class UniCompletion <T,V> extends Completion { Executor executor; CompletableFuture<V> dep; CompletableFuture<T> src; UniCompletion(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src) { this .executor = executor; this .dep = dep; this .src = src; } final boolean claim () { Executor e = executor; if (compareAndSetForkJoinTaskTag((short )0 , (short )1 )) { if (e == null ) return true ; executor = null ; e.execute(this ); } return false ; } final boolean isLive () { return dep != null ; } }
UniCompletion子类如图:
除去AsyncSupply
和AsyncRun
两个类其他类都是UniCompletion的子类.
测试代码 1 2 3 4 5 6 7 public static void main (String[] args) throws Exception { Integer result = CompletableFuture .supplyAsync(() -> 10 ) .thenApplyAsync(res -> res + 1 ) .thenApplyAsync(res -> res + 2 ).get(); System.out.println(result); }
此代码等效于
1 2 3 4 5 6 7 8 9 public static void main (String[] args) throws Exception { CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> 10 ); CompletableFuture<Integer> cf2 = cf1.thenApplyAsync(res -> res + 1 ); CompletableFuture<Integer> cf3 = cf2.thenApplyAsync(res -> res + 2 ); System.out.println(cf3.get()); }
静态方法 在CompletableFuture中可以使用一些静态方法来快捷的创建CompletableFuture.
supplyAsync
方法
1 2 3 4 5 6 7 8 9 public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); }public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); }
runAsync
方法
1 2 3 4 5 6 public static CompletableFuture<Void> runAsync (Runnable runnable) { return asyncRunStage(asyncPool, runnable); }public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
这两个方法的区别就是: runAsync
返回的CompletableFuture没有结果, 即get
返回null. 而supplyAsync
方法则由返回值.
asyncSupplyStage方法原理 1 2 3 4 5 6 7 8 9 10 static <U> CompletableFuture<U> asyncSupplyStage (Executor e, Supplier<U> f) { if (f == null ) throw new NullPointerException (); CompletableFuture<U> d = new CompletableFuture <U>(); e.execute(new AsyncSupply <U>(d, f)); return d; }
asyncRunStage方法原理 1 2 3 4 5 6 7 8 static CompletableFuture<Void> asyncRunStage (Executor e, Runnable f) { if (f == null ) throw new NullPointerException (); CompletableFuture<Void> d = new CompletableFuture <Void>(); e.execute(new AsyncRun (d, f)); return d; }
通过对两个方法的解析, 可以看看出代码的流程是一致的, 只是最终想线程池中提交的任务类型不一样.
asyncSupplyStage ==> AsyncSupply
而 asyncRunStage ==> AsyncRun
.
查看AsyncSupply和AsyncRun两个内部类的run
方法.
AsyncSupply.run
代码如下
AsyncRun.run
代码如下
通过代码可以看出AsyncRun和AsyncSupply两个类的区别:
都存在fn
字段, 用于存储当前阶段执行的动作, 但是一个是Supplier类, 一个是Runnable类型.
两个类的run方法在最后都调用了CompletableFuture.postComplete()
方法.
在supplyAsync()
或runAsync()
执行完后会返回一个CF
. 在其内部将需要执行的操作封装成了AsyncSupply
或``AsyncRun并与
代表当前阶段的CF进行关联. 此
AsyncSupply和
AsyncRun都是一个
FutureTask`可以被线程池执行.
执行流程大致为:
将执行的动作(Supplier
或Runnable
)和对应的CF
通过AsyncSupply
或AsyncRun
进行封装
将AsyncSupply
或AsyncRun
并扔到线程池中执行.
而AsyncSupply
或AsyncRun
通过判断CF.result==null
是否完成. 为空执行动作,反之则设置结果.
最终调用postComplete
方法唤醒后续依赖的CF.
实例方法 thenApplyAsync方法原理 1 2 3 4 5 public <U> CompletableFuture<U> thenApplyAsync (Function<? super T,? extends U> fn) { return uniApplyStage(asyncPool, fn); }
uniApplyStage方法原理 此方法每次调用都会返回一个的CF, 但是每次调用的对象是不同的.
执行step2
时是调用的CF1.uniApplyStage
, 而在执行step3
时是调用的CF2.uniApplyStage
.
所以我们分析此段源码时 需要注意this所指向的对象
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private <V> CompletableFuture<V> uniApplyStage (Executor e, Function<? super T,? extends V> action) { if (action == null ) throw new NullPointerException (); CompletableFuture<V> cf = new CompletableFuture <V>(); if (e != null || !cf.uniApply(this , action, null )) { UniApply<T,V> c = new UniApply <T,V>(e, cf, this , action); push(c); c.tryFire(SYNC); } return cf; }
看完这段代码可能非常疑惑.
uniApply干了什么?
push方法做了什么?
tryFire干了什么?
UniApply是用于干什么的?
但是大致流程是:
创建了一个UniApply
对象绑定了新CF(cf对象)
和当前CF(this)
同时还有新CF执行的动作(action参数)
此步骤体现: new UniApply<T,V>(e, cf, this, action);
然后将UniApply
对象推送到当前CF
的某个地方
最后尝试触发UniApply
对象.
uniApply方法原理 step2执行时调用此方法的CF对象为: CF2
. 所以参数a表示的是CF1.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 final <S> boolean uniApply (CompletableFuture<S> a, Function<? super S,? extends T> f, UniApply<S,T> c) { Object r; Throwable x; if (a == null || (r = a.result) == null || f == null ) return false ; tryComplete: if (result == null ) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null ) { completeThrowable(x, r); break tryComplete; } r = null ; } try { if (c != null && !c.claim()) return false ; @SuppressWarnings("unchecked") S s = (S) r; completeValue(f.apply(s)); } catch (Throwable ex) { completeThrowable(ex); } } return true ; }
可以看出此方法就是想尝试的执行一下当前CF的动作.
如果依赖的上一个CF还未执行完或出现异常, 则当前CF的动作不会执行, 会直接返回.
push/tryPushStack方法原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 final void push (UniCompletion<?,?> c) { if (c != null ) { while (result == null && !tryPushStack(c)) lazySetNext(c, null ); } }final boolean tryPushStack (Completion c) { Completion h = stack; lazySetNext(c, h); return UNSAFE.compareAndSwapObject(this , STACK, h, c); }
简而言之push方法就是将传递的Completion压入当前CF的stack变量的顶部.
当上述测试代码执行完, CF的执行结构如下图:
唤醒操作 唤醒操作由大致有两类: 一类是CompletableFuture的唤醒, 一类是Completion的唤醒
通过整体的代码分析, 得知了触发后续CF的方法目前有两个: tryFire
和postComplete
.
postComplete方法原理 该方法属于CompletableFuture
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 final void postComplete () { CompletableFuture<?> src = this ; Completion completion; while ((completion = src.stack) != null || (src != this && (completion = (src = this ).stack) != null )) { CompletableFuture<?> dep; Completion nextCompletion; if (src.casStack(completion, nextCompletion = completion.next)) { if (nextCompletion != null ) { if (src != this ) { pushStack(completion); continue ; } completion.next = null ; } src = (dep = completion.tryFire(NESTED)) == null ? this : dep; } } }
使用上面的测试代码分析流程:
第一次循环: src = CF1. while循环第一个条件满足, 此时将CF1的栈顶Completion出栈. 由于栈中只有一个Completion所以nextCompletion != null
不成立.
通过Completion的tryFire方法唤醒与此绑定的后续阶段(CF2). 然后dep!=null, 返回dep, 此时src = CF2
第二次循环: src = CF2, while循环第一个条件满足, 此时将CF2的栈顶Completion出栈, 同样的nextCompletion != null
不成立.
通过Completion的tryFire方法唤醒与此绑定的后续阶段(CF3). 然后dep!=null, 返回dep, 此时src = CF3
第三次循环: src=CF3, while循环第一个条件不满足(CF3没有发生thenXXX调用, 所以它的栈是空的), 此时会走while循环第二个条件, 此时src被重新切回到了CF1. 但是CF1的栈也只有一个, 所以while循环终止(第一次循环时唯一的Completion出栈了).
注意这三次循环都是在CF1.postCompele方法中完成的.
当第二次循环后, CF2开始执行.
tryFire方法原理 此方法属于Completion, 用于触发依赖此Completion的阶段(dep字段即代表下个阶段的CF).
1 2 3 4 5 6 7 8 9 10 11 12 13 final CompletableFuture<V> tryFire (int mode) { CompletableFuture<V> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this )) return null ; dep = null ; src = null ; fn = null ; return d.postFire(a, mode); }
该方法用于尝试触发下个阶段的执行, 此时上个阶段已执行完成
,或者还未执行完成
.
postFire方法原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 final CompletableFuture<T> postFire (CompletableFuture<?> a, int mode) { if (a != null && a.stack != null ) { if (mode < 0 || a.result == null ) a.cleanStack(); else a.postComplete(); } if (result != null && stack != null ) { if (mode < 0 ) return this ; else postComplete(); } return null ; }
此方法作用就是:
检查是否存在另一个阶段和当前阶段一样, 依赖上阶段完成时触发.
如果当前阶段已完成则触发后续阶段
总结 CompletableFuture的核心就是Completion, 通过Completion就前后阶段的CF连接了起来.