并发编程(十五)-CompletableFuture中常用方法的使用与分析

article/2025/10/28 7:19:02

文章目录

  • 一、CompletableFuture API介绍
    • 1. 描述
    • 2. CompletionStage
    • 3. CompletableFuture 4个核心静态方法
      • (1)runAsync(Runnable runnable)
      • (2)runAsync(Runnable runnable, Executor executor)
      • (3)supplyAsync(Supplier supplier)
      • (4)supplyAsync(Supplier supplier, Executor executor)
    • 4. CompletableFuture 常用方法
      • (1) 获得结果和触发计算类型的方法
        • get()
        • get(long timeout, TimeUnit unit)
        • join()
        • getNow(T valueIfAbsent)
      • (2) 对计算结果进行处理类型的方法
        • thenApply
        • handle
        • thenApply 与 thenCompose 对比
        • whenComplete
        • exceptionally、whenComplete、handle 对比
      • (3) 对计算结果进行消费类型的方法
        • thenAccept
        • thenRun
        • thenRun、thenAccept、thenApply 对比
        • thenRun 与 thenRunAsync 对比
      • (4) 对计算速度进行选用类型的方法
        • applyToEither
        • applyToEither、acceptEither、runAfterEither 对比
      • (5) 对计算结果进行合并类型的方法
        • thenCombine
        • thenCombine、thenAcceptBoth、runAfterBoth 对比
    • 5. CompletableFuture 性能
      • 分析 CompletableFuture 完成异步任务

一、CompletableFuture API介绍

1. 描述

在这里插入图片描述

  • CompletableFuture实现了两个接口(如上图所示):Future、CompletionStage。
  • Future表示异步计算的结果
  • CompletionStage用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个CompletionStage触发的,随着当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。
  • 从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,我们可以通过其提供的thenAppy、thenCompose等函数式编程方法来组合编排这些步骤。
// CompletableFuture 实现了Future、CompletionStage两个接口
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 

以下是对源码描述的翻译(需要细品):

  • CompletableFuture是一个可以显式完成的 Future(可以设置其值和状态),并且可以用作 CompletionStage,支持在其完成时触发相关函数和操作。
  • 当两个或更多线程尝试完成、完成异常或取消 CompletableFuture 时,只有其中一个成功。
  • 除了直接操作状态和结果的这些相关方法之外,CompletableFuture 还使用以下策略实现了 CompletionStage接口:
    • 为非异步方法的依赖完成提供的操作,可以由完成当前 CompletableFuture 的线程执行,也可以由完成方法的任何其他调用者执行(例如:main线程空闲后也可以执行CompletableFuture任务)。
    • 所有没有传入 Executor 参数的异步方法,都使用 ForkJoinPool.commonPool() 执行(除非它不支持至少两个并行级别,在这种情况下,会创建一个新线程来运行每个任务)。为了简化监控、调试和跟踪,所有生成的异步任务都是标记接口 CompletableFuture.AsynchronousCompletionTask 的实例。
    • 所有 CompletionStage 方法都是独立于其他公共方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖的影响。
  • CompletableFuture 还使用以下策略实现 Future接口:
    • 由于(与 FutureTask 不同)此类无法直接控制导致其完成的计算,因此取消被视为另一种形式的异常完成。方法 cancel 与 completeExceptionally(new CancellationException()) 具有相同的效果。方法 isCompletedExceptionally 可用于确定 CompletableFuture 是否以任何异常方式完成。
    • 如果出现 CompletionException 异常完成,get() 和 get(long, TimeUnit) 方法会抛出 ExecutionException,原因与对应的 CompletionException 中的原因相同。为了在大多数情况下简化使用,此类还定义了 join() 和 getNow 方法,它们在这些情况下直接抛出 CompletionException。
由于 FutureTask 的2个缺点,JDK8设计出了CompletableFuture,CompletableFuture 提供了一种类似观察者模式的机制,
可以让异步任务完成后回调通知监听的一方,可以避免阻塞和轮询
CompletableFuture 优点:
异步任务或异常,会自动回调某个对象的方法
主线程设置好回调后,不再关心异步任务的执行,异步任务之间也可以顺序执行

2. CompletionStage

以下是对源码描述的翻译(需要细品):

  • CompletionStage可能是异步计算的一个阶段,它在另一个 CompletionStage 完成时执行一个操作或计算一个值。一个阶段在其计算终止时完成,但这可能反过来触发其他相关阶段。此接口中定义的功能仅采用几种基本形式,可扩展为更大的方法集以捕获一系列使用风格:
    • 阶段执行的计算可以表示为 Function、Consumer 或 Runnable(分别使用名称包括 apply、accept 或 run 的方法),具体取决于它是否需要参数或产生结果。例如 stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())。另一种形式(compose)应用阶段本身的功能,而不是它们的结果。
    • 一个阶段的执行可以由单个阶段的完成触发,也可以由两个阶段的完成触发,或者两个阶段中的任一个触发。使用带有前缀 then 的方法来排列单个阶段的依赖关系。由两个阶段完成触发的那些可以使用相应命名的方法组合它们的结果或效果。由两个阶段中的任何一个触发的那些不保证哪些结果或效果用于从属阶段的计算。
    • 两种方法形式支持处理触发阶段是正常完成还是异常完成:方法 whenComplete 允许注入动作而不管结果如何,否则在完成时保留结果。方法句柄还允许阶段计算替换结果,该结果可以允许其他相关阶段进行进一步处理。在所有其他情况下,如果一个阶段的计算因(未经检查的)异常或错误而突然终止,则所有需要其完成的相关阶段也会异常完成,并且 CompletionException 将异常作为其原因。如果一个阶段依赖于两个阶段,并且都异常完成,那么 CompletionException 可能对应于这些异常中的任何一个。如果一个阶段依赖于其他两个阶段中的任何一个,并且其中只有一个异常完成,则无法保证依赖阶段是正常完成还是异常完成。在方法 whenComplete 的情况下,当提供的操作本身遇到异常时,如果尚未异常完成,则阶段异常完成此异常。
  • 所有方法都遵循上述触发、执行和异常完成规范(在单个方法规范中不再重复)。此外,虽然用于为接受它们的方法传递完成结果(即,对于类型 T 的参数)的参数可能为 null,但为任何其他参数传递 null 值将导致引发 NullPointerException。
  • 该接口未定义初始创建、正常或异常强制完成、探测完成状态或结果或等待阶段完成的方法。 CompletionStage 的实现可以酌情提供实现这种效果的方法。方法 toCompletableFuture 通过提供一个通用的转换类型来实现此接口的不同实现之间的互操作性。

3. CompletableFuture 4个核心静态方法

(1)runAsync(Runnable runnable)

(2)runAsync(Runnable runnable, Executor executor)

(3)supplyAsync(Supplier supplier)

(4)supplyAsync(Supplier supplier, Executor executor)

runAsync 执行的任务无返回值,
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) 
supplyAsync 执行的任务有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
以上传入 Executor 可以使用我们指定的线程池,不传则使用默认的 ForkJoinPool.commonPool()线程池

CompletableFuture 源码:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {// 默认线程池 —— ForkJoinPool.commonPool() 除非它不支持并行。private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();// 如果 ForkJoinPool.commonPool() 不支持并行性,则回退static final class ThreadPerTaskExecutor implements Executor {public void execute(Runnable r) { new Thread(r).start(); }}// 返回一个新的 CompletableFuture,它在运行给定操作后由 ForkJoinPool.commonPool() 中运行的任务异步完成。public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}// 返回一个新的 CompletableFuture,它由在 ForkJoinPool.commonPool() 中运行的任务异步完成,调用者可以获取返回值。public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);}
}

测试 1:

@Slf4j(topic = "c.TestsSupplyAsync")
public class TestsSupplyAsync {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {log.debug("线程名:{}", Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "hello supplyAsync";}, pool);log.debug("{}", supplyAsync.get());// 也可以使用join,使用join可以不用声明异常
//      log.debug("{}", supplyAsync.join());pool.shutdown();}
}
17:52:23.694 c.TestsSupplyAsync [pool-1-thread-1] - 线程名:pool-1-thread-1
17:52:24.706 c.TestsSupplyAsync [main] - hello supplyAsync

测试 2:

需求:

  • 同时搜索出同款产品在各大平台的售价
  • 输出格式:List< String>
    • 《Java》in JD price is ***
    • 《Java》in DangDang price is ***
    • 《Java》in Taobao price is ***
public class TestCompletableFuture {static List<Mall> list = Arrays.asList(new Mall("JD"),new Mall("DangDang"),new Mall("TaoBao"));public static void main(String[] args) {long start = System.currentTimeMillis();List<String> price = getPrice(list, "java");price.forEach(System.out::println);long end = System.currentTimeMillis();System.out.println("耗时:" + (end - start) + "毫秒");}// List<Mall> -> List<CompletableFuture<String>> -> List<String>public static List<String> getPrice(List<Mall> mallList, String bookName) {return mallList.stream().map(mall -> CompletableFuture.supplyAsync(() ->String.format(bookName + " in %s price is %.2f",mall.getMallName(),mall.getBookPrice(bookName)))).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());}
}class Mall {@Getterprivate final String mallName;public Mall(String mallName) {this.mallName = mallName;}public double getBookPrice(String bookName){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}// 随机产生价格return ThreadLocalRandom.current().nextDouble() * 2 + bookName.charAt(0);}
}
java in JD price is 106.31
java in DangDang price is 106.35
java in TaoBao price is 107.81
耗时:1125毫秒

在这里插入图片描述

4. CompletableFuture 常用方法

(1) 获得结果和触发计算类型的方法

  • 获得结果类型的方法

get()

get(long timeout, TimeUnit unit)

join()

getNow(T valueIfAbsent)

// 等待任务完成,并返回结果
public T get() throws InterruptedException, ExecutionException
// 带超时的等待,并返回结果,超时则抛出异常
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
// 等待任务完成,并返回结果,不用声明异常 (编译时不会检查异常)
// 出现异常时,抛出 CompletionException 异常
public T join()
// 如果任务完成则返回结果值(或抛出任何遇到的异常),否则返回给定的参数值 valueIfAbsent
// 不会阻塞
public T getNow(T valueIfAbsent)

测试 getNow()

public static void main(String[] args) {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "测试getNow方法";});System.out.println("结果:" + completableFuture.getNow("这是任务没完成返回的指定值"));
}
结果:这是任务没完成返回的指定值
  • 主动触发计算的方法
// 如果尚未完成,则将 get() 和相关方法返回的值设置为给定值
// 表示是否打断get()、join()等方法,立即返回指定值 value
public boolean complete(T value)

测试 complete():

public static void main(String[] args) throws InterruptedException, ExecutionException {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {// 执行 2sTimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "测试getNow方法";});// 等 1sTimeUnit.SECONDS.sleep(1);// 是否打断get()等方法boolean flag = completableFuture.complete("这是任务没完成返回的指定值");String value = completableFuture.get();System.out.println("是否打断:" + flag + ", 结果:" + value);}
是否打断:true, 结果:这是任务没完成返回的指定值

(2) 对计算结果进行处理类型的方法

thenApply

  • 计算结果存在依赖关系,多个线程之间是串行化的
  • 当前阶段出现异常,将不会执行后面阶段
  • 返回一个新的 CompletionStage,当此阶段正常完成时,将使用此阶段的结果作为下一个阶段的参数
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

测试正常情况:

@Slf4j(topic = "c.TestThenApply")
public class TestThenApply {public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {log.debug("第一阶段");return 1;}, pool).thenApply((v) -> {log.debug("第二阶段");return v + 2;}).thenApply((v) -> {log.debug("第三阶段");return v + 3;}).whenComplete((v, e) -> {log.debug("结果:" + v);}).exceptionally(e -> {log.debug("出现异常:" + e.getMessage());return null;});pool.shutdown();}
}
22:45:39.496 c.TestThenApply [pool-1-thread-1] - 第一阶段
22:45:39.499 c.TestThenApply [pool-1-thread-1] - 第二阶段
22:45:39.499 c.TestThenApply [pool-1-thread-1] - 第三阶段
22:45:39.500 c.TestThenApply [pool-1-thread-1] - 结果:6

测试异常情况:

@Slf4j(topic = "c.TestThenApply")
public class TestThenApply {public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {log.debug("第一阶段");return 1;}, pool).thenApply((v) -> {int i = 10 / 0;log.debug("第二阶段");return v + 2;}).thenApply((v) -> {log.debug("第三阶段");return v + 3;}).whenComplete((v, e) -> {log.debug("结果:" + v);}).exceptionally(e -> {log.debug("出现异常:" + e.getMessage());return null;});pool.shutdown();}
}
22:46:05.153 c.TestThenApply [pool-1-thread-1] - 第一阶段
22:46:05.158 c.TestThenApply [pool-1-thread-1] - 结果:null
22:46:05.158 c.TestThenApply [pool-1-thread-1] - 出现异常:java.lang.ArithmeticException: / by zero

handle

  • 计算结果存在依赖关系,多个线程之间是串行化的
  • 当前阶段出现异常,将会执行后面阶段,可以根据异常做进一步处理
  • 返回一个新的 CompletionStage,当此阶段正常或异常完成时,将使用此阶段的结果作为下一个阶段的参数
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

测试正常情况:

@Slf4j(topic = "c.TestHandle")
public class TestHandle {public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {log.debug("第一阶段");return 1;}, pool).handle((v, e) -> {log.debug("第二阶段");return v + 2;}).handle((v, e) -> {log.debug("第三阶段");if (e != null) {log.debug("捕获上一阶段的异常:" + e.getMessage());}return v + 3;}).whenComplete((v, e) -> {log.debug("结果:" + v);}).exceptionally(e -> {log.debug("出现异常:" + e.getMessage());return null;});pool.shutdown();}
}
22:39:52.103 c.TestHandle [pool-1-thread-1] - 第一阶段
22:39:52.107 c.TestHandle [pool-1-thread-1] - 第二阶段
22:39:52.107 c.TestHandle [pool-1-thread-1] - 第三阶段
22:39:52.107 c.TestHandle [pool-1-thread-1] - 结果:6

测试异常情况:

@Slf4j(topic = "c.TestHandle")
public class TestHandle {public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {log.debug("第一阶段");return 1;}, pool).handle((v, e) -> {int i = 10 / 0;log.debug("第二阶段");return v + 2;}).handle((v, e) -> {log.debug("第三阶段");if (e != null) {log.debug("捕获上一阶段的异常:" + e.getMessage());}return v + 3;}).whenComplete((v, e) -> {log.debug("结果:" + v);}).exceptionally(e -> {log.debug("出现异常:" + e.getMessage());return null;});pool.shutdown();}
}
22:41:21.347 c.TestHandle [pool-1-thread-1] - 第一阶段
22:41:21.352 c.TestHandle [pool-1-thread-1] - 第三阶段
22:41:21.352 c.TestHandle [pool-1-thread-1] - 捕获上一阶段的异常:java.lang.ArithmeticException: / by zero
22:41:21.353 c.TestHandle [pool-1-thread-1] - 结果:null
22:41:21.353 c.TestHandle [pool-1-thread-1] - 出现异常:java.lang.NullPointerException

注意: 出现的空指针异常(v为null,出现在 return v + 2; 和 return v + 3;两处地方)

thenApply 与 thenCompose 对比

  • thenApply 和 thenCompose 都是将当前阶段的结果传递给下一个阶段,并返回一个新的 CompletionStage
  • 类似于stream中的map和flatMap
public <U> CompletionStage<U>   thenApply(Function<? super T, ? extends U> fn);public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
这两个方法的返回值都是CompletionStage<U>,只是传入函数fn不同
对于thenApply,fn函数是对一个已完成的CompletionStage的返回值进行计算、操作
对于thenCompose,fn函数是对新创建的CompletionStage进行计算、操作

测试:

public static void main(String[] args) {CompletableFuture<String> thenApply = CompletableFuture.supplyAsync(() -> 1).thenApply(v -> "将Integer类型值:" + v + ",转换成字符串");CompletableFuture<String> thenCompose = CompletableFuture.supplyAsync(() -> 1).thenCompose(v -> CompletableFuture.supplyAsync(() -> "新创建一个CompletableFuture将Integer类型值:" + v + ",转换成字符串"));System.out.println("thenApply:" +thenApply.join());System.out.println("thenCompose:" + thenCompose.join());}
thenApply:Integer类型值:1,转换成字符串
thenCompose:新创建一个CompletableFutureInteger类型值:1,转换成字符串

whenComplete

  • 返回一个与此阶段具有相同结果或异常的新 CompletionStage,它在此阶段完成时执行给定的操作。
  • 当这个阶段完成时,使用这个阶段的结果(如果没有,则为 null)和异常(如果没有,则为 null)作为参数传递到下一个阶段。
  • 当动作返回时,返回阶段完成。如果提供的操作本身遇到异常,则返回的阶段异常完成并出现此异常,除非此阶段也异常完成。

whenComplete 源码:

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);

测试:

  • 不传入线程池,使用默认线程池 —— ForkJoinPool.commonPool()
@Slf4j(topic = "c.TestWhenComplete")
public class TestWhenComplete {public static void main(String[] args) throws InterruptedException {CompletableFuture.supplyAsync(() -> {// 随机数int i = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}log.debug("1s后返回结果:{}", i);return i;}).whenComplete((v, e) -> {if (e == null) {log.debug("没有异常,获取结果:{}", v);}}).exceptionally(e -> {log.debug("出现异常:{}", e.getMessage());return null;});log.debug("主线程去执行其他任务。。。");log.debug("主线程执行完成");}
}
18:18:51.912 c.TestWhenComplete [main] - 主线程去执行其他任务。。。
18:18:51.916 c.TestWhenComplete [main] - 主线程执行完成Process finished with exit code 0
由结果可以看出,主线程停止后,子线程也随之停止
由于ForkJoinPool.commonPool()类似守护线程,还没执行完就随主线程一起关闭了
解决方法:
1. 让主线程执行完后,停留一段时间
2. 传入自定义线程池
  • 让主线程执行完后,停留一段时间
@Slf4j(topic = "c.TestWhenComplete")
public class TestWhenComplete {public static void main(String[] args) throws InterruptedException {CompletableFuture.supplyAsync(() -> {// 随机数int i = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}log.debug("1s后返回结果:{}", i);return i;}).whenComplete((v, e) -> {if (e == null) {log.debug("没有异常,获取结果:{}", v);}}).exceptionally(e -> {log.debug("出现异常:{}", e.getMessage());return null;});log.debug("主线程去执行其他任务。。。");log.debug("主线程执行完成");// 默认的线程池 —— ForkJoinPool.commonPool()类似守护线程,主线程关闭后,也会随之关闭TimeUnit.SECONDS.sleep(2);log.debug("主线程阻塞2s");}
}
18:23:24.503 c.TestWhenComplete [main] - 主线程去执行其他任务。。。
18:23:24.506 c.TestWhenComplete [main] - 主线程执行完成
18:23:25.509 c.TestWhenComplete [ForkJoinPool.commonPool-worker-9] - 1s后返回结果:8
18:23:25.511 c.TestWhenComplete [ForkJoinPool.commonPool-worker-9] - 没有异常,获取结果:8
18:23:26.519 c.TestWhenComplete [main] - 主线程阻塞2sProcess finished with exit code 0
  • 传入自定义线程池
@Slf4j(topic = "c.TestWhenComplete")
public class TestWhenComplete {public static void main(String[] args) throws InterruptedException {ExecutorService pool = null;try {pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {// 随机数int i = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}log.debug("1s后返回结果:{}", i);return i;// 传入自定义线程池}, pool).whenComplete((v, e) -> {if (e == null) {log.debug("没有异常,获取结果:{}", v);}}).exceptionally(e -> {log.debug("出现异常:{}", e.getMessage());return null;});log.debug("主线程去执行其他任务。。。");log.debug("主线程执行完成");} catch (Exception e) {e.printStackTrace();} finally {pool.shutdown();}}
}
18:37:39.968 c.TestWhenComplete [main] - 主线程去执行其他任务。。。
18:37:39.970 c.TestWhenComplete [main] - 主线程执行完成
18:37:40.973 c.TestWhenComplete [pool-1-thread-1] - 1s后返回结果:1
18:37:40.975 c.TestWhenComplete [pool-1-thread-1] - 没有异常,获取结果:1
  • 模拟异常
@Slf4j(topic = "c.TestWhenComplete")
public class TestWhenComplete {public static void main(String[] args) throws InterruptedException {ExecutorService pool = null;try {pool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() -> {// 随机数int i = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}log.debug("1s后返回结果:{}", i);if (i > 2) {// 模拟异常i = 10/0;}return i;// 传入自定义线程池}, pool).whenComplete((v, e) -> {if (e == null) {log.debug("没有异常,获取结果:{}", v);} else {log.debug("捕获上一阶段异常:{}", e.getMessage());}}).exceptionally(e -> {log.debug("出现异常:{}", e.getMessage());return null;});log.debug("主线程去执行其他任务。。。");log.debug("主线程执行完成");} catch (Exception e) {e.printStackTrace();} finally {pool.shutdown();}}
}
00:04:32.849 c.TestWhenComplete [main] - 主线程去执行其他任务。。。
00:04:32.851 c.TestWhenComplete [main] - 主线程执行完成
00:04:33.849 c.TestWhenComplete [pool-1-thread-1] - 1s后返回结果:4
00:04:33.854 c.TestWhenComplete [pool-1-thread-1] - 捕获上一阶段异常:java.lang.ArithmeticException: / by zero
00:04:33.854 c.TestWhenComplete [pool-1-thread-1] - 出现异常:java.lang.ArithmeticException: / by zero

exceptionally、whenComplete、handle 对比

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
  • exceptionally 处理上一个阶段的异常,并且可以再返回一个值
  • handle 可以获取上一个阶段正常或异常的结果,并且有返回值,让下一个阶段继续执行
  • whenComplete 可以获取上一个阶段正常或异常的结果,没有返回值

(3) 对计算结果进行消费类型的方法

thenAccept

  • 接收任务的处理结果,并消费处理,无返回结果
  • 返回一个新的 CompletionStage,当此阶段正常完成时,将使用此阶段的结果作为提供的操作的参数来执行该新阶段
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

测试:

@Slf4j(topic = "c.TestThenAccept")
public class TestThenAccept {public static void main(String[] args) {CompletableFuture<Void> thenAccept = CompletableFuture.supplyAsync(() -> {log.debug("第一阶段");return 1;}).thenApply(v -> {log.debug("第二阶段");return v + 2;}).thenApply(v -> {log.debug("第三阶段");return v + 3;}).thenAccept(v -> {log.debug("结果:" + v);});log.debug("返回结果:{}", thenAccept.join());}}
22:52:09.389 c.TestThenAccept [ForkJoinPool.commonPool-worker-1] - 第一阶段
22:52:09.394 c.TestThenAccept [ForkJoinPool.commonPool-worker-1] - 第二阶段
22:52:09.394 c.TestThenAccept [ForkJoinPool.commonPool-worker-1] - 第三阶段
22:52:09.394 c.TestThenAccept [ForkJoinPool.commonPool-worker-1] - 结果:6
22:52:09.394 c.TestThenAccept [main] - 返回结果:null

thenRun

  • 如果任务A执行完后,再执行B时,B不需要A的结果
  • 返回一个新的 CompletionStage,当此阶段正常完成时,执行给定的操作
public CompletionStage<Void> thenRun(Runnable action);

thenRun、thenAccept、thenApply 对比

  • thenRun:如果任务A执行完后,再执行B时,B不需要A的结果
  • thenAccept:如果任务A执行完后,再执行B时,B需要A的结果,并且无返回值
  • thenApply:如果任务A执行完后,再执行B时,B需要A的结果,并且有返回值

测试:

@Slf4j(topic = "c.TestThenRun_ThenAccept_ThenApply")
public class TestThenRun_ThenAccept_ThenApply {public static void main(String[] args) {CompletableFuture<Void> completableFuture1 = CompletableFuture.supplyAsync(() -> {return 1;}).thenRun(() -> {log.debug("做其他任务");});log.debug("thenRun的结果:" + completableFuture1.join());log.debug("===================");CompletableFuture<Void> completableFuture2 = CompletableFuture.supplyAsync(() -> {return 1;}).thenAccept((v) -> {log.debug("获取上一阶段的值:" + v);});log.debug("thenAccept的结果:" + completableFuture2.join());log.debug("===================");CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {return 1;}).thenApply((v) -> {log.debug("获取上一阶段的值:" + v);return v + 2;});log.debug("thenApply的结果:" + completableFuture3.join());}
}
22:58:08.055 c.TestThenRun_ThenAccept_ThenApply [main] - 做其他任务
22:58:08.059 c.TestThenRun_ThenAccept_ThenApply [main] - thenRun的结果:null
22:58:08.059 c.TestThenRun_ThenAccept_ThenApply [main] - ===================
22:58:08.060 c.TestThenRun_ThenAccept_ThenApply [main] - 获取上一阶段的值:1
22:58:08.061 c.TestThenRun_ThenAccept_ThenApply [main] - thenAccept的结果:null
22:58:08.061 c.TestThenRun_ThenAccept_ThenApply [main] - ===================
22:58:08.062 c.TestThenRun_ThenAccept_ThenApply [main] - 获取上一阶段的值:1
22:58:08.062 c.TestThenRun_ThenAccept_ThenApply [main] - thenApply的结果:3
注意:
由于各阶段任务做得很快,这时可能就会出现直接让main线程把任务做了,而没有开启新线程
如果在各阶段加一些休眠,可能就会开启新的线程做任务

给各阶段加休眠:

@Slf4j(topic = "c.TestThenRun_ThenAccept_ThenApply")
public class TestThenRun_ThenAccept_ThenApply {public static void main(String[] args) {CompletableFuture<Void> completableFuture1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return 1;}).thenRun(() -> {log.debug("做其他任务");});log.debug("thenRun的结果:" + completableFuture1.join());log.debug("===================");CompletableFuture<Void> completableFuture2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return 1;}).thenAccept((v) -> {log.debug("获取上一阶段的值:" + v);});log.debug("thenAccept的结果:" + completableFuture2.join());log.debug("===================");CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return 1;}).thenApply((v) -> {log.debug("获取上一阶段的值:" + v);return v + 2;});log.debug("thenApply的结果:" + completableFuture3.join());}
}
23:03:51.729 c.TestThenRun_ThenAccept_ThenApply [ForkJoinPool.commonPool-worker-1] - 做其他任务
23:03:51.732 c.TestThenRun_ThenAccept_ThenApply [main] - thenRun的结果:null
23:03:51.732 c.TestThenRun_ThenAccept_ThenApply [main] - ===================
23:03:53.734 c.TestThenRun_ThenAccept_ThenApply [ForkJoinPool.commonPool-worker-1] - 获取上一阶段的值:1
23:03:53.734 c.TestThenRun_ThenAccept_ThenApply [main] - thenAccept的结果:null
23:03:53.734 c.TestThenRun_ThenAccept_ThenApply [main] - ===================
23:03:56.735 c.TestThenRun_ThenAccept_ThenApply [ForkJoinPool.commonPool-worker-1] - 获取上一阶段的值:1
23:03:56.735 c.TestThenRun_ThenAccept_ThenApply [main] - thenApply的结果:3

thenRun 与 thenRunAsync 对比

  • thenRun 使用的线程池会和上一阶段的线程池相同,并且线程相同(虽然是2个阶段,但对于CompletableFuture来说,是一个任务,只需开启一个线程工作)
  • thenRunAsync 使用的线程池是默认的线程池 ForkJoinPool.commonPool(),并且CompletableFuture会把2个阶段的任务当做2个独立的任务(注意:这2个阶段的任务可能由一个线程完成,也可能由2个线程完成,取决于当时的cpu执行情况,所以在CompletableFuture 的很多场景中,可能会出现一个线程做多个阶段的任务)

thenAccept 与 thenAcceptAsync、thenApply 与 thenApplyAsync 同理

thenRunAsync 源码:

// CPU核数是否大于1
private static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1);
// CPU核数大于1,则使用默认线程池 ForkJoinPool.commonPool()
private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();public CompletableFuture<Void> thenRunAsync(Runnable action) {// CPU核数大于1, asyncPool 使用默认线程池 ForkJoinPool.commonPool()return uniRunStage(asyncPool, action);
}

测试 thenRun:

@Slf4j(topic = "c.TestThenRun")
public class TestThenRun {public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(2);CompletableFuture<Void> completableFuture1 = CompletableFuture.supplyAsync(() -> {log.debug("不传入自定义线程池时,supplyAsync 的线程池:" + Thread.currentThread().getName());return 1;}).thenRun(() -> {log.debug("thenRun 和上一阶段使用相同的线程池:" + Thread.currentThread().getName());});log.debug("结果:" + completableFuture1.join());log.debug("======================");CompletableFuture<Void> completableFuture2 = CompletableFuture.supplyAsync(() -> {log.debug("传入自定义线程池时,supplyAsync 的线程池:" + Thread.currentThread().getName());return 1;// 传入自定义线程池}, pool).thenRun(() -> {log.debug("thenRun 和上一阶段使用相同的线程池:" + Thread.currentThread().getName());});log.debug("结果:" + completableFuture2.join());log.debug("======================");// 关闭线程池pool.shutdown();}
}
23:08:39.413 c.TestThenRun [ForkJoinPool.commonPool-worker-1] - 不传入自定义线程池时,supplyAsync 的线程池:ForkJoinPool.commonPool-worker-1
23:08:39.424 c.TestThenRun [ForkJoinPool.commonPool-worker-1] - thenRun 和上一阶段使用相同的线程池:ForkJoinPool.commonPool-worker-1
23:08:39.425 c.TestThenRun [main] - 结果:null
23:08:39.425 c.TestThenRun [main] - ======================
23:08:39.426 c.TestThenRun [pool-1-thread-1] - 传入自定义线程池时,supplyAsync 的线程池:pool-1-thread-1
23:08:39.426 c.TestThenRun [pool-1-thread-1] - thenRun 和上一阶段使用相同的线程池:pool-1-thread-1
23:08:39.426 c.TestThenRun [main] - 结果:null
23:08:39.426 c.TestThenRun [main] - ======================

测试 thenRunAsync:

@Slf4j(topic = "c.TestThenRunAsync")
public class TestThenRunAsync {public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(2);CompletableFuture<Void> completableFuture1 = CompletableFuture.supplyAsync(() -> {log.debug("不传入自定义线程池时,supplyAsync 的线程池:" + Thread.currentThread().getName());return 1;}).thenRunAsync(() -> {log.debug("thenRunAsync 使用默认的线程池:" + Thread.currentThread().getName());});log.debug("结果:" + completableFuture1.join());log.debug("======================");CompletableFuture<Void> completableFuture2 = CompletableFuture.supplyAsync(() -> {log.debug("传入自定义线程池时,supplyAsync 的线程池:" + Thread.currentThread().getName());return 1;// 传入自定义线程池}, pool).thenRunAsync(() -> {log.debug("thenRunAsync 使用默认的线程池:" + Thread.currentThread().getName());});log.debug("结果:" + completableFuture2.join());log.debug("======================");// 关闭线程池pool.shutdown();}
}
23:10:21.374 c.TestThenRunAsync [ForkJoinPool.commonPool-worker-1] - 不传入自定义线程池时,supplyAsync 的线程池:ForkJoinPool.commonPool-worker-1
23:10:21.377 c.TestThenRunAsync [ForkJoinPool.commonPool-worker-1] - thenRunAsync 使用默认的线程池:ForkJoinPool.commonPool-worker-1
23:10:21.377 c.TestThenRunAsync [main] - 结果:null
23:10:21.377 c.TestThenRunAsync [main] - ======================
23:10:21.378 c.TestThenRunAsync [pool-1-thread-1] - 传入自定义线程池时,supplyAsync 的线程池:pool-1-thread-1
23:10:21.378 c.TestThenRunAsync [ForkJoinPool.commonPool-worker-1] - thenRunAsync 使用默认的线程池:ForkJoinPool.commonPool-worker-1
23:10:21.378 c.TestThenRunAsync [main] - 结果:null
23:10:21.379 c.TestThenRunAsync [main] - ======================

(4) 对计算速度进行选用类型的方法

applyToEither

  • 返回最先完成任务的结果

applyToEither、acceptEither、runAfterEither 对比

  • applyToEither 得到最先完成任务的结果,有返回值
  • acceptEither 得到最先完成任务的结果,没有返回值
  • runAfterEither 不关心最先完成任务的结果,没有返回值
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);

测试 applyToEither:

@Slf4j(topic = "c.TestApplyToEither")
public class TestApplyToEither {public static void main(String[] args) {CompletableFuture<String> A = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}log.debug("A 1s 完成任务");return "A";});CompletableFuture<String> B = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}log.debug("B 2s 完成任务");return "B";});CompletableFuture<String> result = A.applyToEither(B, v -> v + " 先完成");log.debug(result.join());}
}
23:15:50.711 c.TestApplyToEither [ForkJoinPool.commonPool-worker-1] - A 1s 完成任务
23:15:50.714 c.TestApplyToEither [main] - A 先完成

(5) 对计算结果进行合并类型的方法

thenCombine

  • 合并2个阶段正常完成的结果
  • 先完成的先等待

thenCombine、thenAcceptBoth、runAfterBoth 对比

  • thenCombine 合并前2个阶段正常完成的结果,有返回值
  • thenAcceptBoth 合并前2个阶段正常完成的结果,没有返回值
  • runAfterBoth 不关心前2个阶段正常完成的结果,没有返回值
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,  BiFunction<? super T,? super U,? extends V> fn);
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);

测试 thenCombine:

  • 第一种写法
@Slf4j(topic = "c.TestThenCombine")
public class TestThenCombine {public static void main(String[] args) {log.debug("开始");CompletableFuture<Integer> A = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}log.debug("A 完成");return 10;});CompletableFuture<Integer> B = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}log.debug("B 完成");return 20;});CompletableFuture<Integer> result = A.thenCombine(B, (x, y) -> x + y);log.debug("结果:" + result.join());}
}
16:47:50.931 c.TestThenCombine [main] - 开始
16:47:51.992 c.TestThenCombine [ForkJoinPool.commonPool-worker-9] - A 完成
16:47:53.007 c.TestThenCombine [ForkJoinPool.commonPool-worker-2] - B 完成
16:47:53.007 c.TestThenCombine [main] - 结果:30
  • 第二种写法
@Slf4j(topic = "c.TestThenCombine")
public class TestThenCombine {public static void main(String[] args) {log.debug("开始");CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}log.debug("A 完成");return 10;}).thenCombine(CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}log.debug("B 完成");return 20;}), (x, y) -> x + y);log.debug("结果:" + result.join());}
}
16:53:50.817 c.TestThenCombine [main] - 开始
16:53:51.894 c.TestThenCombine [ForkJoinPool.commonPool-worker-9] - A 完成
16:53:52.886 c.TestThenCombine [ForkJoinPool.commonPool-worker-2] - B 完成
16:53:52.886 c.TestThenCombine [main] - 结果:30

5. CompletableFuture 性能

分析 CompletableFuture 完成异步任务

  • 任务数 等于 CompletableFuture 线程池的最大线程数(CPU核心数 - 1) 时,CompletableFuture 性能最佳
    例如:有4个核心,则 CompletableFuture 线程池的最大线程数是3,此时任务数为3(假设每个任务耗时1s),CompletableFuture 性能最 佳(CompletableFuture 做完3个任务耗时也就1s左右);
    如果任务数为4-6,则 CompletableFuture 做完4-6个任务耗时就会是2s左右;
    如果任务数为7-9,则 CompletableFuture 做完7-9个任务耗时就会是3s左右;
    以此类推
  • 所以当已知任务数量时,可以创建自定义线程池,手动设置线程池最大线程数
  • 如果需要根据任务数量动态设置线程池最大线程数时,每执行完一批任务后,需要关闭线程池,等下一次新任务来时再根据任务数重新创建线程池
    例如:第一次来了100个任务,则创建10个线程的线程池,执行完100个任务后关闭线程池;第二次来了200个任务时,再重新创建有20个线程的线程池;

测试 任务数 等于 CompletableFuture 线程池的最大线程数:

创建任务类:

@Slf4j(topic = "c.Task")
class Task {// 任务名private String name;// 做该任务耗时 (秒)private Integer productionTime;public Task(String name, Integer productionTime) {this.name = name;this.productionTime = productionTime;}// 做任务public void doTask() {try {TimeUnit.SECONDS.sleep(this.productionTime);} catch (InterruptedException e) {e.printStackTrace();}log.debug(this.name + " 已做完");}
}

第一种写法:

@Slf4j(topic = "c.TestCompletableFuture_doTask")
public class TestCompletableFuture_doTask {public static void main(String[] args) {log.debug("该二手台式机可用处理器数量:{}", Runtime.getRuntime().availableProcessors());log.debug("当前线程池中的最大线程数:{}", ForkJoinPool.getCommonPoolParallelism());log.debug("开始");long startTime = System.currentTimeMillis();// 创建任务List<Task> tasks = new ArrayList<>();for (int i = 1; i <= 3; i++) {Task task = new Task("任务" + i, 1);tasks.add(task);}// 做任务List<CompletableFuture> completableFutureList = new ArrayList<>();for (Task task : tasks) {CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> task.doTask());// 将所有任务加到 CompletableFuture 集合中completableFutureList.add(cf);}// 等待 CompletableFuture 集合中的所有任务执行完毕CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).join();log.debug("做完所有任务的耗时:{}", (System.currentTimeMillis() - startTime));}
}
00:50:41.392 c.TestCompletableFuture_doTask [main] - 该二手台式机可用处理器数量:4
00:50:41.392 c.TestCompletableFuture_doTask [main] - 当前线程池中的最大线程数:3
00:50:41.397 c.TestCompletableFuture_doTask [main] - 开始
00:50:42.467 c.Task [ForkJoinPool.commonPool-worker-3] - 任务3 已做完
00:50:42.467 c.Task [ForkJoinPool.commonPool-worker-2] - 任务2 已做完
00:50:42.467 c.Task [ForkJoinPool.commonPool-worker-1] - 任务1 已做完
00:50:42.467 c.TestCompletableFuture_doTask [main] - 做完所有任务的耗时:1069

第二种写法(用Stream简化代码,结果还是一样,主要是简化代码):

@Slf4j(topic = "c.TestCompletableFuture_doTask")
public class TestCompletableFuture_doTask {public static void main(String[] args) {log.debug("该二手台式机可用处理器数量:{}", Runtime.getRuntime().availableProcessors());log.debug("当前线程池中的最大线程数:{}", ForkJoinPool.getCommonPoolParallelism());log.debug("开始");long startTime = System.currentTimeMillis();// 将创建的任务加到 CompletableFuture 数组中 并执行CompletableFuture[] tasks= IntStream.rangeClosed(1, 3).mapToObj(i -> new Task("任务" + i, 1)).map(task -> CompletableFuture.runAsync(task::doTask)).toArray(CompletableFuture[]::new);// 等待 CompletableFuture 数组中的所有任务执行完CompletableFuture.allOf(tasks).join();log.debug("做完所有任务的耗时:{}", (System.currentTimeMillis() - startTime));}
}
00:51:13.655 c.TestCompletableFuture_doTask [main] - 该二手台式机可用处理器数量:4
00:51:13.655 c.TestCompletableFuture_doTask [main] - 当前线程池中的最大线程数:3
00:51:13.667 c.TestCompletableFuture_doTask [main] - 开始
00:51:14.896 c.Task [ForkJoinPool.commonPool-worker-2] - 任务2 已做完
00:51:14.896 c.Task [ForkJoinPool.commonPool-worker-1] - 任务1 已做完
00:51:14.896 c.Task [ForkJoinPool.commonPool-worker-3] - 任务3 已做完
00:51:14.896 c.TestCompletableFuture_doTask [main] - 做完所有任务的耗时:1229

测试 任务数 超过 CompletableFuture 线程池的最大线程数:

@Slf4j(topic = "c.TestCompletableFuture_doTask")
public class TestCompletableFuture_doTask {public static void main(String[] args) {log.debug("该二手台式机可用处理器数量:{}", Runtime.getRuntime().availableProcessors());log.debug("当前线程池中的最大线程数:{}", ForkJoinPool.getCommonPoolParallelism());log.debug("开始");long startTime = System.currentTimeMillis();// 将创建的任务加到 CompletableFuture 数组中 并执行CompletableFuture[] tasks= IntStream.rangeClosed(1, 3).mapToObj(i -> new Task("任务" + i, 1)).map(task -> CompletableFuture.runAsync(task::doTask)).toArray(CompletableFuture[]::new);// 等待 CompletableFuture 数组中的所有任务执行完CompletableFuture.allOf(tasks).join();log.debug("做完所有任务的耗时:{}", (System.currentTimeMillis() - startTime));}
}
01:06:31.516 c.TestCompletableFuture_doTask [main] - 该二手台式机可用处理器数量:4
01:06:31.516 c.TestCompletableFuture_doTask [main] - 当前线程池中的最大线程数:3
01:06:31.527 c.TestCompletableFuture_doTask [main] - 开始
01:06:32.636 c.Task [ForkJoinPool.commonPool-worker-2] - 任务2 已做完
01:06:32.636 c.Task [ForkJoinPool.commonPool-worker-1] - 任务1 已做完
01:06:32.637 c.Task [ForkJoinPool.commonPool-worker-3] - 任务3 已做完
01:06:33.637 c.Task [ForkJoinPool.commonPool-worker-2] - 任务4 已做完
01:06:33.637 c.TestCompletableFuture_doTask [main] - 做完所有任务的耗时:2110

CompletableFuture 配合 自定义线程池 使用

@Slf4j(topic = "c.TestCompletableFuture_doTask")
public class TestCompletableFuture_doTask {public static void main(String[] args) {ExecutorService pool = Executors.newCachedThreadPool();log.debug("开始");long startTime = System.currentTimeMillis();// 将创建的任务加到 CompletableFuture 数组中 并执行CompletableFuture[] tasks= IntStream.rangeClosed(1, 3).mapToObj(i -> new Task("任务" + i, 1)).map(task -> CompletableFuture.runAsync(task::doTask, pool)) // 传入自定义线程池.toArray(CompletableFuture[]::new);// 等待 CompletableFuture 数组中的所有任务执行完CompletableFuture.allOf(tasks).join();log.debug("做完所有任务的耗时:{}", (System.currentTimeMillis() - startTime));pool.shutdown();}
}
01:32:25.794 c.TestCompletableFuture_doTask [main] - 开始
01:32:26.871 c.Task [pool-1-thread-10] - 任务10 已做完
01:32:26.872 c.Task [pool-1-thread-9] - 任务9 已做完
01:32:26.872 c.Task [pool-1-thread-8] - 任务8 已做完
01:32:26.872 c.Task [pool-1-thread-7] - 任务7 已做完
01:32:26.872 c.Task [pool-1-thread-6] - 任务6 已做完
01:32:26.873 c.Task [pool-1-thread-5] - 任务5 已做完
01:32:26.873 c.Task [pool-1-thread-4] - 任务4 已做完
01:32:26.873 c.Task [pool-1-thread-3] - 任务3 已做完
01:32:26.874 c.Task [pool-1-thread-2] - 任务2 已做完
01:32:26.874 c.Task [pool-1-thread-1] - 任务1 已做完
01:32:26.874 c.TestCompletableFuture_doTask [main] - 做完所有任务的耗时:1080
可以看到就算做10个任务,耗时也是1s左右,但不是任何数量的任务都只耗时1s
一台计算机,一瞬间处理的最大任务数量跟CPU的处理能力有关,也跟内存、硬盘、网卡等设备有关
具体跟什么有关,取决于任务本身所需要的资源
由于测试案例中的任务使用sleep模拟耗时的,但是sleep这个操作不太占CPU资源,
所以当传入自定义线程池后,任务数超过了CPU核心数后,耗时也不多

http://chatgpt.dhexx.cn/article/LshAwLCT.shtml

相关文章

Java 编程问题:十一、并发-深入探索

原文&#xff1a;Java Coding Problems 协议&#xff1a;CC BY-NC-SA 4.0 贡献者&#xff1a;飞龙 本文来自【ApacheCN Java 译文集】&#xff0c;自豪地采用谷歌翻译。 本章包括涉及 Java 并发的 13 个问题&#xff0c;涉及 Fork/Join 框架、CompletableFuture、ReentrantLock…

线程(十二)---CompletableFuture(三)

写在前面&#xff1a;各位看到此博客的小伙伴&#xff0c;如有不对的地方请及时通过私信我或者评论此博客的方式指出&#xff0c;以免误人子弟。多谢&#xff01; 示例五&#xff1a;异常处理 接着上一篇记录一下CompletableFuture的异常处理&#xff0c;异常处理通常使用…

dice loss

Dice Loss 最先是在VNet 这篇文章中被提出&#xff0c;后来被广泛的应用在了医学影像分割之中。 Dice 系数 Dice系数作为损失函数的原因和混淆矩阵有着很大的关系&#xff0c;下图给出的是一个混淆矩阵&#xff1a; 其中的一些关键指标如下&#xff1a; 精确率(precision)表…

Hinge loss

声明&#xff1a; 参考自维基百科后面可能会更新 Hinge Loss 在机器学习中&#xff0c;hinge loss作为一个损失函数(loss function)&#xff0c;通常被用于最大间隔算法(maximum-margin)&#xff0c;而最大间隔算法又是SVM(支持向量机support vector machines)用到的重要算法…

【深度学习】一文读懂机器学习常用损失函数(Loss Function)

【深度学习】一文读懂机器学习常用损失函数&#xff08;Loss Function&#xff09; 最近太忙已经好久没有写博客了&#xff0c;今天整理分享一篇关于损失函数的文章吧&#xff0c;以前对损失函数的理解不够深入&#xff0c;没有真正理解每个损失函数的特点以及应用范围&#x…

Pytorch之loss(损失函数)

损失函数也在torch.nn下&#xff0c;具体可以参考文档&#xff0c;也可以参考官网 先根据L1Loss举例 我个人感觉这里的描述还是到官网的文档找比较好&#xff0c;公式看的比文档清楚 import torch from torch import nninputs torch.tensor([[3, 2, 1],[1, 2, 3]], dtypetorch…

机器学习 损失函数 Loss function

损失函数 最小二乘法极大似然估计法交叉熵 【本文根据B站-王木头学科学-视频所学】 在梯度下降中&#xff0c;所求的梯度其实就是损失函数的梯度。 损失函数有三种设计方法&#xff1a; &#xff08;1&#xff09;最小二乘法 &#xff08;2&#xff09;极大似然估计法 &#x…

Focal loss 损失函数详解

Focal loss 目前目标检测的算法大致分为两类&#xff0c;One Stage 、Two Stage。 One Stage&#xff1a;主要指类似YOLO、SGD等这样不需要region proposal,直接回归的检测算法&#xff0c;这类算法检测速度很快&#xff0c;但是精度准确率不如使用Two stage的模型。 two St…

机器学习之常见的损失函数(loss function)

解决一个机器学习问题主要有两部分&#xff1a;数据和算法。而算法又有三个部分组成&#xff1a;假设函数、损失函数、算法优化。我们一般在看算法书或者视频教学时&#xff0c;更多的是去推算或者说参数估计出其假设函数&#xff0c;而往往不太注重损失函数&#xff0c;但是损…

深度学习loss函数理解

机器学习中的范数规则化之L0、L1、L2范数及loss函数 监督机器学习问题无非就是“minimizeyour error while regularizing your parameters”&#xff0c;也就是在规则化参数的同时最小化误差。 最小化误差是为了让我们的模型拟合我们的训练数据&#xff0c;而规则化参数是防止我…

CE Loss,BCE Loss以及Focal Loss的原理理解

一、交叉熵损失函数&#xff08;CE Loss&#xff0c;BCE Loss&#xff09; 最开始理解交叉熵损失函数被自己搞的晕头转向的&#xff0c;最后发现是对随机变量的理解有偏差&#xff0c;不知道有没有读者和我有着一样的困惑&#xff0c;所以在本文开始之前&#xff0c;先介绍一下…

损失函数loss

http://blog.csdn.net/pipisorry/article/details/23538535 监督学习及其目标函数 损失函数&#xff08;loss function&#xff09;是用来估量你模型的预测值f(x)与真实值Y的不一致程度&#xff0c;它是一个非负实值函数&#xff0c;通常使用L(Y, f(x))来表示。 损失函数是经…

机器学习模型中的损失函数loss function

1. 概述 在机器学习算法中&#xff0c;有一个重要的概念就是损失函数&#xff08;Loss Function&#xff09;。损失函数的作用就是度量模型的预测值 f ( x ) f\left ( \mathbf{x} \right ) f(x)与真实值 y \mathbf{y} y之间的差异程度的函数&#xff0c;且是一个非负实值函数。…

损失函数(Loss)

如果我们定义了一个机器学习模型&#xff0c;比如一个三层的神经网络&#xff0c;那么就需要使得这个模型能够尽可能拟合所提供的训练数据。但是我们如何评价模型对于数据的拟合是否足够呢&#xff1f;那就需要使用相应的指标来评价它的拟合程度&#xff0c;所使用到的函数就称…

focal loss详解

文章目录 focal loss的整体理解易分辨样本、难分辨样本的含义focal loss的出现过程focal loss 举例说明focal loss的 α \alpha α变体 focal loss的整体理解 focal loss 是一种处理样本分类不均衡的损失函数&#xff0c;它侧重的点是根据样本分辨的难易程度给样本对应的损失添…

深度学习——损失函数(Regression Loss、Classification Loss)

简介 Loss function 损失函数 用于定义单个训练样本与真实值之间的误差 Cost function 代价函数 用于定义单个批次/整个训练集样本与真实值之间的误差 Objective function 目标函数 泛指任意可以被优化的函数 损失函数用于衡量模型所做出的预测离真实值(GT)之间的偏离程度。 …

深度学习中常见的损失函数(L1Loss、L2loss)

损失函数定义 损失函数&#xff1a;衡量模型输出与真实标签的差异。 L1_loss 平均绝对误差&#xff08;L1 Loss&#xff09;:平均绝对误差&#xff08;Mean Absolute Error,MAE&#xff09;是指模型预测值f(x)和真实值y之间距离的平均值&#xff0c;公式如下&#xff1a; 优…

损失函数(loss function)

文章目录 1、什么是损失函数2、为什么要使用损失函数3、损失函数分类1、分类一2、分类二3、分类三3.1基于距离度量的损失函数3.1.1 均方误差损失函数&#xff08;MSE&#xff09;3.1.2 L2损失函数3.1.3 L1损失函数3.1.4 Smooth L1损失函数3.1.5 huber损失函数 3.2 基于概率分布…

Focal Loss损失函数(超级详细的解读)

什么是损失函数&#xff1f; 1、什么是损失呢&#xff1f; 在机器学习模型中&#xff0c;对于每一个样本的预测值与真实值的差称为损失。 2、什么是损失函数呢&#xff1f; 显而易见&#xff0c;是一个用来计算损失的函数。它是一个非负实值函数,通常使用L(Y, f(x))来表示。 3、…

损失函数loss大总结

分类任务loss: 二分类交叉熵损失sigmoid_cross_entropy&#xff1a; TensorFlow 接口&#xff1a; tf.losses.sigmoid_cross_entropy(multi_class_labels,logits,weights1.0,label_smoothing0,scopeNone,loss_collectiontf.GraphKeys.LOSSES,reductionReduction.SUM_BY_NONZER…