在 Java 8 及之后的版本中,CompletableFuture 提供了一种强大的方式来处理异步、非阻塞的编程。它可以与 Stream API、ExecutorService 等结合使用,实现高效的并行流式处理。下面给出一个完整的示例,演示如何使用 CompletableFuture 并行读取文件、处理数据,并最终将结果聚合到一个集合中。
需求
- 从文件系统中读取多份文本文件(假设文件名为 data1.txt、data2.txt、data3.txt)。
- 对每个文件中的每一行进行计算(如统计单词数)。
- 所有文件的结果合并为一个
Map<String, Integer>,键为文件名,值为该文件总单词数。
关键点
- 使用
Files.readAllLines(Path)读取文件内容。 - 用
CompletableFuture.supplyAsync异步读取每个文件。 thenApplyAsync用于处理读取到的数据。allOf等待所有 CompletableFuture 完成后统一聚合。- 自定义线程池控制并发级别。
示例代码
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class ParallelFileWordCount {
// 自定义线程池,核心线程数等于文件数
private static final ExecutorService executor =
Executors.newFixedThreadPool(3, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
});
public static void main(String[] args) throws Exception {
List <String> files = Arrays.asList("data1.txt", "data2.txt", "data3.txt");
// 为每个文件创建一个 CompletableFuture
List<CompletableFuture<Map.Entry<String, Integer>>> futures =
files.stream()
.map(file -> readAndCountWordsAsync(file))
.collect(Collectors.toList());
// 等待所有异步任务完成
CompletableFuture <Void> allDone =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
// 当所有任务完成后,收集结果
CompletableFuture<Map<String, Integer>> finalResult =
allDone.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue)));
// 打印最终结果
finalResult.thenAccept(result -> {
result.forEach((file, count) ->
System.out.println(file + " -> 单词数: " + count));
}).join(); // 阻塞主线程直到结果打印完毕
// 关闭线程池
executor.shutdown();
}
// 异步读取文件并统计单词数,返回一个 Map.Entry
private static CompletableFuture<Map.Entry<String, Integer>> readAndCountWordsAsync(String fileName) {
return CompletableFuture.supplyAsync(() -> {
try {
Path path = Paths.get(fileName);
List <String> lines = Files.readAllLines(path);
// 简单统计单词数:按空格拆分
int wordCount = lines.stream()
.flatMap(line -> Arrays.stream(line.trim().split("\\s+")))
.filter(s -> !s.isEmpty())
.mapToInt(String::length)
.sum(); // 这里把单词长度累加,作为演示
return Map.entry(fileName, wordCount);
} catch (Exception e) {
throw new CompletionException(e);
}
}, executor);
}
}
代码说明
- 线程池
使用Executors.newFixedThreadPool创建一个固定大小的线程池,避免在每次调用时创建过多线程。 - 异步读取
supplyAsync在后台线程读取文件并返回Map.Entry,其中键为文件名,值为单词数。 - 结果聚合
CompletableFuture.allOf等待所有任务完成后,使用thenApply将各个CompletableFuture的结果聚合成一个Map<String,Integer>。 - 错误处理
通过CompletionException包装异常,保证错误能够在主线程中被捕获。
扩展
- 动态文件列表:可以将
files列表改为从目录读取所有.txt文件。 - 更复杂的数据处理:在
thenApplyAsync里加入更高阶的业务逻辑,例如过滤、排序、分组等。 - 结果持久化:将聚合后的
Map写入数据库或文件。
通过上述方式,你可以轻松地将传统串行的文件处理转为并行异步,充分利用多核 CPU,显著提升性能。
祝你编码愉快,Happy Coding!

发表回复