Java 中的 CompletableFuture 如何实现异步编程?

在 Java 8 之后,CompletableFuture 为异步编程提供了极其强大且易用的 API。它不仅可以替代传统的 Future,还支持函数式编程风格,允许你链式调用、组合任务、处理异常,甚至实现回调式异步流程。下面从基础到进阶,系统地介绍 CompletableFuture 的使用方法与典型场景。

1. 基础使用

1.1 创建 CompletableFuture

CompletableFuture <String> cf = CompletableFuture.supplyAsync(() -> {
    // 这里放耗时操作,例如网络请求
    return "Hello, CompletableFuture!";
});
  • supplyAsync:返回值为 T 的异步任务,默认使用 ForkJoinPool.commonPool()。
  • 如果你想指定自定义线程池:
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture <String> cf = CompletableFuture.supplyAsync(() -> "Task done", executor);

1.2 处理结果

cf.thenAccept(result -> System.out.println("Result: " + result));

thenAccept 会在 cf 完成后异步执行回调。thenApply 则可用于转换结果。

1.3 处理异常

cf.exceptionally(ex -> {
    System.err.println("Error: " + ex.getMessage());
    return "Fallback";
});

2. 组合异步任务

2.1 并行执行(All)

CompletableFuture <Integer> f1 = CompletableFuture.supplyAsync(() -> {
    // 任务 A
    return 1;
});
CompletableFuture <Integer> f2 = CompletableFuture.supplyAsync(() -> {
    // 任务 B
    return 2;
});

CompletableFuture <Void> allDone = CompletableFuture.allOf(f1, f2);
allDone.thenRun(() -> {
    try {
        System.out.println("Sum: " + (f1.get() + f2.get()));
    } catch (Exception e) {
        e.printStackTrace();
    }
});

2.2 只等待第一个完成(Any)

CompletableFuture <Integer> firstDone = CompletableFuture.anyOf(f1, f2)
    .thenApply(obj -> (Integer) obj);
firstDone.thenAccept(result -> System.out.println("First result: " + result));

3. 复杂流程:链式调用

CompletableFuture <String> pipeline = CompletableFuture.supplyAsync(() -> "Start")
    .thenApply(str -> str + " -> Step1")
    .thenCompose(str -> CompletableFuture.supplyAsync(() -> str + " -> Step2"))
    .thenAccept(result -> System.out.println("Pipeline finished: " + result));
  • thenCompose 用于链式调用返回 CompletableFuture 的方法,避免嵌套 CompletableFuture<CompletableFuture<T>>

4. 实战案例:多服务调用

假设你需要调用 A、B、C 三个微服务,并在它们全部完成后生成最终报表。

CompletableFuture <String> a = CompletableFuture.supplyAsync(() -> callServiceA());
CompletableFuture <String> b = CompletableFuture.supplyAsync(() -> callServiceB());
CompletableFuture <String> c = CompletableFuture.supplyAsync(() -> callServiceC());

CompletableFuture <Void> all = CompletableFuture.allOf(a, b, c);

CompletableFuture <String> report = all.thenApply(v -> {
    try {
        return generateReport(a.get(), b.get(), c.get());
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});

report.thenAccept(r -> System.out.println("Report: " + r));

5. 性能与注意事项

  1. 线程池CompletableFuture 默认使用 ForkJoinPool.commonPool()。如果任务是 I/O 密集型或需要大量并发,建议使用自定义 Executor
  2. 异常处理exceptionally 只能捕获到单个异常,若链式任务中多个异常并发发生,最好使用 handlewhenComplete
  3. 取消任务CompletableFuture 提供 cancel(true/false),但要注意后续链式调用的行为。
  4. 与 Reactive:如果你已经在使用 Reactor、RxJava,CompletableFuture 可以通过 CompletableFuture.toCompletableFuture() 进行桥接。

6. 小技巧

  • 转换为 Futurecf.toCompletableFuture(),但更常见的是直接使用 CompletableFuture
  • 线程安全CompletableFuture 内部使用 AtomicReference 保证线程安全,但回调中的业务代码需自行考虑同步。
  • 打印堆栈cf.whenComplete((res, ex) -> { if (ex != null) ex.printStackTrace(); }) 方便调试。

7. 结语

CompletableFuture 让 Java 的异步编程变得直观、可组合。它既能满足简单的异步需求,也能处理复杂的并行与组合场景。只要合理使用线程池、异常处理和回调链式调用,就能大幅提升代码可读性和并发性能。快把它试用在你的项目中,感受 Java 8+ 的异步魅力吧!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注