Java 8 的 CompletableFuture
创建
- 创建一个已完成的
- 类似 guava 的 SettableFuture, 先创建一个空的,再在合适的地方完成它
- 异步计算或运行
private static void create() {
// 新建一个已经完成的
CompletableFuture.completedFuture("");
// JDK 9+
// CompletableFuture.failedFuture(new Exception());
// 类似 SettableFuture.create() 新建一个壳子
CompletableFuture<String> hello = new CompletableFuture<>();
// 然后在合适的时机使它完成
hello.complete("");
hello.completeExceptionally(new Exception());
// 异步运行
CompletableFuture.runAsync(() -> System.out.println("CompletableFuture<Void>"));
// 异步计算
CompletableFuture.supplyAsync(() -> "CompletableFuture<String>");
}
常用操作
- thenApply 完成后对类型 T 的结果执行转换为类型 U 的操作,返回新的 Future<U>
- thenAccept 完成后对类型 T 的结果执行消费操作,返回新的 Future<Void>
- thenAcceptBoth 本类型 T 和另一个类型 U 都完成后,执行消费操作,返回 Future<Void>
- thenCombine 本类型 T 联合另一个 U 都完成后,执行一个 BiFunction<T,U,V>,返回新的 Future<V>
- thenCompose 完成后的结果 T 转换为另一个 Future<U>, 类似 flatMap
- exceptionally 异常时返回兜底结果
- whenComplete 当完成或异常时的处理,返回 Future<Void>
- handle 类似 thenApply, 返回新的 Future<U>
- 各种 Async 后缀的版本:异步执行,传入线程池。不传 Executor 则默认为 ForkJoinPool.commonPool
private static void then() {
CompletableFuture.completedFuture("")
// 完成后执行一个 Function 进行数据转换 类似 Stream.map
.thenApply(String::length)
// 联合另一个 CompletableFuture,都完成后执行一个 BiFunction 进行数据转换
.thenCombine(CompletableFuture.completedFuture(1), (length, num) -> "len+num=" + length + num)
// 当两个都完成后执行一个动作 返回新的 CompletableFuture<Void>
.thenAcceptBoth(CompletableFuture.completedFuture(2), (s, num) -> System.out.printf("%s,%d\n", s, num))
.thenApply(v -> "s")
// 将完成后的结果转换为另一个 CompletableFuture 类似 Stream.flatMap
.thenCompose(s -> CompletableFuture.completedFuture("compose:" + s))
// 当出现异常时 处理异常并返回异常时的值
.exceptionally(e -> "之前是什么类型这里就需要返回什么类型")
// 完成后执行的动作 返回新的 CompletableFuture<Void>
.thenAccept(System.out::println)
// 类似 Accept 完成后执行的动作,返回新的 CompletableFuture<Void>
.whenComplete((result, ex) -> {
})
// 类似 Apply 完成后执行的动作,返回新的 CompletableFuture<U>
.handle((result, ex) -> "new")
;
}
后续的操作在哪个线程执行?
该 CompletableFuture 在哪个线程完成的,它之后紧接着的 then 操作就在这个线程运行。
private static void thread() {
CompletableFuture<String> f = new CompletableFuture<>();
CompletableFuture<Long> other = new CompletableFuture<>();
System.out.println("out: " + Thread.currentThread().getName());
f
.thenApply(s -> {
// 这里的线程是给 f 设置结果的线程
System.out.println("thenApply: " + Thread.currentThread().getName());
return s;
})
.thenApplyAsync(s -> {
// Async 结尾的方法不传 线程池则是 ForkJoinPoll.commonPoll
System.out.println(Thread.currentThread().getName());
return s;
})
.thenAccept(s -> {
// 和上个操作是同一个线程
System.out.println(s + " " + Thread.currentThread().getName());
})
.thenCompose(v -> other)
.thenApply(num -> {
// 执行线程 取决于哪个 future 后完成
System.out.println(num + " .thenCompose.thenApply: " + Thread.currentThread().getName());
return num + 1;
})
;
new Thread(() -> {
// sleep(50);
f.complete("Ha");
}, "MyThread-1").start();
new Thread(() -> {
sleep(50);
other.complete(1L);
}, "MyThread-2").start();
}
private static void sleep(long mills) {
try {
Thread.sleep(mills);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
内部原理
如果是已完成的 Future, 调用各种 then 方法,就直接执行了;如果还没完成,则需要把各种 Function/Consumer 等需要执行的回调动作保存起来,待完成后再执行。
private static void debug() {
// 实际开发一般都是一条链走到底,这里为了 Debug 好对比哪个实例是哪个 故分开写变量
CompletableFuture<String> hello = new CompletableFuture<>();
CompletableFuture<Void> print = hello.thenAccept(System.out::println);
CompletableFuture<String> upper = hello.thenApply(String::toUpperCase);
CompletableFuture<Void> v1 = upper.thenAccept(System.out::println);
CompletableFuture<Void> v2 = print.thenCombine(upper, (aVoid, s) -> s.toCharArray())
.thenCompose(chars -> CompletableFuture.completedFuture(chars.length))
.thenAccept(System.out::println);
hello.complete("Hello");
}

参考链接:
声明
- 本作品采用署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。除非特别注明, 霖博客文章均为原创。
- 转载请保留本文(《CompletableFuture》)链接地址: https://youthlin.com/?p=1738
- 订阅本站:https://youthlin.com/feed/
“CompletableFuture”上的1条回复