CompletableFuture API

article/2025/10/28 7:22:57

目录

  • 1. 为什么要用CompletableFuture
    • 1.1 API
  • 2. CompletableFuture Demo
    • 1. 创建CompletableFuture
    • 2. 定义CompletableFuture完成时和异常时需要回调的实例
    • 3. CompletableFuture的优点
  • 3. demo:多个CompletableFuture串行执行
  • 4. demo:多个CompletableFuture并行执行
    • 流程图
  • 5. 使用CompletableFuture场景

1. 为什么要用CompletableFuture

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

1.1 API

1. 实例化

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

其中supplyAsync()用于有返回值的任务,runAsync()则用于没有返回值的任务。
Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池,注意:这些线程都是Daemon线程(守护线程),主线程结束,JVM中不存在其余用户线程,那么CompletableFuture的守护线程会直接退出,造成任务无法完成的问题

2. 获取结果

public T    get() //阻塞直到任务完成
public T    get(long timeout, TimeUnit unit)
public T    getNow(T valueIfAbsent) //参数valueIfAbsent的意思是当计算结果不存在或者Now时刻没有完成任务,给定一个确定的值。
public T    join()

join() 与get() 区别在于join() 返回计算的结果或者抛出一个unchecked异常(CompletionException),而get() 返回一个具体的异常

3. 计算完成后续操作1——complete

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

方法1和2的区别在于是否使用异步处理,2和3的区别在于是否使用自定义的线程池,前三个方法都会提供一个返回结果和可抛出异常,我们可以使用lambda表达式的来接收这两个参数,然后自己处理。 方法4,接收一个可抛出的异常,且必须return一个返回值

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {return 10086;
});
future.whenComplete((result, error) -> {System.out.println("拨打"+result);error.printStackTrace();
});

4. 计算完成后续操作2——handle

public <U> CompletableFuture<U>     handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

handle方法集和上面的complete方法集没有区别,同样有两个参数一个返回结果和可抛出异常,区别就在于返回值,没错,虽然同样返回CompletableFuture类型,但是里面的参数类型,handle方法是可以自定义的。

// 开启一个异步方法
CompletableFuture<List> future = CompletableFuture.supplyAsync(() -> {List<String> list = new ArrayList<>();list.add("语文");list.add("数学");// 获取得到今天的所有课程return list;});// 使用handle()方法接收list数据和error异常CompletableFuture<Integer> future2 = future.handle((list,error)-> {// 如果报错,就打印出异常error.printStackTrace();// 如果不报错,返回一个包含Integer的全新的CompletableFuturereturn list.size();// 注意这里的两个CompletableFuture包含的返回类型不同});

5. 计算完成的后续操作3——apply

public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

apply方法和handle方法一样,都是结束计算之后的后续操作,唯一的不同是,handle方法会给出异常,可以让用户自己在内部处理,而apply方法只有一个返回结果,如果异常了,会被直接抛出,交给上一层处理。 如果不想每个链式调用都处理异常,那么就使用apply吧。
6. 计算完成的后续操作4——accept

public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

accept()三个方法只做最终结果的消费,注意此时返回的CompletableFuture是空返回。只消费,无返回,有点像流式编程的终端操作。
7. 捕获中间产生的异常——exceptionally

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

exceptionally() 可以帮我们捕捉到所有中间过程的异常,方法会给我们一个异常作为参数,我们可以处理这个异常,同时返回一个默认值,跟服务降级 有点像,默认值的类型和上一个操作的返回值相同。 小贴士 :向线程池提交任务的时候发生的异常属于外部异常,是无法捕捉到的,毕竟还没有开始执行任务。作者也是在触发线程池拒绝策略的时候发现的。exceptionally() 无法捕捉RejectedExecutionException

// 实例化一个CompletableFuture,返回值是Integer
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {// 返回nullreturn null;});CompletableFuture<String> exceptionally = future.thenApply(result -> {// 制造一个空指针异常NPEint i = result;return i;}).thenApply(result -> {// 这里不会执行,因为上面出现了异常String words = "现在是" + result + "点钟";return words;}).exceptionally(error -> {// 我们选择在这里打印出异常error.printStackTrace();// 并且当异常发生的时候,我们返回一个默认的文字return "出错啊~";});exceptionally.thenAccept(System.out::println);}

在这里插入图片描述
8. thenApply()
假设一个场景,我是一个小学生,我想知道今天我需要上几门课程 此时我需要两个步骤,1.根据我的名字获取我的学生信息 2.根据我的学生信息查询课程 我们可以用下面这种方式来链式调用api,使用上一步的结果进行下一步操作

CompletableFuture<List<Lesson>> future = CompletableFuture.supplyAsync(() -> {// 根据学生姓名获取学生信息return StudentService.getStudent(name);
}).thenApply(student -> {// 再根据学生信息获取今天的课程return LessonsService.getLessons(student);
});

9. thenCombine()
假设一个场景,我是一个小学生,今天有劳技课和美术课,我需要查询到今天需要带什么东西到学校

CompletableFuture<List<String>> painting = CompletableFuture.supplyAsync(() -> {// 第一个任务获取美术课需要带的东西,返回一个listList<String> stuff = new ArrayList<>();stuff.add("画笔");stuff.add("颜料");return stuff;
});
CompletableFuture<List<String>> handWork = CompletableFuture.supplyAsync(() -> {// 第二个任务获取劳技课需要带的东西,返回一个listList<String> stuff = new ArrayList<>();stuff.add("剪刀");stuff.add("折纸");return stuff;
});
CompletableFuture<List<String>> total = painting// 传入handWork列表,然后得到两个CompletableFuture的参数Stuff1和2.thenCombine(handWork, (stuff1, stuff2) -> {// 合并成新的listList<String> totalStuff = Stream.of(stuff1, stuff1).flatMap(Collection::stream).collect(Collectors.toList());return totalStuff;});
System.out.println(JSONObject.toJSONString(total.join()));

10. 获取所有完成结果——allOf

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {try {//使用sleep()模拟耗时操作TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return 1;
});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {return 2;
});CompletableFuture.allOf(future1, future1);
// 输出3
System.out.println(future1.join()+future2.join());

11. 获取率先完成的任务结果——anyOf
仅等待Future集合种最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果。 小贴士 :如果最快完成的任务出现了异常,也会先返回异常,如果害怕出错可以加个exceptionally() 去处理一下可能发生的异常并设定默认返回值

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {throw new NullPointerException();
});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {try {// 睡眠3s模拟延时TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return 1;
});
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future, future2).exceptionally(error -> {error.printStackTrace();return 2;});
System.out.println(anyOf.join());

thenApplyAsync():用于串行化另一个CompletableFuture;
anyOf()allOf():用于并行化多个CompletableFuture。
anyOf()可以实现“任意个CompletableFuture只要一个成功”,allOf()可以实现“所有CompletableFuture都必须成功”

2. CompletableFuture Demo

public class Main {public static void main(String[] args) throws Exception {// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);// 如果执行成功:cf.thenAccept((result) -> {System.out.println("price: " + result);});// 如果执行异常:cf.exceptionally((e) -> {e.printStackTrace();return null;});// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:Thread.sleep(200);}static Double fetchPrice() {try {Thread.sleep(100);} catch (InterruptedException e) {}if (Math.random() < 0.3) {throw new RuntimeException("fetch price failed!");}return 5 + Math.random() * 20;}
}

1. 创建CompletableFuture

创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象:

public interface Supplier<T> {T get();
}

这里我们用lambda语法简化了一下,直接传入Main::fetchPrice,因为Main.fetchPrice()静态方法的签名符合Supplier接口的定义(除了方法名外)。

2. 定义CompletableFuture完成时和异常时需要回调的实例

完成时,CompletableFuture会调用Consumer对象:

public interface Consumer<T> {void accept(T t);
}

异常时,CompletableFuture会调用Function对象:

public interface Function<T, R> {R apply(T t);
}

3. CompletableFuture的优点

异步任务结束时,会自动回调某个对象的方法;
异步任务出错时,会自动回调某个对象的方法;
主线程设置好回调后,不再关心异步任务的执行。

3. demo:多个CompletableFuture串行执行

例如,定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下

public class Main {public static void main(String[] args) throws Exception {// 第一个任务:CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {return queryCode("中国石油");});// cfQuery成功后继续执行下一个任务:CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {return fetchPrice(code);});// cfFetch成功后打印结果:cfFetch.thenAccept((result) -> {System.out.println("price: " + result);});// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:Thread.sleep(2000);}static String queryCode(String name) {try {Thread.sleep(100);} catch (InterruptedException e) {}return "601857";}static Double fetchPrice(String code) {try {Thread.sleep(100);} catch (InterruptedException e) {}return 5 + Math.random() * 20;}
}

4. demo:多个CompletableFuture并行执行

例如,我们考虑这样的场景:
同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:

public class Main {public static void main(String[] args) throws Exception {// 两个CompletableFuture执行异步查询:CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {return queryCode("中国石油", "https://finance.sina.com.cn/code/");});CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {return queryCode("中国石油", "https://money.163.com/code/");});// 用anyOf合并为一个新的CompletableFuture:CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);// 两个CompletableFuture执行异步查询:CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {return fetchPrice((String) code, "https://finance.sina.com.cn/price/");});CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {return fetchPrice((String) code, "https://money.163.com/price/");});// 用anyOf合并为一个新的CompletableFuture:CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);// 最终结果:cfFetch.thenAccept((result) -> {System.out.println("price: " + result);});// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:Thread.sleep(200);}static String queryCode(String name, String url) {System.out.println("query code from " + url + "...");try {Thread.sleep((long) (Math.random() * 100));} catch (InterruptedException e) {}return "601857";}static Double fetchPrice(String code, String url) {System.out.println("query price from " + url + "...");try {Thread.sleep((long) (Math.random() * 100));} catch (InterruptedException e) {}return 5 + Math.random() * 20;}
}

流程图

在这里插入图片描述
除了anyOf()可以实现“任意个CompletableFuture只要一个成功”,allOf()可以实现“所有CompletableFuture都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。

5. 使用CompletableFuture场景

  1. 执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度
  2. 使用CompletableFuture类,它提供了异常管理的机制,让你有机会抛出、管理异步任务执行种发生的异常
  3. 如果这些异步任务之间相互独立,或者他们之间的的某一些的结果是另一些的输入,你可以讲这些异步任务构造或合并成一个

来源 :https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
API:https://zhuanlan.zhihu.com/p/344431341
美团CompletableFuture详解(未整理):https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html


http://chatgpt.dhexx.cn/article/6reLAnVq.shtml

相关文章

CompletableFuture实战与分析

Future对于结果的获取不够好&#xff0c;只能通过阻塞或者轮询的方式得到任务的结果。在Java8中Doug Lea大师提供了一个CompletableFuture工具类&#xff0c;可以更优雅的对异步并行操作进行编排。 Future VS CompletableFuture CompletableFuture支持手动完成任务&#xff0…

Java8 CompletableFuture异步非阻塞做法

创建异步任务 Future.submit supplyAsync / runAsync 异步回调 thenApply / thenApplyAsync thenAccept / thenRun exceptionally whenComplete handle 组合处理 thenCombine / thenAcceptBoth / runAfterBoth applyToEither / acceptEither / runAfterEither thenCom…

2022.2.5 第十三次周报

文章目录 前言一、论文阅读《ROCKET: Exceptionally fast and accurate time series classification using random convolutional kernels》Abstract摘要Introduction介绍Method方法Kernels内核Transform转换Classifier分类器Complexity Analysis复杂性分析 Experiments实验Con…

并发编程(十五)-CompletableFuture中常用方法的使用与分析

文章目录 一、CompletableFuture API介绍1. 描述2. CompletionStage3. CompletableFuture 4个核心静态方法&#xff08;1&#xff09;runAsync(Runnable runnable)&#xff08;2&#xff09;runAsync(Runnable runnable, Executor executor)&#xff08;3&#xff09;supplyAsy…

Java 编程问题:十一、并发-深入探索

原文&#xff1a;Java Coding Problems 协议&#xff1a;CC BY-NC-SA 4.0 贡献者&#xff1a;飞龙 本文来自【ApacheCN Java 译文集】&#xff0c;自豪地采用谷歌翻译。 本章包括涉及 Java 并发的 13 个问题&#xff0c;涉及 Fork/Join 框架、CompletableFuture、ReentrantLock…

线程(十二)---CompletableFuture(三)

写在前面&#xff1a;各位看到此博客的小伙伴&#xff0c;如有不对的地方请及时通过私信我或者评论此博客的方式指出&#xff0c;以免误人子弟。多谢&#xff01; 示例五&#xff1a;异常处理 接着上一篇记录一下CompletableFuture的异常处理&#xff0c;异常处理通常使用…

dice loss

Dice Loss 最先是在VNet 这篇文章中被提出&#xff0c;后来被广泛的应用在了医学影像分割之中。 Dice 系数 Dice系数作为损失函数的原因和混淆矩阵有着很大的关系&#xff0c;下图给出的是一个混淆矩阵&#xff1a; 其中的一些关键指标如下&#xff1a; 精确率(precision)表…

Hinge loss

声明&#xff1a; 参考自维基百科后面可能会更新 Hinge Loss 在机器学习中&#xff0c;hinge loss作为一个损失函数(loss function)&#xff0c;通常被用于最大间隔算法(maximum-margin)&#xff0c;而最大间隔算法又是SVM(支持向量机support vector machines)用到的重要算法…

【深度学习】一文读懂机器学习常用损失函数(Loss Function)

【深度学习】一文读懂机器学习常用损失函数&#xff08;Loss Function&#xff09; 最近太忙已经好久没有写博客了&#xff0c;今天整理分享一篇关于损失函数的文章吧&#xff0c;以前对损失函数的理解不够深入&#xff0c;没有真正理解每个损失函数的特点以及应用范围&#x…

Pytorch之loss(损失函数)

损失函数也在torch.nn下&#xff0c;具体可以参考文档&#xff0c;也可以参考官网 先根据L1Loss举例 我个人感觉这里的描述还是到官网的文档找比较好&#xff0c;公式看的比文档清楚 import torch from torch import nninputs torch.tensor([[3, 2, 1],[1, 2, 3]], dtypetorch…

机器学习 损失函数 Loss function

损失函数 最小二乘法极大似然估计法交叉熵 【本文根据B站-王木头学科学-视频所学】 在梯度下降中&#xff0c;所求的梯度其实就是损失函数的梯度。 损失函数有三种设计方法&#xff1a; &#xff08;1&#xff09;最小二乘法 &#xff08;2&#xff09;极大似然估计法 &#x…

Focal loss 损失函数详解

Focal loss 目前目标检测的算法大致分为两类&#xff0c;One Stage 、Two Stage。 One Stage&#xff1a;主要指类似YOLO、SGD等这样不需要region proposal,直接回归的检测算法&#xff0c;这类算法检测速度很快&#xff0c;但是精度准确率不如使用Two stage的模型。 two St…

机器学习之常见的损失函数(loss function)

解决一个机器学习问题主要有两部分&#xff1a;数据和算法。而算法又有三个部分组成&#xff1a;假设函数、损失函数、算法优化。我们一般在看算法书或者视频教学时&#xff0c;更多的是去推算或者说参数估计出其假设函数&#xff0c;而往往不太注重损失函数&#xff0c;但是损…

深度学习loss函数理解

机器学习中的范数规则化之L0、L1、L2范数及loss函数 监督机器学习问题无非就是“minimizeyour error while regularizing your parameters”&#xff0c;也就是在规则化参数的同时最小化误差。 最小化误差是为了让我们的模型拟合我们的训练数据&#xff0c;而规则化参数是防止我…

CE Loss,BCE Loss以及Focal Loss的原理理解

一、交叉熵损失函数&#xff08;CE Loss&#xff0c;BCE Loss&#xff09; 最开始理解交叉熵损失函数被自己搞的晕头转向的&#xff0c;最后发现是对随机变量的理解有偏差&#xff0c;不知道有没有读者和我有着一样的困惑&#xff0c;所以在本文开始之前&#xff0c;先介绍一下…

损失函数loss

http://blog.csdn.net/pipisorry/article/details/23538535 监督学习及其目标函数 损失函数&#xff08;loss function&#xff09;是用来估量你模型的预测值f(x)与真实值Y的不一致程度&#xff0c;它是一个非负实值函数&#xff0c;通常使用L(Y, f(x))来表示。 损失函数是经…

机器学习模型中的损失函数loss function

1. 概述 在机器学习算法中&#xff0c;有一个重要的概念就是损失函数&#xff08;Loss Function&#xff09;。损失函数的作用就是度量模型的预测值 f ( x ) f\left ( \mathbf{x} \right ) f(x)与真实值 y \mathbf{y} y之间的差异程度的函数&#xff0c;且是一个非负实值函数。…

损失函数(Loss)

如果我们定义了一个机器学习模型&#xff0c;比如一个三层的神经网络&#xff0c;那么就需要使得这个模型能够尽可能拟合所提供的训练数据。但是我们如何评价模型对于数据的拟合是否足够呢&#xff1f;那就需要使用相应的指标来评价它的拟合程度&#xff0c;所使用到的函数就称…

focal loss详解

文章目录 focal loss的整体理解易分辨样本、难分辨样本的含义focal loss的出现过程focal loss 举例说明focal loss的 α \alpha α变体 focal loss的整体理解 focal loss 是一种处理样本分类不均衡的损失函数&#xff0c;它侧重的点是根据样本分辨的难易程度给样本对应的损失添…

深度学习——损失函数(Regression Loss、Classification Loss)

简介 Loss function 损失函数 用于定义单个训练样本与真实值之间的误差 Cost function 代价函数 用于定义单个批次/整个训练集样本与真实值之间的误差 Objective function 目标函数 泛指任意可以被优化的函数 损失函数用于衡量模型所做出的预测离真实值(GT)之间的偏离程度。 …