在 Java 8 之后,CompletableFuture 为我们提供了一种强大且灵活的方式来编写异步、非阻塞代码。它不仅支持单个任务的异步执行,还能让多个任务串联、并行、组合,并且提供了丰富的异常处理机制。下面我们通过一个完整的示例,演示如何构建异步任务链,并在任何阶段捕获并处理异常。
1. 业务场景
假设我们正在开发一个电商订单系统,流程如下:
- 获取订单信息(异步 I/O)
- 检查库存(异步网络请求)
- 扣减库存(异步数据库写入)
- 支付(异步第三方支付)
- 发送邮件通知(异步邮件服务)
如果任何一步失败,都需要回滚或给用户友好提示。我们用 CompletableFuture 来实现。
2. 基础代码
import java.util.concurrent.*;
import java.util.function.*;
public class OrderProcessor {
// 模拟线程池
private static final ExecutorService executor = Executors.newFixedThreadPool(8);
// 1. 获取订单
private static CompletableFuture <Order> fetchOrderAsync(String orderId) {
return CompletableFuture.supplyAsync(() -> {
simulateDelay(100);
if (Math.random() < 0.1) throw new RuntimeException("订单获取失败");
return new Order(orderId, 42);
}, executor);
}
// 2. 检查库存
private static CompletableFuture <Boolean> checkStockAsync(int productId) {
return CompletableFuture.supplyAsync(() -> {
simulateDelay(120);
if (Math.random() < 0.2) throw new RuntimeException("库存检查失败");
return true;
}, executor);
}
// 3. 扣减库存
private static CompletableFuture <Void> deductStockAsync(int productId) {
return CompletableFuture.runAsync(() -> {
simulateDelay(80);
if (Math.random() < 0.15) throw new RuntimeException("扣减库存失败");
}, executor);
}
// 4. 付款
private static CompletableFuture <Boolean> paymentAsync(double amount) {
return CompletableFuture.supplyAsync(() -> {
simulateDelay(200);
if (Math.random() < 0.25) throw new RuntimeException("支付失败");
return true;
}, executor);
}
// 5. 发送邮件
private static CompletableFuture <Void> sendEmailAsync(String email) {
return CompletableFuture.runAsync(() -> {
simulateDelay(50);
if (Math.random() < 0.05) throw new RuntimeException("邮件发送失败");
}, executor);
}
// 工具:模拟延迟
private static void simulateDelay(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
}
// 业务流程
public static CompletableFuture <Void> processOrder(String orderId, String email) {
return fetchOrderAsync(orderId)
.thenCompose(order -> checkStockAsync(order.productId)
.thenCompose(ignored -> deductStockAsync(order.productId)))
.thenCompose(ignored -> paymentAsync(99.99))
.thenCompose(success -> {
if (!success) return CompletableFuture.failedFuture(new RuntimeException("支付未成功"));
return sendEmailAsync(email);
})
.thenAccept(v -> System.out.println("订单处理成功,已发送邮件给 " + email))
.exceptionally(ex -> {
System.err.println("订单处理失败: " + ex.getMessage());
// 这里可以触发补偿操作,如回滚库存等
return null;
});
}
// 简单的 Order 类
private static class Order {
String id;
int productId;
Order(String id, int productId) {
this.id = id; this.productId = productId;
}
}
public static void main(String[] args) throws InterruptedException {
CompletableFuture <Void> future = processOrder("ORD123", "[email protected]");
future.join(); // 等待完成
executor.shutdown();
}
}
3. 关键点剖析
-
链式调用
thenCompose用于连接需要前一步结果的异步任务。它接受一个Function,返回另一个CompletableFuture,两者会串联起来形成完整的执行链。 -
错误传播
任何一步抛出的异常都会导致整个链被标记为异常完成。后续thenCompose或thenAccept将被跳过,直接走异常处理路径。 -
统一异常处理
exceptionally让我们可以统一捕获并处理所有异常。它也可以用来执行补偿逻辑,如事务回滚、日志记录或发送告警。 -
非阻塞等待
在main方法里使用future.join()等待完成,但这并不阻塞整个线程池;如果想让主线程继续执行其他任务,可以不调用join(),或使用future.whenComplete(...)。 -
线程池管理
这里使用自定义ExecutorService,在生产环境建议使用业务相关的线程池,并在应用关闭时进行优雅 shutdown。
4. 进阶使用
-
并行组合
CompletableFuture.allOf(future1, future2, ...)或anyOf(...)用于并行等待多个任务完成。 -
自定义线程池
为了避免ForkJoinPool.commonPool()产生的性能波动,建议使用CompletableFuture.supplyAsync(fn, executor)指定线程池。 -
超时控制
orTimeout(Java 9+)或自定义CompletableFuture.completeOnTimeout可为任务设定超时。 -
组合异常信息
当多个并行任务都失败时,可使用CompletionException包装异常,或手动收集异常。
5. 小结
CompletableFuture 让异步编程变得极其直观,几行代码即可实现复杂的异步工作流。通过链式调用、统一异常处理和自定义线程池,既能保持代码可读性,又能在高并发场景下保持系统稳定。希望这份示例能帮助你快速上手并在实际项目中灵活运用。

发表回复