Java 21 新功能:Structured Concurrency 与协程调度器实现

在 Java 21 中,Structured Concurrency 的正式发布为 Java 开发者提供了一套更直观、易于管理的并发模型。它把多线程编程的“并发”与“同步”逻辑统一起来,使得代码既保持了传统线程的灵活性,又兼顾了协程式编程的易读性。下面以一个简单的协程调度器为例,演示如何利用 Structured Concurrency 编写可维护、无死锁风险的并发代码。


1. 关键概念回顾

概念 说明
Virtual Thread 轻量级线程,支持大规模并发,创建与销毁开销极低。
StructuredTaskScope 一个任务容器,所有子任务在同一生命周期内完成,失败会统一传播。
Task 代表一个可取消、可等待的执行单元。
Cancellation 当任一子任务失败或显式取消时,容器内所有任务都会被取消。

2. 设计目标

  • 并行处理:对多组数据进行独立的网络请求或计算任务。
  • 错误聚合:任何一个子任务失败都能立即取消其他任务,并把异常统一抛出。
  • 易读写:代码结构类似同步流程,减少回调地狱。

3. 代码示例

import java.net.URI;
import java.net.http.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.CompletionException;

public class CoroutineScheduler {

    private final HttpClient client;
    private final ExecutorService executor;

    public CoroutineScheduler() {
        // 使用虚拟线程执行器
        this.executor = Executors.newVirtualThreadPerTaskExecutor();
        this.client = HttpClient.newBuilder()
                .executor(executor)
                .build();
    }

    /**
     * 以协程方式并行请求多个 URL
     *
     * @param urls 需要请求的 URL 列表
     * @return 每个 URL 对应的响应体
     * @throws Exception 所有子任务异常会统一抛出
     */
    public List <String> fetchAll(List<URI> urls) throws Exception {
        // 使用 StructuredTaskScope 来管理所有子任务
        try (var scope = StructuredTaskScope.newInstance()) {

            // 为每个 URL 创建一个子任务
            for (URI uri : urls) {
                scope.fork(() -> {
                    HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
                    HttpResponse <String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
                    // 简化:直接返回 body
                    return response.body();
                });
            }

            // 等待所有子任务完成
            // 若任一子任务抛异常,scope 立即取消其余任务
            scope.join();

            // 统一收集结果
            List <String> results = new ArrayList<>(urls.size());
            for (var t : scope.subtasks()) {
                results.add(t.result());
            }
            return results;
        }
    }

    public static void main(String[] args) {
        var scheduler = new CoroutineScheduler();

        List <URI> urls = List.of(
                URI.create("https://httpbin.org/get"),
                URI.create("https://httpbin.org/status/404"),
                URI.create("https://httpbin.org/delay/2")
        );

        try {
            List <String> bodies = scheduler.fetchAll(urls);
            bodies.forEach(body -> System.out.println("Response size: " + body.length()));
        } catch (Exception e) {
            System.err.println("请求失败: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

代码要点解析

  1. 虚拟线程执行器

    this.executor = Executors.newVirtualThreadPerTaskExecutor();

    所有 HTTP 请求都在虚拟线程中执行,资源占用极低,适合高并发场景。

  2. StructuredTaskScope

    try (var scope = StructuredTaskScope.newInstance()) { … }

    通过 fork() 方法启动子任务,join() 等待全部完成。若子任务抛出异常,容器会自动取消其他任务,并把异常聚合为 CompletionException

  3. 子任务返回值
    scope.fork(() -> { … }) 里返回的值可以通过 t.result() 取得,保证了结果的顺序与 URL 顺序对应。

  4. 异常处理
    当某个 URL 返回 404 时,client.send() 会抛出 java.net.http.HttpTimeoutExceptionCompletionException,整个 fetchAll() 方法会直接抛出。这样调用方可以统一捕获并做出错误处理。


4. 与传统 ThreadPool 的比较

维度 传统 ThreadPool + Future Structured Concurrency
代码结构 需要显式管理 Future、取消、异常聚合 通过 StructuredTaskScope 自动完成
资源管理 手动 shutdown 线程池 容器自动关闭资源
并发粒度 线程池线程数受限 虚拟线程可按需创建
错误传播 手动检查 Future.get() 任何子任务异常立即取消其余任务

5. 小结

Java 21 的 Structured Concurrency 让并发代码更像顺序代码,显著降低了线程管理的复杂度。通过虚拟线程配合 StructuredTaskScope,我们可以轻松实现一个简洁、高效且安全的协程调度器。无论是网络 I/O、文件读写还是 CPU 密集型计算,使用这一新特性都能获得更好的可维护性和性能。


评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注