Java 中 CompletableFuture 的并发处理技巧

在 Java 8 以后,CompletableFuture 成为异步编程的重要工具。它通过链式调用、组合和错误处理,让我们能够以声明式方式构建并发任务。本文将从常见场景出发,介绍几个实用技巧,帮助你更高效地使用 CompletableFuture。

1. 基本使用回顾

CompletableFuture.supplyAsync(() -> {
    // 模拟耗时操作
    try { Thread.sleep(1000); } catch (InterruptedException e) {}
    return "Result";
}).thenApply(result -> {
    // 对结果做处理
    return result.toUpperCase();
}).thenAccept(finalResult -> {
    System.out.println("最终结果:" + finalResult);
});

上述代码演示了从异步计算到结果处理的完整流程。supplyAsync 使用默认 ForkJoinPool.commonPool(),如果任务多,建议自定义线程池。

2. 组合多个 CompletableFuture

2.1 allOf

当你需要等待多个异步任务全部完成后再执行后续逻辑时,可使用 allOf

CompletableFuture <Integer> f1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture <Integer> f2 = CompletableFuture.supplyAsync(() -> 20);

CompletableFuture <Void> all = CompletableFuture.allOf(f1, f2);

all.thenRun(() -> {
    int sum = f1.join() + f2.join();
    System.out.println("总和:" + sum);
});

2.2 anyOf

若只需要任一任务完成即可继续,可用 anyOf

CompletableFuture <String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture <String> f2 = CompletableFuture.supplyAsync(() -> "B");

CompletableFuture <Object> any = CompletableFuture.anyOf(f1, f2);

any.thenAccept(result -> System.out.println("第一个完成:" + result));

3. 错误处理技巧

异常会导致后续链条停止,除非显式捕获。常用方式:

CompletableFuture <Integer> risky = CompletableFuture.supplyAsync(() -> {
    if (new Random().nextBoolean()) throw new RuntimeException("偶发错误");
    return 42;
});

risky.exceptionally(ex -> {
    System.err.println("捕获异常:" + ex.getMessage());
    return -1; // 提供回退值
}).thenAccept(result -> System.out.println("最终值:" + result));

exceptionally 可以在异常后恢复执行;如果不想在异常链上继续执行,可以在 exceptionally 内部直接返回 null 或其它默认值。

4. 并行流 + CompletableFuture 的协作

Java Stream API 本身已支持并行,但若想在每个元素上执行异步任务,可结合 CompletableFuture:

List <String> items = Arrays.asList("a", "b", "c");
List<CompletableFuture<String>> futures = items.stream()
    .map(item -> CompletableFuture.supplyAsync(() -> item.toUpperCase()))
    .collect(Collectors.toList());

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
    .thenApply(v -> futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList()))
    .thenAccept(resultList -> System.out.println(resultList));

这样可以避免传统 parallelStream() 的线程泄漏问题,且可灵活控制线程池。

5. 自定义线程池的必要性

使用默认的 ForkJoinPool.commonPool() 对于 CPU 密集型任务不友好,尤其当任务数目多时会导致线程抢占。推荐自定义:

ExecutorService customPool = Executors.newFixedThreadPool(8);

CompletableFuture.supplyAsync(() -> {
    // long running IO
    return "IO 完成";
}, customPool);

记得在应用结束时关闭线程池:customPool.shutdown();

6. 常见陷阱

  1. 使用 join() 阻塞
    join() 会在当前线程等待结果,等价于 get()。若在主线程使用,会失去异步优势。只在需要同步取结果时使用。

  2. 异常未捕获导致 CompletionException
    任何异常都会被包装为 CompletionException。在处理时注意拆箱。

  3. 链式错误漏处理
    handle 可以同时处理结果和异常,避免链条被中断。

7. 小结

  • CompletableFuture 通过链式调用实现声明式异步编程。
  • allOf / anyOf 用于组合多个任务。
  • exceptionallyhandle 等方法可优雅处理异常。
  • 自定义线程池可避免资源争用。
  • 与 Stream 结合可实现并行异步流处理。

掌握上述技巧后,你可以在 Java 项目中轻松构建高效、可维护的异步逻辑,提升程序的并发性能与响应速度。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注