Java8的CompletableFuture

    xiaoxiao2021-03-25  114

    Java8的CompletableFuture之一

    841人阅读 评论(0) 收藏 举报 分类: java(24)

    目录(?)[+]

          Java8提供了一种函数风格的异步和事件驱动编程模型CompletableFuture,该模型不同于以往Java版本,不会造成堵塞Blocking。过去,Java 5并发包主要聚焦于异步任务处理,其模型特点是基于一个生产者线程,不断地创造任务,通过一个堵塞Blocking队列传递给任务的消费者,这个模型在Java 7和Java 8以后使用了另外一种任务执行模型,同时将一个任务的数据分解到子集中,这些子集能够分别被同样的子任务独立地处理。这种风格后面的基础库包就是 fork/join框架。

      fork/join框架允许程序员指定一个数据集如何被切分多个子任务,将子任务提交一个标准默认的线程池:通用的ForkJoinPool。在Java 8中fork/join并行还可以通过并行流机制访问获得,但是这种方式的并行处理是有成本的和前提条件的:首先,元素处理必须能够独立进行,其次,数据集必须足够大,每个元素处理的消耗成本需要足够高,因为设置与启动fork/join框架本身也有一定的消耗,这些消耗相对于数据集合中每个元素处理的成本来说可以忽略不计,否则就不是很划算。

      Java 8的CompletableFuture背后也是依靠fork/join框架启动新的线程实现异步与并发的,一般情况下,我们将一个任务放到另外一个线程执行,可能就无需等待那个线程处理完成的结果,而是直接在主线程中返回完成,但是有一些业务恰是需要等待新启动的线程中任务完成,然后和当前主线程中的处理进行合并再处理,比如下面代码我们需要在另外一个线程进行很长时间的运行。

    CompletableFuture futureCount = CompletableFuture . supplyAsync (      () -> {          try {              // Simulate long running task 模拟长时间运行任务              Thread . sleep ( 5000 );          } catch ( InterruptedException e ) { }          return 10 ;      }); //现在可以同时在当前主线程做其他事情,不用等待上面长时间运行任务结束

    CompletableFuture.supplyAsync 允许你基于ForkJoinPool 异步地运行一个任务,同时也有选项供你选择更多对线程池的控制。下面是获得长时间运行任务的返回结果:

    try {      int count = futureCount . get ();      System . out . println ( count ); } catch ( InterruptedException | ExecutionException ex ) {      // Exceptions that occur in future will be thrown here. }

    当对CompletableFuture的实力进行.get()方法调用时,在计算过程中任何exception将被抛出。

    创建和获得CompletableFuture有下面四个方法,主要是supplyAsunc和runAsync两种,后者提供的方法参数必须是线程的Runnable,因为Runnable是不会返回任何结果,所以,如果你需要异步处理并返回结果,应该使用Supplier<U>。

    static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); static CompletableFuture<Void> runAsync(Runnable runnable); static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

    我们需要将长时间任务放入supplyAsync方法体中,传统写法如下:

    final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {     @Override     public String get() {         //...长时间运行...         return "42";     } }, executor);

    使用java8的lambda则是如下:

    final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {     //...长时间运行...     return "42"; }, executor);

    甚至简化如下:

    final CompletableFuture<String> future =     CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);

     

    thenApply用法

      从函数编程角度看,CompletableFuture 其实是一个monad和一个functor,如果你有Javacript编程经验,经常会返回或注册一个异步的callback回调函数,这样,我们就不必一直堵塞等待其处理完成再进行其他处理,这也是一种Future概念,意思是:当结果长时间计算出来以后,在结果上运行这个函数。我们可以运行多个这样的回调性质的函数,这可以使用CompletableFuture的 thenApply()方法:

    <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn); <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn); <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

    第一个没有Async的thenApply方法是指在future完成的当前线程应用参数中函数,而后面两个带Async的方法则是在不同线程池异步地应用参数中的函数。

    我们以字符串转换为整数功能为例,如下:

    CompletableFuture<String> f1 = //... CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt); CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);

    或者一句:

    CompletableFuture<Double> f3 =     f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);

    这里定义了两个theApply转换功能,第一次是从字符串转换到Integer,然后再到Double。这些转换不是立即执行也不会堵塞,只是简单地记住,只有当f1完成以后才会执行这两个转换,如果一些转换动作很花费时间,你可以使用线程池异步处理。

    thenAccept/thenRun用法

      在长时间计算完成后可以经过上面转换,但是在最后阶段有两个方法:

    CompletableFuture<Void> thenAccept(Consumer<? super T> block); CompletableFuture<Void> thenRun(Runnable action);

      thenAccept()提供了final最后的值,而thenRun执行Runnable就不会返回任何计算好的值或结果了。

    future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor); log.debug("Continuing");

      这两个方法是不会堵塞的,即使没有指定executor,可以将它们看成是对未来结果的监听者。

     

    thenCompose()用法

      CompletableFuture的异步处理非常不错,有时,你需要在一个future值结构运行某个函数,但是这个函数也是返回某种future,也就是说是两个future彼此依赖串联在一起,它类似于Scala的中的flatMap。

    <U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);

    下面是比较thenCompose和thenApply的不同案例(类似于Scala的map和flatMap的不同):

    CompletableFuture<Document> docFuture = //... CompletableFuture<CompletableFuture<Double>> f =     docFuture.thenApply(this::calculateRelevance); CompletableFuture<Double> relevanceFuture =     docFuture.thenCompose(this::calculateRelevance); private CompletableFuture<Double> calculateRelevance(Document doc)  //...

    thenCompose()是能够建立健壮 异步管道pipeline的方法,没有任何堵塞或中间步骤。

     

    thenCombine()用法

      上面thenCompose()是用于多个彼此依赖的futrue进行串联起来,而thenCombine则是并联起两个独立的future,注意,这些future都是在长时间计算都完成以后。

    <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

    假设你有两个CompletableFuture,一个是加载Customer,而另外一个是加载最近商店Shop,它们两个都彼此独立,但是当都加载计算完毕以后,你需要使用它们的值计算路径:

    CompletableFuture<Customer> customerFuture = loadCustomerDetails(123); CompletableFuture<Shop> shopFuture = closestShop(); CompletableFuture<Route> routeFuture =     customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));   //...   private Route findRoute(Customer customer, Shop shop) //...

    在Java8中可以使用this::findRoute 替代(cust, shop) -> findRoute(cust, shop) :

    customerFuture.thenCombine(shopFuture, this::findRoute);

    当我们有了customerFuture和shopFuture,那么routeFuture会包装它们两个并等待两个都完成计算,当都完成长时间计算以后,routeFuture会运行我们提供的函数findRoute(),routeFuture完成标志前两个前提future已经完成并且findRoute()也完成了。

    当有多个CompletableFuture一起工作时,比如你希望在一个CompletableFuture返回的值和另外一个CompletableFuture返回值一起组合在一起再处理,可以使用thenCombine函数,

    Java8的CompletableFuture之二

    332人阅读 评论(0) 收藏 举报

    目录(?)[+]

     前面章节介绍了 CompletableFuture的一些基础功能,现在介绍复杂的一些高级功能。上一节介绍了onCombine是用来并行组合两个独立的Future,这需要建立一个新的CompletableFuture来组合利用两个结果,如果我们现在只是要求两个future计算完成后通知一下,怎么办?可以使用thenAcceptBoth()/runAfterBoth()。

    thenAcceptBoth()/runAfterBoth()用法

      这两个方法类似thenAccept()和thenRun(),不同之处是等待两个future完成,而不是等待一个future完成。

    <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block) CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)

    比如,假设两个future计算完成以后,我们只是需要立即发送一些事件,或者立即刷新GUI,这样就不必再建立一个新的future了,可以使用thenAcceptBoth():

    customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {     final Route route = findRoute(cust, shop);     //使用rout值刷新界面 });

    这种方法比堵塞等待原来两个future要更异步性和具有事件驱动意义。

     acceptEither/runAfterEither用法

      CompletableFuture 还能等待第一个future而不是所有future计算完毕,当你有两个任务,两个都是同样类型的结果,你只关心响应时间,谁快用谁的结果,不关心具体哪个任务首先返回。

    CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> block) CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action)

    比如,你有两个系统需要继承,一个是会有很快的响应时间但是经常有偏差,而另外一个是慢一些,但是更具有可预期性。为了获得性能和预期性两种好处,你能同时调用这两个系统,等待其中一个第一个完成:

    CompletableFuture<String> fast = fetchFast(); CompletableFuture<String> predictable = fetchPredictably(); fast.acceptEither(predictable, s -> {     System.out.println("Result: " + s); });

    s可能是fetchFast()或fetchPredictably(),我们不关心,我们关系只要能第一个返回就可以,这样有时会第一个方法,有时会第二个方法返回,随机性可让我们充分利用计算机当时运行的情况潜力。

     

    applyToEither()用法

      applyToEither() 是acceptEither()的哥哥. 当两个future其中一个完成后,后者用于只是简单地调用一些代码,applyToEither()会返回一个新的future. 这个future是在前面两个future其中一个完成后进行执行完成。

    CompletableFuture<String> fast = fetchFast(); CompletableFuture<String> predictable = fetchPredictably(); CompletableFuture<String> firstDone =     fast.applyToEither(predictable, Function.<String>identity());

    firstDone 这个future能够被到处传递,从客户端调用角度看,前面两个future是隐藏在firstDone后面的。

     

    allOf用法

      现在我们知道了如何等待两个之中一个首先完成,但是如果不是两个,而是很多个,那么就要使用:

    static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

    注意static,allOf()获得一个future数组然后当所有future都完成后返回一个future,注意,这里只是等待所有的future中最快完成。

     

    RxJava结合

      使用CompletableFuture可以和RxJava结合一起使用。RxJava的被观察者Observable类似一个对象流,能够主动异步推送消息给订阅者(观察者),我们以本文开始第一个案例,也就是计算器count为例,实现同样功能:

    public CompletableFuture countEvents () {      CompletableFuture result = new CompletableFuture <>();      AtomicInteger count = new AtomicInteger ();      Observable . just ( "1" , "2" , "3" , "err" , "4" ). subscribe ( ev -> {              try {                  int x = Integer . parseInt ( ev );                 count . set ( count . get () + x );              } catch ( NumberFormatException e ) { }          },         throwable -> result . complete ( 0 );          () -> {              try {                  //simulate io delay                  Thread . sleep ( 3000 );              } catch ( InterruptedException e ) { }             result . complete ( count . get ());          }      );      return result ; }

    上述代码中Observable提供了1 2 3 err 4几个字符串流,假设遍历这个字符串流是一个费时长的工作,我们需要遍历后计算这个几个字符串的个数,调用代码如下:

    CompletableFuture data = countEvents ()      . thenApply ( count -> {          int transformedValue = count * 25 ;          return transformedValue ;      }). thenApply ( transformed -> "data-" + transformed );   try {      System . out . println ( data . get ()); } catch ( InterruptedException | ExecutionException e ) {     e . printStaceTrace ();

    转载请注明原文地址: https://ju.6miu.com/read-14386.html

    最新回复(0)