Java中 CompletableFuture 的并行流式编程示例

在 Java 8 及之后的版本中,CompletableFuture 提供了一种强大的方式来处理异步、非阻塞的编程。它可以与 Stream API、ExecutorService 等结合使用,实现高效的并行流式处理。下面给出一个完整的示例,演示如何使用 CompletableFuture 并行读取文件、处理数据,并最终将结果聚合到一个集合中。

需求

  1. 从文件系统中读取多份文本文件(假设文件名为 data1.txt、data2.txt、data3.txt)。
  2. 对每个文件中的每一行进行计算(如统计单词数)。
  3. 所有文件的结果合并为一个 Map<String, Integer>,键为文件名,值为该文件总单词数。

关键点

  • 使用 Files.readAllLines(Path) 读取文件内容。
  • CompletableFuture.supplyAsync 异步读取每个文件。
  • thenApplyAsync 用于处理读取到的数据。
  • allOf 等待所有 CompletableFuture 完成后统一聚合。
  • 自定义线程池控制并发级别。

示例代码

import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class ParallelFileWordCount {

    // 自定义线程池,核心线程数等于文件数
    private static final ExecutorService executor =
        Executors.newFixedThreadPool(3, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });

    public static void main(String[] args) throws Exception {
        List <String> files = Arrays.asList("data1.txt", "data2.txt", "data3.txt");

        // 为每个文件创建一个 CompletableFuture
        List<CompletableFuture<Map.Entry<String, Integer>>> futures =
            files.stream()
                 .map(file -> readAndCountWordsAsync(file))
                 .collect(Collectors.toList());

        // 等待所有异步任务完成
        CompletableFuture <Void> allDone =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        // 当所有任务完成后,收集结果
        CompletableFuture<Map<String, Integer>> finalResult =
            allDone.thenApply(v -> futures.stream()
                                          .map(CompletableFuture::join)
                                          .collect(Collectors.toMap(
                                              Map.Entry::getKey,
                                              Map.Entry::getValue)));

        // 打印最终结果
        finalResult.thenAccept(result -> {
            result.forEach((file, count) ->
                System.out.println(file + " -> 单词数: " + count));
        }).join(); // 阻塞主线程直到结果打印完毕

        // 关闭线程池
        executor.shutdown();
    }

    // 异步读取文件并统计单词数,返回一个 Map.Entry
    private static CompletableFuture<Map.Entry<String, Integer>> readAndCountWordsAsync(String fileName) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Path path = Paths.get(fileName);
                List <String> lines = Files.readAllLines(path);
                // 简单统计单词数:按空格拆分
                int wordCount = lines.stream()
                                     .flatMap(line -> Arrays.stream(line.trim().split("\\s+")))
                                     .filter(s -> !s.isEmpty())
                                     .mapToInt(String::length)
                                     .sum(); // 这里把单词长度累加,作为演示
                return Map.entry(fileName, wordCount);
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, executor);
    }
}

代码说明

  1. 线程池
    使用 Executors.newFixedThreadPool 创建一个固定大小的线程池,避免在每次调用时创建过多线程。
  2. 异步读取
    supplyAsync 在后台线程读取文件并返回 Map.Entry,其中键为文件名,值为单词数。
  3. 结果聚合
    CompletableFuture.allOf 等待所有任务完成后,使用 thenApply 将各个 CompletableFuture 的结果聚合成一个 Map<String,Integer>
  4. 错误处理
    通过 CompletionException 包装异常,保证错误能够在主线程中被捕获。

扩展

  • 动态文件列表:可以将 files 列表改为从目录读取所有 .txt 文件。
  • 更复杂的数据处理:在 thenApplyAsync 里加入更高阶的业务逻辑,例如过滤、排序、分组等。
  • 结果持久化:将聚合后的 Map 写入数据库或文件。

通过上述方式,你可以轻松地将传统串行的文件处理转为并行异步,充分利用多核 CPU,显著提升性能。


祝你编码愉快,Happy Coding!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注