Java异步编程之CompletableFuture

article/2025/10/28 7:20:08

异步任务

Future获取异步任务结果

利用 Java 并发包提供的 Future 可以很容易获得异步任务的执行结果,无论异步任务是通过线程池 ThreadPoolExecutor 执行的,还是通过手工创建子线程来执行的。利用多线程可以快速将一些串行的任务并行化,从而提高性能;如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,这种问题基本上都可以用 Future 来解决。

其类结构如下:

publicinterfaceFuture<V> {booleancancel(boolean mayInterruptIfRunning);booleanisCancelled();booleanisDone();V get()throws InterruptedException, ExecutionException;V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;}
复制代码

方法解释如下:

Future

备注

cancel(boolean)

尝试取消异步任务的执行。如果任务已经执行完成、已经被取消、因为某种原因不能被取消,则返回false;如果任务正在执行,并且mayInterruptIfRunning为true,那么会调用interrupt()尝试打断任务。该方法返回结果后,isDone()总会返回true

isCancelled()

如果在任务完成前被取消,返回true

isDone()

如果任务完成则返回true。任务完成包括正常结束、任务被取消、任务发生异常,都返回true

get()

获取异步任务执行结果,如果没有返回,则阻塞等待

get(long timeout, TimeUnit unit)

在给定的时间内等待获取异步任务结果,如果超时还未获取到结果,则会抛出TimeoutException

但它本身只是一个接口,还需要看Future的具体实现类:

红色框起来的就是常见的实现类,就不一一展开赘述了,这里以FutureTask的使用为例:

//初始化一个线程池ExecutorService executor = = Executors.newSingleThreadExecutor();//new 一个Callable并传入FutureTaskFutureTask<String> future =new FutureTask<>(newCallable<String>() {publicStringcall() {//do somethingreturn result;}});executor.execute(future);//在异步任务执行期间可以做一些其他的事情displayOtherThings();//通过future.get()得到异步任务执行结果String result=future.get();
复制代码

CompletableFuture异步编程

CompletableFuture的线程池之坑

CompletableFuture又是如何创建新线程的?答案是ForkJoinPool.commonPool() ,我们熟悉的老朋友又回来了,还是它。当需要新的线程时,CompletableFuture会从commonPool中获取线程,相关源码如下:

publicstatic CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);
}
privatestaticfinalExecutorasyncPool= useCommonPool ? ForkJoinPool.commonPool() : newThreadPerTaskExecutor();
复制代码

我们已经知道了commonPool的潜在风险,在生产环境中使用无异于给自己挖坑换句话说,当我决定使用CompletableFuture的时候,默认就是我们要创建自己的线程池。不要偷懒,更不要存在侥幸心理。

这里补充一点:CompletableFuture是否使用默认线程池的依据,和机器的CPU核心数有关。

当CPU核心数减1大于1时,才会使用默认的线程池(ForkJoinPool),否则将会为每个CompletableFuture的任务创建一个新线程去执行

即,CompletableFuture的默认线程池,只有在双核以上的机器内才会使用。在双核及以下的机器中,会为每个任务创建一个新线程,等于没有使用线程池,且有资源耗尽的风险

注意事项

  1. 如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

  1. 记得考虑异常处理。

  1. 公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的核数-1(也可以通过JVM option:**-**Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)

创建 CompletableFuture 对象

CompletableFuture的核心方法总共分为四类,而这四类方法又分为两种模式:同步和异步

类型

接收参数

返回结果

支持异步

Supply

✔︎

✔︎

Run

✔︎

Accept

✔︎

✔︎

Apply

✔︎

✔︎

✔︎

上述接种类型的方法一般都有三个变种方法:同步异步指定线程池。比如, thenApply() 的三个变种方法如下所示:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
复制代码

第一个同步,第二个异步,第三个支持指定线程池

如何理解 CompletionStage 接口

任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。如下图

还要注意参数类型:

  • 参数 fn 的类型是接口 Function<T, R>,代表这个方法既能接收参数也支持返回值

  • 参数 consumer 的类型是接口 Consumer,代表这个方法虽然支持接收参数,但却不支持返回值

  • 参数 action 的类型是 Runnable,代表这个方法既不能接收参数也不支持返回值。

上述参数类型的方法一般都有三个变种方法:同步、异步和指定线程池。方法后缀带 Async 代表的是异步执行

描述串行关系

CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。

CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);
复制代码

描述 AND 汇聚关系

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口。

both:前两阶段同时执行完毕执行下一阶段

CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);
复制代码

描述 OR 汇聚关系

CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口。

either:前两阶段任一执行完毕执行下一阶段

CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
复制代码

案例:李四下班后坐公交回家,可以选择 111 路和 222 路,当其中有一辆公交到达,李四就选择坐这辆车回家,这种场景可以使用 applyToEither 方法

publicclassApplyToEitherTest {publicstaticvoidmain(String[] args) {PrintTool.printTimeAndThread("李四下班准备回家。。。");PrintTool.printTimeAndThread("李四等待111,222公交。。。");CompletableFuture<String> busCF = CompletableFuture.supplyAsync(() -> {PrintTool.printTimeAndThread("111 路公交在路上。。。");PrintTool.sleep(5000);return"906";}).applyToEither(CompletableFuture.supplyAsync(() -> {PrintTool.printTimeAndThread("222 路公交在路上。。。");PrintTool.sleep(4000);return"539";}), first -> first + "路公交");PrintTool.printTimeAndThread("李四坐上" + busCF.join());}
}
复制代码

allOf与anyOf

allOf()anyOf() 也是一对孪生兄弟,当我们需要对多个Future的运行进行组织时,就可以考虑使用它们:

  • allOf() :给定一组任务,等待所有任务执行结束;

  • anyOf() :给定一组任务,等待其中任一任务执行结束。

异常处理

虽然fn、consumer、action它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常。

CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
复制代码

使用exceptionally()回调处理异常

下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。

CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(() -> 7 / 0).thenApply(r -> r * 10).exceptionally(e -> 0);复制代码

使用handle()处理异常

既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(() -> 7 / 0).thenApply(r -> r * 10).handle((res, ex) -> {if (ex != null) {System.out.println("出错:" + ex.getMessage());return1;}return0;});
复制代码

CompletionService批量执行异步任务

当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。

利用 CompletionService 实现 Forking Cluster

Dubbo 中有一种叫做Forking 的集群模式,这种集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了

利用 CompletionService 可以快速实现 Forking 这种集群模式,比如下面的示例代码就展示了具体是如何实现的。首先我们创建了一个线程池 executor 、一个 CompletionService 对象 cs 和一个Future<Integer>类型的列表 futures,每次通过调用 CompletionService 的 submit() 方法提交一个异步任务,会返回一个 Future 对象,我们把这些 Future 对象保存在列表 futures 中。

通过调用 cs.take().get(),我们能够拿到最快返回的任务执行结果,只要我们拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果了。

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 用于保存 Future 对象
List<Future<Integer>> futures = new ArrayList<>(3);
// 提交异步任务,并保存 future 到 futures 
futures.add(cs.submit(() -> geocoderByS1()));
futures.add(cs.submit(() -> geocoderByS2()));
futures.add(cs.submit(() -> geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {// 只要有一个成功返回,则 breakfor (int i = 0; i < 3; ++i) {r = cs.take().get();// 简单地通过判空来检查是否成功返回if (r != null) {break;}}
} finally {// 取消所有任务for (Future<Integer> f : futures)f.cancel(true);
}
// 返回结果
return r;
复制代码

总结

对于简单的并行任务,你可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过CompletableFuture来解决;而批量的并行任务,则可以通过CompletionService来解决。

作者:Linn

链接:https://juejin.cn/post/7203518605017923643

来源:稀土掘金

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


http://chatgpt.dhexx.cn/article/kPsXNjBb.shtml

相关文章

JUC异步编程

什么是JUC JUC的意思是java并发编程工具包&#xff0c;是java.util.concurrent包的简称。目的就是为了更好的支持高并发任务&#xff0c;让开发者利用这个包进行的多线程开发时&#xff0c;可以有效的减少竞争条件和死锁线程。 异步编程 模拟用户下单操作。。。 1、根据地址…

线程、多线程的使用、线程池、异步(CompletableFuture)-48

一&#xff1a;线程 1.初始化线程的四种方式 1&#xff09;、继承 Thread public class ThreadTest {public static void main(String[] args) {System.out.println("main...start...");Thread01 thread new Thread01();//启动线程thread.start();System.out.pri…

CompletableFuture 执行异步任务

CompletableFuture 执行异步任务 参考&#xff1a; (10条消息) Java 8 的异步编程利器 CompletableFuture 真香&#xff01;_不才陈某的博客-CSDN博客 提供几十种方法&#xff0c;帮助异步任务执行调用&#xff1b; 主要包括&#xff1a; 创建异步任务任务异步回调多个任务…

CompletableFuture实现异步编排全面分析和总结

一、&#x1f308;CompletableFuture简介 CompletableFuture结合了Future的优点&#xff0c;提供了非常强大的Future的扩展功能&#xff0c;可以帮助我们简化异步编程的复杂性&#xff0c;提供了函数式编程的能力&#xff0c;可以通过回调的方式处理计算结果&#xff0c;并且提…

【Java8新特性--->异步处理】CompletableFuture

一、引入 假设一个商品详情页需要以下操作&#xff1a; 查询展示商品的基本信息耗时&#xff1a;0.5s查询展示商品的销售信息耗时&#xff1a;0.7s查询展示商品的图片信息耗时&#xff1a;1s查询展示商品销售属性耗时&#xff1a;0.3s查询展示商品规格属性耗时&#xff1a;1.…

CompletableFuture API

目录 1. 为什么要用CompletableFuture1.1 API 2. CompletableFuture Demo1. 创建CompletableFuture2. 定义CompletableFuture完成时和异常时需要回调的实例3. CompletableFuture的优点 3. demo&#xff1a;多个CompletableFuture串行执行4. demo&#xff1a;多个CompletableFut…

CompletableFuture实战与分析

Future对于结果的获取不够好&#xff0c;只能通过阻塞或者轮询的方式得到任务的结果。在Java8中Doug Lea大师提供了一个CompletableFuture工具类&#xff0c;可以更优雅的对异步并行操作进行编排。 Future VS CompletableFuture CompletableFuture支持手动完成任务&#xff0…

Java8 CompletableFuture异步非阻塞做法

创建异步任务 Future.submit supplyAsync / runAsync 异步回调 thenApply / thenApplyAsync thenAccept / thenRun exceptionally whenComplete handle 组合处理 thenCombine / thenAcceptBoth / runAfterBoth applyToEither / acceptEither / runAfterEither thenCom…

2022.2.5 第十三次周报

文章目录 前言一、论文阅读《ROCKET: Exceptionally fast and accurate time series classification using random convolutional kernels》Abstract摘要Introduction介绍Method方法Kernels内核Transform转换Classifier分类器Complexity Analysis复杂性分析 Experiments实验Con…

并发编程(十五)-CompletableFuture中常用方法的使用与分析

文章目录 一、CompletableFuture API介绍1. 描述2. CompletionStage3. CompletableFuture 4个核心静态方法&#xff08;1&#xff09;runAsync(Runnable runnable)&#xff08;2&#xff09;runAsync(Runnable runnable, Executor executor)&#xff08;3&#xff09;supplyAsy…

Java 编程问题:十一、并发-深入探索

原文&#xff1a;Java Coding Problems 协议&#xff1a;CC BY-NC-SA 4.0 贡献者&#xff1a;飞龙 本文来自【ApacheCN Java 译文集】&#xff0c;自豪地采用谷歌翻译。 本章包括涉及 Java 并发的 13 个问题&#xff0c;涉及 Fork/Join 框架、CompletableFuture、ReentrantLock…

线程(十二)---CompletableFuture(三)

写在前面&#xff1a;各位看到此博客的小伙伴&#xff0c;如有不对的地方请及时通过私信我或者评论此博客的方式指出&#xff0c;以免误人子弟。多谢&#xff01; 示例五&#xff1a;异常处理 接着上一篇记录一下CompletableFuture的异常处理&#xff0c;异常处理通常使用…

dice loss

Dice Loss 最先是在VNet 这篇文章中被提出&#xff0c;后来被广泛的应用在了医学影像分割之中。 Dice 系数 Dice系数作为损失函数的原因和混淆矩阵有着很大的关系&#xff0c;下图给出的是一个混淆矩阵&#xff1a; 其中的一些关键指标如下&#xff1a; 精确率(precision)表…

Hinge loss

声明&#xff1a; 参考自维基百科后面可能会更新 Hinge Loss 在机器学习中&#xff0c;hinge loss作为一个损失函数(loss function)&#xff0c;通常被用于最大间隔算法(maximum-margin)&#xff0c;而最大间隔算法又是SVM(支持向量机support vector machines)用到的重要算法…

【深度学习】一文读懂机器学习常用损失函数(Loss Function)

【深度学习】一文读懂机器学习常用损失函数&#xff08;Loss Function&#xff09; 最近太忙已经好久没有写博客了&#xff0c;今天整理分享一篇关于损失函数的文章吧&#xff0c;以前对损失函数的理解不够深入&#xff0c;没有真正理解每个损失函数的特点以及应用范围&#x…

Pytorch之loss(损失函数)

损失函数也在torch.nn下&#xff0c;具体可以参考文档&#xff0c;也可以参考官网 先根据L1Loss举例 我个人感觉这里的描述还是到官网的文档找比较好&#xff0c;公式看的比文档清楚 import torch from torch import nninputs torch.tensor([[3, 2, 1],[1, 2, 3]], dtypetorch…

机器学习 损失函数 Loss function

损失函数 最小二乘法极大似然估计法交叉熵 【本文根据B站-王木头学科学-视频所学】 在梯度下降中&#xff0c;所求的梯度其实就是损失函数的梯度。 损失函数有三种设计方法&#xff1a; &#xff08;1&#xff09;最小二乘法 &#xff08;2&#xff09;极大似然估计法 &#x…

Focal loss 损失函数详解

Focal loss 目前目标检测的算法大致分为两类&#xff0c;One Stage 、Two Stage。 One Stage&#xff1a;主要指类似YOLO、SGD等这样不需要region proposal,直接回归的检测算法&#xff0c;这类算法检测速度很快&#xff0c;但是精度准确率不如使用Two stage的模型。 two St…

机器学习之常见的损失函数(loss function)

解决一个机器学习问题主要有两部分&#xff1a;数据和算法。而算法又有三个部分组成&#xff1a;假设函数、损失函数、算法优化。我们一般在看算法书或者视频教学时&#xff0c;更多的是去推算或者说参数估计出其假设函数&#xff0c;而往往不太注重损失函数&#xff0c;但是损…

深度学习loss函数理解

机器学习中的范数规则化之L0、L1、L2范数及loss函数 监督机器学习问题无非就是“minimizeyour error while regularizing your parameters”&#xff0c;也就是在规则化参数的同时最小化误差。 最小化误差是为了让我们的模型拟合我们的训练数据&#xff0c;而规则化参数是防止我…