并发编程 14:CompletableFuture异步编程没有那么难

以下文章来源于七哥聊编程 ,作者七哥


大家好,我是七哥,今天给大家分享一个非常强大的工具类:CompletableFuture,如果你平时也会遇到用多线程优化业务逻辑的场景,那么今天这篇文章我建议你读完,相信一定会让你在重构相关代码时得心应手,写出让人称赞的好代码,不过使用CompletableFuture的前提是JDK需要1.8以上哦~

那我们下面进入今天的正文。



项目中串行调用的例子
并行调用的几种实现
CompletableFuture解析
JDK8流程编程结合
总结
前言
在Java开发的web项目中,我们经常会遇到接口响应耗时过长,或者定时任务处理过慢,那在Java中最常见的解决方法就是并行了,想必大家也都不陌生了。

今天的分享主要带大家从一个实际的串行场景出发,如何一步步优化,同时也会分享在Java中实现并行处理的多种方式,以及它们之间的区别和优缺点,通过对比总结更加深入的了解并且使用Java中并发编程的相关技术。

一个串行调用的例子
现在我们有一个查询carrier下所有Load的接口,它需要查询Loads信息、Instruction信息、Stops信息、Actions信息后然后组装数据。

private List<Load> getHydratedLoads(Optional<Pageable> pageable, String predicate, List<Object> params) {
    // 1. 耗时3秒
    List<Load> loads = executeQuery("查询Loads列表");
    // 2. 耗时4秒
    List<Instruction> instructions = executeQuery("查询instructions列表");
    // 3. 耗时2秒
    List<Stop> stops = executeQuery("查询stops列表");
    // 4. 耗时3秒
    List<Action> actions = executeQuery("查询actions列表");
    
    Multimap<String, Instruction> instructionsByLoadId = index(instructions, i -> i.getLoad().getId());
    Multimap<String, Stop> stopsByLoadId = index(stops, s -> s.getLoad().getId());
    Multimap<String, Action> actionsByStopId = index(actions, a -> a.getStop().getId());
    
    // 数据处理
    handle(loads,instructions,stops,actions);
    return loads;
}
这段代码会有什么问题?其实算是一段比较正常的代码,但是在某一个carrier下数据量比较大时,sql查询是相对较慢的,那有没有办法优化一下呢?



当前这个请求耗时总计就是12s。上面实现中查询Load、Instruction、Stop、Action 等信息是串行的,那串行的系统要做性能优化很常见的就是利用多线程并行了。


这种相互之间没有影响的任务,利用并行处理后耗时就可以优化为4s。

并行调用实现的几种方式
因为项目中多线程都用线程池,所以Thread.join()这种方式就不演示了。

1. Future+Callable
Future接口在Java 5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操作把调用线程解放出来,让调用线程能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。

因为我们都是需要获取任务的返回值的,所以大家肯定想到是用 Future+Callable来做。

ThreadPoolExecutor提供了3个submit方法支持我们需要获取任务执行结果的需求。

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);
简单介绍下这三个submit方法:

提交 Runnable 任务 submit(Runnable task),这个方法入参是Runnable接口,它只有一个run()方法没有返回值,所以它返回的 Future 只能用来判断任务是否结束;
提交 Callable 任务 submit(Callable task),它的入参是Callable接口,只有一个call()方法,是有返回值的,所以可以获取任务执行结果;
提交 Runnable 任务及结果引用 submit(Runnable task, T result),这个方法返回的Future,调用get()方法的返回值就是传入的result对象,一般用法就是实现Runnable接口时,声明一个有参构造函数,将result传进去,result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。
这三个方法的返回值都是Future接口,Future 提供了5个方法:



分别是取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。

需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。

2. FutureTask实现并行调用
我们再介绍下FutureTask工具类,这是一个实实在在的工具类,有两个构造函数,和上面类似,一看就明白了。

FutureTask(Callable callable);
FutureTask(Runnable runnable, V result);


public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}


public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

private static final class RunnableAdapter<T> implements Callable<T> {
    private final Runnable task;
    private final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
    public String toString() {
        return super.toString() + "[Wrapped task = " + task + "]";
    }
}

这个类实现了 Runnable 和 Future 接口,可以理解就是将任务和结果结合起来了,变成一个可以有响应结果的任务进行提交,本质上FutureTask里面封装的还是一个Callable接口,它实现可以有返回值就是因为它的run方法里面调用了Callable的call()方法,将结果赋值给result,然后返回。

下面我们看下如何优化我们上面的查询接口,实现并行查询:

private List<Load> getHydratedLoadsUsingFutureTask() throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    
    FutureTask<List<Load>>  queryLoadFutureTask = new FutureTask<>(() -> executeQuery("sql1"));
    executorService.submit(queryLoadFutureTask);
    
    FutureTask<List<Instruction>>  queryInstructionFutureTask = new FutureTask<>(() -> executeQuery("sql2"));
    executorService.submit(queryInstructionFutureTask);
    
    FutureTask<List<Stop>> queryStopFutureTask = new FutureTask<>(() -> executeQuery("sql3"));
    executorService.submit(queryStopFutureTask);
    
    FutureTask<List<Action>>  queryActionFutureTask = new FutureTask<>(() -> executeQuery("sql4"));
    executorService.submit(queryActionFutureTask);
    
    // 获取结果
    List<Load> loads = queryLoadFutureTask.get();
    List<Instruction> instructions = queryInstructionFutureTask.get();
    List<Stop> stops = queryStopFutureTask.get();
    List<Action> actions = queryActionFutureTask.get();
    // We got all the entities we need, so now let's fill in all of their references to each other.
    handleData(loads, instructions, stops, actions);
    return loads;
}
那你可能会想到,如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,该怎么处理呢?

这种问题基本上也都可以用 Future 来解决,但是需要将对应的 FutureTask传入到当前任务中,然后调用get()方法即可。

比如,我们创建了两个 FutureTask——ft1 和 ft2,ft1 需要等待 ft2 执行完毕后才能做最后的数据处理,所以 ft1 内部需要引用 ft2,并在执行数据处理前,调用 ft2 的 get() 方法实现等待。


// 创建任务T2的FutureTask
FutureTask<String> ft2
  = new FutureTask<>(new T2Task());

// 创建任务T1的FutureTask
FutureTask<String> ft1
  = new FutureTask<>(new T1Task(ft2));

// 线程T1执行任务ft1
Thread T1 = new Thread(ft1);
T1.start();

// 线程T2执行任务ft2
Thread T2 = new Thread(ft2);
T2.start();

// 等待线程T1执行结果
System.out.println(ft1.get());

// T1Task需要执行的任务:
class T1Task implements Callable<String>{
  FutureTask<String> ft2;
  // T1任务需要T2任务的FutureTask
  T1Task(FutureTask<String> ft2){
    this.ft2 = ft2;
  }
  @Override
  String call() throws Exception {
    // 获取T2线程结果  
    String tf = ft2.get();
    return "处理完的数据结果";
  }
}
// T2Task需要执行的任务:
class T2Task implements Callable<String> {
  @Override
  String call() throws Exception {
    return "检验&查询数据";
  }
}
通过这上面的的例子,我们明显的发现 Future 实现异步编程时的一些不足之处:

Future 对于结果的获取很不方便,只能通过 get() 方法阻塞或者轮询的方式得到任务的结果。阻塞的方式显然是效率低下的,轮询的方式又十分耗费CPU资源,如果前一个任务执行比较耗时的话,get() 方法会阻塞,形成排队等待的情况。
将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
等待Future集合中的所有任务都完成。
仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。
应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。
我们很难表述Future结果之间的依赖性,从文字描述上这很简单。比如,下面文字描述的关系,如果用Future去实现时还是很复杂的。

比如:“当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并”

在JDK8中引入了CompletableFuture,对Future进行了改进,可以在定义CompletableFuture时传入回调对象,任务在完成或者异常时,自动回调,再也不需要每次主动通过 Future 去询问结果了,我们接着往下看。






3. CompletableFuture
Java 在 1.8 版本提供了 CompletableFuture 来支持异步编程,CompletableFuture 类实现了CompletionStage 和 Future 接口,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过 完成时回调 的方式处理计算结果,并且提供了 转换和组合 CompletableFuture 的方法。

Callable,有结果的同步行为,比如做饭,就能产出一盘菜;
Runnable,无结果的同步行为,比如吃饭,仅仅就是吃就完事了;
Future,异步封装 Callable/Runnable,比如委托给你媳妇去做饭(其他线程);
CompletableFuture,封装Future,使其拥有回调功能,比如让你媳妇做好饭了,主动告诉你做好了;
为了体会到 CompletableFuture 异步编程的优势,我们还是先用 CompletableFuture 重新实现前面的程序。

public static List<Load> getHydratedLoadsUsingCompletableFuture()
    throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    try {
        // 任务1:查询loads列表
        CompletableFuture<List<Load>> queryLoads = CompletableFuture.supplyAsync(() -> executeQuery("sql1"), executorService);
        // 任务2:查询instructions列表
        CompletableFuture<List<Instruction>> queryInstructions = CompletableFuture.supplyAsync(() -> executeQuery("sql2"),
            executorService);
        // 任务3:查询stops列表
        CompletableFuture<List<Stop>> queryStops = CompletableFuture.supplyAsync(() -> executeQuery("sql3"), executorService);
        // 任务4:查询actions列表
        CompletableFuture<List<Action>> queryActions = CompletableFuture.supplyAsync(() -> executeQuery("sql4"),
            executorService);
        // 任务1,2,3,4执行完成后执行数据组装
        CompletableFuture<Void> combineFuture = CompletableFuture.allOf(queryLoads,
                queryInstructions,
                queryStops,
                queryActions)
            .thenRun(() -> handleData(queryLoads.join(), queryInstructions.join(), queryStops.join(), queryActions.join()));
        System.out.println(Thread.currentThread().getName() + ": 主线程执行到这里了");
        combineFuture.get();
        
        System.out.println(String.format("""
            queryLoads: %s ,queryInstructions: %s ,queryStops: %s ,queryActions: %s
            """, queryLoads.isDone(), queryInstructions.isDone(), queryStops.isDone(), queryActions.isDone()));
        
        return queryLoads.get();
    } finally {
        executorService.shutdown();
    }
}
通过上面的代码我们可以发现 CompletableFuture 有以下优势:

无需手工维护线程,省去了手工提交任务到线程池这一步;
语义更清晰,例如 CompletableFuture.allOf(f1,f2,f3,f4)  能够清晰地表述“需要等指定的4个任务都完成才能执行后续的任务”;
代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
CompletableFuture 解析
1. CompletableFuture创建
CompletableFuture 提供了四个静态方法来创建一个异步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
这四个方法区别在于:

runAsync 方法以 Runnable 函数式接口类型为参数,没有返回结果,supplyAsync 方法以 Supplier 函数式接口类型为参数,返回结果类型为U;
没有指定 Executor 的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定了线程池,则使用指定的线程池运行。
ForkJoinPool是JDK7提供的,叫做分支/合并框架。可以通过将一个任务递归分成很多分子任务,形成不同的流,进行并行执行,同时还伴随着强大的工作窃取算法,极大的提高效率,这个不属于今天我们讨论的点,感兴趣的话可以后面再聊。

注意:如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

问题:为什么supplyAsync方法接收一个 Supplier 函数式接口类型参数而不是一个 Callable 类型的参数呢?

@FunctionalInterface
public interface Callable<V> {
/**
 * Computes a result, or throws an exception if unable to do so.
 *
 * @return computed result
 * @throws Exception if unable to compute a result
 */
V call() throws Exception;
}

@FunctionalInterface
public interface Supplier<T> {

/**
 * Gets a result.
 *
 * @return a result
 */
T get();
}
看了接口定义,我们发现它们其实都是一个不接受任何参数类型的函数式接口,在实践中它们做的是相同的事情(定义一个业务逻辑去处理然后有返回值),但在原则上它们的目的是做不同的事情:

从语义上来看 Callable 是“返回结果的任务”,而 Supplier 是 “结果的供应商”。可以理解为 Callable 引用了一个未执行的工作单元,Supplier 引用了一个未知的值。侧重点可能不一样,如果关心的是提供一个什么值而不关心具体做了啥工作使用 Supplier 感觉更合适。例如,ExecutorService 与 Callable一起工作,因为它的主要目的是执行工作单元。CompletableFuture 使用 Supplier,因为它只关心提供的值,而不太关心可能需要做多少工作。
两个接口定义之间的一个基本区别是,Callable允许从其实现中抛出检查异常,而Supplier不允许。
2. 理解CompletionStage接口


通过接口的继承关系,我们可以发现这里的异步操作到底什么时候结束、结果如何获取,都可以通过 Future接口来解决。

另外 CompletableFuture 类还实现了 CompletionStage 接口,这个接口就比较关键了,之所以能实现响应式编程,都是通过这个接口提供的方法。

下面介绍下 CompletionStage 接口,看字面意思可以理解为“完成动作的一个阶段”,官方注释文档:CompletionStage 是一个可能执行异步计算的“阶段”,这个阶段会在另一个 CompletionStage 完成时调用去执行动作或者计算,一个 CompletionStage 会以正常完成或者中断的形式“完成”,并且它的“完成”会触发其他依赖的 CompletionStage 。CompletionStage 接口的方法一般都返回新的CompletionStage,因此构成了链式的调用。

这个看完还是有点懵逼的,不清楚什么是 CompletionStage?

在Java中什么是 CompletionStage ?

一个Function、Comsumer、Supplier 或者 Runnable 都会被描述为一个CompletionStage。

stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
x -> square(x) 就是一个 Function 类型的 Stage,它返回了x。
x -> System.out.println(x) 就是一个 Comsumer 类型的Stage,用于接收上一个Stage的结果x。
() ->System.out.println() 就是一个Runnable类型的Stage,既不消耗结果也不产生结果。
但是 CompletionStage 这里面一共有40多个方法,我们该如何理解呢?

CompletionStage 接口可以清晰的描述任务之间的关系,可以分为 顺序串行、并行、汇聚关系以及异常处理。

串行关系
CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。


public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
thenApply 系列方法里参数 fn 的类型是接口 Function,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的 CompletionStage 对象,这个方法既能接收参数也支持返回值,可以理解为对于结果的转换;
thenAccept 系列方法里参数 action 的类型是接口 Consumer,这个方法虽然支持参数,但却不支持回值,可以理解为对于结果的消费;
thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,也是对于结果的一种消费,和 thenAccept 区别在于 Runnable 并不使用前一步 CompletableFuture 计算的结果;
thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果,和 thenApply 执行结果类似,区别在于会生成一个新的 CompletableFuture 返回,也可以理解为对于结果的转换;
thenApply() 和 thenCompose() 的区别?thenApply 转换的是泛型中的类型,是同一个CompletableFuture,thenCompose 用来连接两个CompletableFuture,是生成一个新的 CompletableFuture。他们都是让 CompletableFuture 可以对返回的结果进行后续操作,就像 Stream 一样进行 map 和 flatMap 的转换。

public static void main(String[] args) throws InterruptedException, ExecutionException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");

    CompletableFuture<String> result1 = future.thenApply(param -> param + " World");
    CompletableFuture<String> result2 = future.thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World"));

    System.out.println(result1.get());
    System.out.println(result2.get());
}
这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。


CompletableFuture<String> f0 =
  CompletableFuture.supplyAsync(
    () -> "Hello World")      //①
  .thenApply(s -> s + " QQ")  //②
  .thenApply(String::toUpperCase);//③

System.out.println(f0.join());
//输出结果
HELLO WORLD QQ
可以看一下 thenApply() 方法是如何使用的。首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。

CompletableFuture 中 thenApply 如何实现?

先看下静态创建CompletableFuture的方法 supplyAsync;
//静态方法,如果没有传入线程池,使用ForkJoinPool的common线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(ASYNC_POOL, supplier);
}

static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                 Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    //新建CompletableFuture对象
    CompletableFuture<U> d = new CompletableFuture<U>();
    //构造AsyncSupply对象,线程池提交AsyncSupply任务
    e.execute(new AsyncSupply<U>(d, f));
    //将CompletableFuture对象返回
    return d;
}

static final class AsyncSupply<T> extends ForkJoinTask<Void>
    //可以看到AsyncSupply是一个Runnable对象
    implements Runnable, AsynchronousCompletionTask {
    CompletableFuture<T> dep; Supplier<? extends T> fn;
    AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {
        this.dep = dep; this.fn = fn;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
    public final boolean exec() { run(); return false; }
    public void run() {
        CompletableFuture<T> d; Supplier<? extends T> f;
        if ((d = dep) != null && (f = fn) != null) {
            dep = null; fn = null;
            //CompletableFuture对象的result为空时
            if (d.result == null) {
                try {
                    //调用传入的supplier的get方法,并将结果放入result字段
                    //注意:这是在线程池中提交的,所以是异步处理的
                    d.completeValue(f.get());
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
            //处理完当前方法后,处理依赖它的栈顶方法,后面的回调方法入栈和这块呼应
            d.postComplete();
        }
    }
}

final void postComplete() {
    // 变量f存储的是当前已经完成的CompletableFuture
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        // CAS操作,将依赖此阶段的栈顶元素取出,并且设置为下一个
        if (STACK.compareAndSet(f, h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    //如果f不是this,将刚出栈的h入this的栈顶
                    pushStack(h);
                    continue;
                }
                // 将h剥离出来,h.next=null,帮助gc
                NEXT.compareAndSet(h, t, null); // try to detach
            }
            //调用tryFire
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}
再看下异步处理完 supplyAsync 后的回调方法 thenApply 方法,看看它是如何实现回调的;
public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

private <V> CompletableFuture<V> uniApplyStage(
        Executor e, Function<? super T,? extends V> f) {
        if (f == null) throw new NullPointerException();
        Object r;
        // 如果当前阶段结果已经返回,则直接运行回调方法
        if ((r = result) != null)
            return uniApplyNow(r, e, f);
        CompletableFuture<V> d = newIncompleteFuture();
        // 构造Completion放入等待栈的顶
        unipush(new UniApply<T,V>(e, d, this, f));
        return d;
}

private <V> CompletableFuture<V> uniApplyNow(
    Object r, Executor e, Function<? super T,? extends V> f) {
    Throwable x;
    CompletableFuture<V> d = newIncompleteFuture();
    // 如果依赖的方法异常中断,则直接处理并返回异常
    if (r instanceof AltResult) {
        if ((x = ((AltResult)r).ex) != null) {
            d.result = encodeThrowable(x, r);
            return d;
        }
        r = null;
    }
    try {
        // 执行到这里说明依赖的任务已经有结果了,用它的结果当作参数调用回调方法
        // 注意这里都是线程池中的线程在执行,所以是异步执行
        if (e != null) {
            e.execute(new UniApply<T,V>(null, d, this, f));
        } else {
            @SuppressWarnings("unchecked") T t = (T) r;
            d.result = d.encodeValue(f.apply(t));
        }
    } catch (Throwable ex) {
        d.result = encodeThrowable(ex);
    }
    return d;
}

final void unipush(Completion c) {
    if (c != null) {
        // CAS自旋将回调方法压入栈顶
        while (!tryPushStack(c)) {
            if (result != null) {
                NEXT.set(c, null);
                break;
            }
        }
        // 可能在重试中完成,判断result不为空就执行
        if (result != null)
            c.tryFire(SYNC);
    }
}

//再次尝试判断依赖方法是否处理完成,处理完成则调用目标回调方法
final CompletableFuture<V> tryFire(int mode) {
    CompletableFuture<V> d; CompletableFuture<T> a;
    Object r; Throwable x; Function<? super T,? extends V> f;
    if ((a = src) == null || (r = a.result) == null
        || (d = dep) == null || (f = fn) == null)
        return null;
    tryComplete: if (d.result == null) {
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                d.completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        try {
            if (mode <= 0 && !claim())
                return null;
            else {
                @SuppressWarnings("unchecked") T t = (T) r;
                d.completeValue(f.apply(t));
            }
        } catch (Throwable ex) {
            d.completeThrowable(ex);
        }
    }
    src = null; dep = null; fn = null;
    //成功处理完依赖方法和回调方法后进行处理,可能唤醒其他的回调方法或者清理栈
    return d.postFire(a, mode);
}






描述 AND 汇聚关系
CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别是源自 fn、consumer、action 这三个核心参数不同。


public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
Async后缀的方法表示,前面的 CompletionStage 执行完成,在执行后续操作时会提交到线程池处理,否则就还是使用同一个处理线程完成CompletableFuture的所有任务。

这三种方法意思都是等两个 CompletionStage 都完成了计算才会执行下一步的操作,区别在于参数接口类型不一样。

thenCombine 参数接口类型为 BiFunction,可以拿到前一步两个 CompletionStage 的运算结果,进行下一步处理,同时有返回值(转化操作);
thenAcceptBoth 参数接口类型为 BiConsumer,也可以拿到前一步的运算结果进行下一步处理,但是无返回值(消费操作);
runAfterBoth 参数接口类型为 Runnable,即不能获取到上一步的执行结果,也无返回值(不关心运行结果);
CompletableFuture 中 thenAcceptBoth 如何实现?talk is cheap!!

public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action) {
    return biAcceptStage(null, other, action);
}

private <U> CompletableFuture<Void> biAcceptStage(
    Executor e, CompletionStage<U> o,
    BiConsumer<? super T,? super U> f) {
    CompletableFuture<U> b; Object r, s;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture<Void> d = newIncompleteFuture();
    // 如果两个阶段有任何一个没有执行完成,则将回调方法分别放到两个互相依赖阶段的栈顶
    if ((r = result) == null || (s = b.result) == null)
        bipush(b, new BiAccept<T,U>(e, d, this, b, f));
    else if (e == null)
        // 如果两个依赖的阶段都执行完成则调用回调方法
        d.biAccept(r, s, f, null);
    else
        try {
            e.execute(new BiAccept<T,U>(null, d, this, b, f));
        } catch (Throwable ex) {
            d.result = encodeThrowable(ex);
        }
    return d;
}
描述 OR 汇聚关系
OR的关系,表示谁运行快就用谁的结果执行下一步操作。


public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
同样也是有Async后缀的表示,当前面的 CompletionStage 执行完成,在执行后续操作时会提交到线程池处理。applyToEither、acceptEither、runAfterEither 三个方法的区别还是来自于不同的接口参数类型:Function、Consumer、Runnable。

CompletableFuture 中 applyToEither 如何实现?

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return orApplyStage(defaultExecutor(), other, fn);
}

private <U extends T,V> CompletableFuture<V> orApplyStage(
    Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f) {
    CompletableFuture<U> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    Object r; CompletableFuture<? extends T> z;
    // 这块是重点,有任何一个阶段的结果不为空就直接执行function
    if ((r = (z = this).result) != null ||
        (r = (z = b).result) != null)
        return z.uniApplyNow(r, e, f);
    CompletableFuture<V> d = newIncompleteFuture();
    // 如果都为空则将回调方法分别push到被依赖的两个阶段的栈顶
    orpush(b, new OrApply<T,U,V>(e, d, this, b, f));
    return d;
}
异常处理
在Java编程中,异常处理当然是必不可少的一环,那你可能会想到如果在使用 CompletableFuture 进行异步链式编程时,如果出现异常该怎么处理呢?

首先上面我们提到的 fn、consumer、action 它们的核心方法是不允许抛出可检查异常的,但是却无法限制它们抛出运行时异常。在同步方法中,我们可以使用 try-catch{} 来捕获并处理异常,但在异步编程里面异常该如何处理 ?CompletionStage 接口给我们提供的方案非常简单,比 try-catch{} 还要简单。

下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
参数的类型是 BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况,可以获取到上一步的执行结果作为参数;
无论是否发生异常都会执行 whenComplete() 中的回调函数 action;
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行;
这几个方法都会返回 CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture 的计算结果或者返回异常。
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    }
    if (new Random().nextInt() % 2 == 0) {
        int i = 12 / 0;
    }
    System.out.println("执行结束!");
});
future.whenComplete(new BiConsumer<Void, Throwable>() {
    @Override
    public void accept(Void t, Throwable action) {
        System.out.println("执行完成!");
    }
});
future.exceptionally(new Function<Throwable, Void>() {
    @Override
    public Void apply(Throwable t) {
        System.out.println("执行失败:" + t.getMessage());
        return null;
    }
}).join();
handle 也是执行任务完成时对结果的处理,whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

当上一个的 CompletableFuture 的值计算完成或者抛出异常的时候,会触发 handle 方法中定义的函数,结果由 BiFunction 参数计算而得,因此这组方法兼有 whenComplete 和转换的两个功能。

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
JDK8流式编程结合
public static List<String> exampleCompletableFutureAndStream() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    List<String> loads = null;
    try {
        // 所有需要查询远程服务的load列表
        List<String> requestList = Lists.newArrayList("load1", "load2", "load3", "load4");
        List<CompletableFuture<String>> completableFutures = requestList.stream()
            // 使用CompletableFuture以异步方式查询数据
            .map(req -> CompletableFuture.supplyAsync(() -> invokeReq(req), executorService))
            .map(future -> future.thenApply(Load::getStatus))
            .map(future -> future.thenCompose(status -> CompletableFuture.supplyAsync(() -> status.name().toUpperCase())))
            .toList();
        loads = completableFutures.stream().map(CompletableFuture::join).toList();
        System.out.println(Thread.currentThread().getName() + ": CompletableFuture异步方式查询请求已完成:" + loads.size());
        
    } finally {
        executorService.shutdown();
    }
    return loads;
}
注意到了吗?这里使用了两个不同的Stream流水线,是否可以在同一个处理流的流水线上一个接一个地放置多个map操作。

public static List<String> exampleCompletableFutureAndStream() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    List<String> loads = null;
    try {
        // 所有需要查询远程服务的load列表
        List<String> requestList = Lists.newArrayList("load1", "load2", "load3", "load4");
        
        loads = requestList.stream()
            // 使用CompletableFuture以异步方式查询数据
            .map(req -> CompletableFuture.supplyAsync(() -> invokeReq(req), executorService))
            .map(future -> future.thenApply(Load::getStatus))
            .map(future -> future.thenCompose(status -> CompletableFuture.supplyAsync(() -> status.name().toUpperCase())))
            .map(CompletableFuture::join)
            .toList();
        
        System.out.println(Thread.currentThread().getName() + ": CompletableFuture异步方式查询请求已完成:" + loads.size());
    } finally {
        executorService.shutdown();
    }
    return loads;
}
这其实是有原因的。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,不同的请求只能以同步、顺序执行的方式才会成功。因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定服务请求的动作、通知join方法返回结果。

再来看一个例子:

我们的系统提供的运费价格是以美元计价的,但是你希望以人民币(RMB)的方式提供给你的客户。你可以用异步的方式向计费中心查询指定Load的价格,同时从远程的汇率服务那里查到人民币和美元之间的汇率。当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以人民币计价的商品价格。

public class MultiThreadTest {

    @Test
    public void test18() {
        long start = System.nanoTime();
        List<CompletableFuture<Double>> futures = loads.stream()
                .map(laod ->
                        CompletableFuture
                                // 查商品价格操作和查兑换汇率操作同时进行,当两者都完成时将结果进行整合
                                .supplyAsync(() -> load.getPrice("load1"))
                                .thenCombine(CompletableFuture.supplyAsync(() -> RateService.getRate("RMB", "USD")), (price, rate) -> price * rate)
                )
                .collect(toList());
        List<Double> usdPrices = futures.stream()
                .map(CompletableFuture::join)
                .collect(toList());

    }

}
通过上述例子,可以看到相对于采用Java 8之前提供的Future实现,CompletableFuture版本实现所具备的巨大优势。CompletableFuture利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。

为了更直观地感受一下使用CompletableFuture在代码可读性上带来的巨大提升,下面尝试仅使用Java 7中提供的特性,重新实现上述例子的功能。

public class MultiThreadTest {

    @Test
    public void test19() throws ExecutionException, InterruptedException {
        long start = System.nanoTime();
        List<Future<Double>> usdFuturePrices = new ArrayList<>(shops.size());
        for (Shop shop : shops) {
            // 创建一个查询人民币到美元转换汇率的Future
            final Future<Double> usdFutureRate = executor.submit(new Callable<Double>() {
                public Double call() {
                    return RateService.getRate("RMB", "USD");
                }
            });
            // 在第二个Future中查询指定商店中特定商品的价格
            Future<Double> usdFuturePrice = executor.submit(new Callable<Double>() {
                public Double call() throws ExecutionException, InterruptedException {
                    double rmbPrice = shop.getPrice("肥皂");
                    // 在查找价格操作的同一个Future中, 将价格和汇率做乘法计算出汇后价格
                    return rmbPrice * usdFutureRate.get();
                }
            });
            usdFuturePrices.add(usdFuturePrice);
        }
        List<Double> usdPrices = new ArrayList<>(usdFuturePrices.size());
        for (Future<Double> usdFuturePrice : usdFuturePrices) {
            usdPrices.add(usdFuturePrice.get());
        }

    }

}
这里我们思考这样一个问题:并行使用流还是CompletableFuture?

对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在 CompletableFuture 内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。同时也可以提供更多描述任务之间关系的接口,我们不需要为之编写更多的代码。

这里对使用这些API的建议如下:

如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
反之,如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好。
总结
今天大家学到了哪些知识呢?

如何优化接口性能?某些场景下可以使用多线程并行代替串行。
如何实现接口并行调用?通过今天的学习可以使用 Future+Callable、FutureTask、CompletableFuture。
详细介绍了CompletableFuture的强大,掌握CompletableFuture提供的函数式编程的能力,以及与JDK8流式编程结合使用,使代码更加美观优雅,写起来简洁和便利;
在接口设计时可以参考CompletableFuture的实现,将两个无关的接口能力组装在一起以实现更加强大的功能;
不足之处:今天只是对于并发编程中的工具类使用和相关原理做了分享,在实际开发过程中可能需要考虑到更多的通用性,封装通过调用模版方法,不要每一个地方都写一堆类似的代码。

通过今天的分享,希望大家可以在平时开发工作中遇到合适的场景时尝试使用 CompletableFuture 提供的API,优化程序性能、提高开发效率。



作者:七哥

公众号:牧小农,微信扫码关注或搜索公众号名称