CompletableFuture使用详解

02-27 1557阅读

一、简介

1.1 概述

在上一篇文章《CompletionService使用与源码分析》中,已经介绍过了Future的局限性,它没法直接对多个任务进行链式、组合等处理,需要借助并发工具类才能完成,实现逻辑比较复杂。

而CompletableFuture是对Future的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富的扩展,完美弥补了Future的局限性,同时CompletableFuture实现了对任务编排的能力。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。

CompletableFuture的继承结构如下:

CompletableFuture使用详解

CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。异步执行的,默认线程池是ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池。

CompletableFuture中默认线程池如下:

// 根据commonPool的并行度来选择,而并行度的计算是在ForkJoinPool的静态代码段完成的
private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

ForkJoinPool中初始化commonPool的参数

static {
    // initialize field offsets for CAS etc
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class k = ForkJoinPool.class;
        CTL = U.objectFieldOffset
            (k.getDeclaredField("ctl"));
        RUNSTATE = U.objectFieldOffset
            (k.getDeclaredField("runState"));
        STEALCOUNTER = U.objectFieldOffset
            (k.getDeclaredField("stealCounter"));
        Class tk = Thread.class;
        ……
    } catch (Exception e) {
        throw new Error(e);
    }
    commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
    defaultForkJoinWorkerThreadFactory =
        new DefaultForkJoinWorkerThreadFactory();
    modifyThreadPermission = new RuntimePermission("modifyThread");
    // 调用makeCommonPool方法创建commonPool,其中并行度为逻辑核数-1
    common = java.security.AccessController.doPrivileged
        (new java.security.PrivilegedAction() {
            public ForkJoinPool run() { return makeCommonPool(); }});
    int par = common.config & SMASK; // report 1 even if threads disabled
    commonParallelism = par > 0 ? par : 1;
}

1.2 功能

1.2.1 常用方法
依赖关系
  • thenApply():把前面任务的执行结果,交给后面的Function
  • thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回
    and集合关系
    • thenCombine():合并任务,有返回值
    • thenAccepetBoth():两个任务执行完成后,将结果交给thenAccepetBoth处理,无返回值
    • runAfterBoth():两个任务都执行完成后,执行下一步操作(Runnable类型任务)
      or聚合关系
      • applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值
      • acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值
      • runAfterEither():任意一个任务执行完成,进行下一步操作(Runnable类型任务)
        并行执行
        • allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
        • anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture
          结果处理
          • whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作
          • exceptionally:返回一个新的CompletableFuture,当前面的CompletableFuture完成时,它也完成,当它异常完成时,给定函数的异常触发这个CompletableFuture的完成

            1.2.2 异步操作

            CompletableFuture提供了四个静态方法来创建一个异步操作:

            public static CompletableFuture runAsync(Runnable runnable)
            public static CompletableFuture runAsync(Runnable runnable, Executor executor)
            public static  CompletableFuture supplyAsync(Supplier supplier)
            public static  CompletableFuture supplyAsync(Supplier supplier, Executor executor)
            

            这四个方法的区别:

            • runAsync() 以Runnable函数式接口类型为参数,没有返回结果,supplyAsync() 以Supplier函数式接口类型为参数,返回结果类型为U;Supplier接口的 get()是有返回值的(会阻塞)
            • 使用没有指定Executor的方法时,内部使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
            • 默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰
              异步操作
              Runnable runnable = () -> System.out.println("无返回结果异步任务");
              CompletableFuture.runAsync(runnable);
              CompletableFuture future = CompletableFuture.supplyAsync(() -> {
                  System.out.println("有返回值的异步任务");
                  try {
                      Thread.sleep(5000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  return "Hello World";
              });
              String result = future.get();
              
              获取结果(join&get)

              join()和get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)

              结果处理

              当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:

              public CompletableFuture whenComplete(BiConsumer other,Runnable action);
              public CompletionStage runAfterEitherAsync(CompletionStage other,Runnable action);
              

              具体使用:

              CompletableFuture future1 = CompletableFuture.supplyAsync(new Supplier() {
                  @Override
                  public Integer get() {
                      int number = new Random().nextInt(5);
                      try {
                          TimeUnit.SECONDS.sleep(number);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      System.out.println("任务1结果:" + number);
                      return number;
                  }
              });
              CompletableFuture future2 = CompletableFuture.supplyAsync(new Supplier() {
                  @Override
                  public Integer get() {
                      int number = new Random().nextInt(5);
                      try {
                          TimeUnit.SECONDS.sleep(number);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      System.out.println("任务2结果:" + number);
                      return number;
                  }
              });
              future1.runAfterEither(future2, new Runnable() {
                  @Override
                  public void run() {
                      System.out.println("已经有一个任务完成了");
                  }
              }).join();
              
              anyOf

              anyOf() 的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFuture。

              常用方法:

              public static CompletableFuture anyOf(CompletableFuture... cfs)
              

              具体使用:

              Random random = new Random();
              CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
                  try {
                      TimeUnit.SECONDS.sleep(random.nextInt(5));
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  return "hello";
              });
              CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
                  try {
                      TimeUnit.SECONDS.sleep(random.nextInt(1));
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  return "world";
              });
              CompletableFuture result = CompletableFuture.anyOf(future1, future2);
              
              allOf

              allOf方法用来实现多 CompletableFuture 的同时返回。

              常用方法:

              public static CompletableFuture allOf(CompletableFuture... cfs)
              

              具体使用:

              CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
                  try {
                      TimeUnit.SECONDS.sleep(2);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println("future1完成!");
                  return "future1完成!";
              });
              CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
                  System.out.println("future2完成!");
                  return "future2完成!";
              });
              CompletableFuture combindFuture = CompletableFuture.allOf(future1, future2);
              try {
                  combindFuture.get();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } catch (ExecutionException e) {
                  e.printStackTrace();
              }
              

              CompletableFuture常用方法总结:

              CompletableFuture使用详解

              注:CompletableFuture中还有很多功能丰富的方法,这里就不一一列举。

              三、使用案例

              实现最优的“烧水泡茶”程序

              著名数学家华罗庚先生在《统筹方法》这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:

              CompletableFuture使用详解

              对于烧水泡茶这个程序,一种最优的分工方案:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶、烧开水、泡茶这三道工序,T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序。

              基于Future实现
              public class FutureTaskTest{
                  public static void main(String[] args) throws ExecutionException, InterruptedException {
                      // 创建任务T2的FutureTask
                      FutureTask ft2 = new FutureTask(new T2Task());
                      // 创建任务T1的FutureTask
                      FutureTask ft1 = new FutureTask(new T1Task(ft2));
                      // 线程T1执行任务ft2
                      Thread T1 = new Thread(ft2);
                      T1.start();
                      // 线程T2执行任务ft1
                      Thread T2 = new Thread(ft1);
                      T2.start();
                      // 等待线程T1执行结果
                      System.out.println(ft1.get());
                  }
              }
              // T1Task需要执行的任务:
              // 洗水壶、烧开水、泡茶
              class T1Task implements Callable {
                  FutureTask ft2;
                  // T1任务需要T2任务的FutureTask
                  T1Task(FutureTask ft2){
                      this.ft2 = ft2;
                  }
                  @Override
                  public String call() throws Exception {
                      System.out.println("T1:洗水壶...");
                      TimeUnit.SECONDS.sleep(1);
                      System.out.println("T1:烧开水...");
                      TimeUnit.SECONDS.sleep(15);
                      // 获取T2线程的茶叶
                      String tf = ft2.get();
                      System.out.println("T1:拿到茶叶:"+tf);
                      System.out.println("T1:泡茶...");
                      return "上茶:" + tf;
                  }
              }
              // T2Task需要执行的任务:
              // 洗茶壶、洗茶杯、拿茶叶
              class T2Task implements Callable {
                  @Override
                  public String call() throws Exception {
                      System.out.println("T2:洗茶壶...");
                      TimeUnit.SECONDS.sleep(1);
                      System.out.println("T2:洗茶杯...");
                      TimeUnit.SECONDS.sleep(2);
                      System.out.println("T2:拿茶叶...");
                      TimeUnit.SECONDS.sleep(1);
                      return "龙井";
                  }
              }
              
              基于CompletableFuture实现
              public class CompletableFutureTest {
                  public static void main(String[] args) {
                      //任务1:洗水壶->烧开水
                      CompletableFuture f1 = CompletableFuture
                          .runAsync(() -> {
                              System.out.println("T1:洗水壶...");
                              sleep(1, TimeUnit.SECONDS);
                              System.out.println("T1:烧开水...");
                              sleep(15, TimeUnit.SECONDS);
                          });
                      //任务2:洗茶壶->洗茶杯->拿茶叶
                      CompletableFuture f2 = CompletableFuture
                          .supplyAsync(() -> {
                              System.out.println("T2:洗茶壶...");
                              sleep(1, TimeUnit.SECONDS);
                              System.out.println("T2:洗茶杯...");
                              sleep(2, TimeUnit.SECONDS);
                              System.out.println("T2:拿茶叶...");
                              sleep(1, TimeUnit.SECONDS);
                              return "龙井";
                          });
                      //任务3:任务1和任务2完成后执行:泡茶
                      CompletableFuture f3 = f1.thenCombine(f2, (__, tf) -> {
                          System.out.println("T1:拿到茶叶:" + tf);
                          System.out.println("T1:泡茶...");
                          return "上茶:" + tf;
                      });
                      //等待任务3执行结果
                      System.out.println(f3.join());
                  }
                  static void sleep(int t, TimeUnit u){
                      try {
                          u.sleep(t);
                      } catch (InterruptedException e) {
                      }
                  }
              }
              
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]