如何使用 CompletableFuture 实现异步任务链并处理异常?

在 Java 8 之后,CompletableFuture 为我们提供了一种强大且灵活的方式来编写异步、非阻塞代码。它不仅支持单个任务的异步执行,还能让多个任务串联、并行、组合,并且提供了丰富的异常处理机制。下面我们通过一个完整的示例,演示如何构建异步任务链,并在任何阶段捕获并处理异常。


1. 业务场景

假设我们正在开发一个电商订单系统,流程如下:

  1. 获取订单信息(异步 I/O)
  2. 检查库存(异步网络请求)
  3. 扣减库存(异步数据库写入)
  4. 支付(异步第三方支付)
  5. 发送邮件通知(异步邮件服务)

如果任何一步失败,都需要回滚或给用户友好提示。我们用 CompletableFuture 来实现。


2. 基础代码

import java.util.concurrent.*;
import java.util.function.*;

public class OrderProcessor {

    // 模拟线程池
    private static final ExecutorService executor = Executors.newFixedThreadPool(8);

    // 1. 获取订单
    private static CompletableFuture <Order> fetchOrderAsync(String orderId) {
        return CompletableFuture.supplyAsync(() -> {
            simulateDelay(100);
            if (Math.random() < 0.1) throw new RuntimeException("订单获取失败");
            return new Order(orderId, 42);
        }, executor);
    }

    // 2. 检查库存
    private static CompletableFuture <Boolean> checkStockAsync(int productId) {
        return CompletableFuture.supplyAsync(() -> {
            simulateDelay(120);
            if (Math.random() < 0.2) throw new RuntimeException("库存检查失败");
            return true;
        }, executor);
    }

    // 3. 扣减库存
    private static CompletableFuture <Void> deductStockAsync(int productId) {
        return CompletableFuture.runAsync(() -> {
            simulateDelay(80);
            if (Math.random() < 0.15) throw new RuntimeException("扣减库存失败");
        }, executor);
    }

    // 4. 付款
    private static CompletableFuture <Boolean> paymentAsync(double amount) {
        return CompletableFuture.supplyAsync(() -> {
            simulateDelay(200);
            if (Math.random() < 0.25) throw new RuntimeException("支付失败");
            return true;
        }, executor);
    }

    // 5. 发送邮件
    private static CompletableFuture <Void> sendEmailAsync(String email) {
        return CompletableFuture.runAsync(() -> {
            simulateDelay(50);
            if (Math.random() < 0.05) throw new RuntimeException("邮件发送失败");
        }, executor);
    }

    // 工具:模拟延迟
    private static void simulateDelay(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
    }

    // 业务流程
    public static CompletableFuture <Void> processOrder(String orderId, String email) {
        return fetchOrderAsync(orderId)
            .thenCompose(order -> checkStockAsync(order.productId)
                    .thenCompose(ignored -> deductStockAsync(order.productId)))
            .thenCompose(ignored -> paymentAsync(99.99))
            .thenCompose(success -> {
                if (!success) return CompletableFuture.failedFuture(new RuntimeException("支付未成功"));
                return sendEmailAsync(email);
            })
            .thenAccept(v -> System.out.println("订单处理成功,已发送邮件给 " + email))
            .exceptionally(ex -> {
                System.err.println("订单处理失败: " + ex.getMessage());
                // 这里可以触发补偿操作,如回滚库存等
                return null;
            });
    }

    // 简单的 Order 类
    private static class Order {
        String id;
        int productId;
        Order(String id, int productId) {
            this.id = id; this.productId = productId;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture <Void> future = processOrder("ORD123", "[email protected]");
        future.join(); // 等待完成
        executor.shutdown();
    }
}

3. 关键点剖析

  1. 链式调用
    thenCompose 用于连接需要前一步结果的异步任务。它接受一个 Function,返回另一个 CompletableFuture,两者会串联起来形成完整的执行链。

  2. 错误传播
    任何一步抛出的异常都会导致整个链被标记为异常完成。后续 thenComposethenAccept 将被跳过,直接走异常处理路径。

  3. 统一异常处理
    exceptionally 让我们可以统一捕获并处理所有异常。它也可以用来执行补偿逻辑,如事务回滚、日志记录或发送告警。

  4. 非阻塞等待
    main 方法里使用 future.join() 等待完成,但这并不阻塞整个线程池;如果想让主线程继续执行其他任务,可以不调用 join(),或使用 future.whenComplete(...)

  5. 线程池管理
    这里使用自定义 ExecutorService,在生产环境建议使用业务相关的线程池,并在应用关闭时进行优雅 shutdown。


4. 进阶使用

  • 并行组合
    CompletableFuture.allOf(future1, future2, ...)anyOf(...) 用于并行等待多个任务完成。

  • 自定义线程池
    为了避免 ForkJoinPool.commonPool() 产生的性能波动,建议使用 CompletableFuture.supplyAsync(fn, executor) 指定线程池。

  • 超时控制
    orTimeout(Java 9+)或自定义 CompletableFuture.completeOnTimeout 可为任务设定超时。

  • 组合异常信息
    当多个并行任务都失败时,可使用 CompletionException 包装异常,或手动收集异常。


5. 小结

CompletableFuture 让异步编程变得极其直观,几行代码即可实现复杂的异步工作流。通过链式调用、统一异常处理和自定义线程池,既能保持代码可读性,又能在高并发场景下保持系统稳定。希望这份示例能帮助你快速上手并在实际项目中灵活运用。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注