在 Java 21 中,Structured Concurrency 的正式发布为 Java 开发者提供了一套更直观、易于管理的并发模型。它把多线程编程的“并发”与“同步”逻辑统一起来,使得代码既保持了传统线程的灵活性,又兼顾了协程式编程的易读性。下面以一个简单的协程调度器为例,演示如何利用 Structured Concurrency 编写可维护、无死锁风险的并发代码。
1. 关键概念回顾
| 概念 | 说明 |
|---|---|
| Virtual Thread | 轻量级线程,支持大规模并发,创建与销毁开销极低。 |
| StructuredTaskScope | 一个任务容器,所有子任务在同一生命周期内完成,失败会统一传播。 |
| Task | 代表一个可取消、可等待的执行单元。 |
| Cancellation | 当任一子任务失败或显式取消时,容器内所有任务都会被取消。 |
2. 设计目标
- 并行处理:对多组数据进行独立的网络请求或计算任务。
- 错误聚合:任何一个子任务失败都能立即取消其他任务,并把异常统一抛出。
- 易读写:代码结构类似同步流程,减少回调地狱。
3. 代码示例
import java.net.URI;
import java.net.http.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.CompletionException;
public class CoroutineScheduler {
private final HttpClient client;
private final ExecutorService executor;
public CoroutineScheduler() {
// 使用虚拟线程执行器
this.executor = Executors.newVirtualThreadPerTaskExecutor();
this.client = HttpClient.newBuilder()
.executor(executor)
.build();
}
/**
* 以协程方式并行请求多个 URL
*
* @param urls 需要请求的 URL 列表
* @return 每个 URL 对应的响应体
* @throws Exception 所有子任务异常会统一抛出
*/
public List <String> fetchAll(List<URI> urls) throws Exception {
// 使用 StructuredTaskScope 来管理所有子任务
try (var scope = StructuredTaskScope.newInstance()) {
// 为每个 URL 创建一个子任务
for (URI uri : urls) {
scope.fork(() -> {
HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
HttpResponse <String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
// 简化:直接返回 body
return response.body();
});
}
// 等待所有子任务完成
// 若任一子任务抛异常,scope 立即取消其余任务
scope.join();
// 统一收集结果
List <String> results = new ArrayList<>(urls.size());
for (var t : scope.subtasks()) {
results.add(t.result());
}
return results;
}
}
public static void main(String[] args) {
var scheduler = new CoroutineScheduler();
List <URI> urls = List.of(
URI.create("https://httpbin.org/get"),
URI.create("https://httpbin.org/status/404"),
URI.create("https://httpbin.org/delay/2")
);
try {
List <String> bodies = scheduler.fetchAll(urls);
bodies.forEach(body -> System.out.println("Response size: " + body.length()));
} catch (Exception e) {
System.err.println("请求失败: " + e.getMessage());
e.printStackTrace();
}
}
}
代码要点解析
-
虚拟线程执行器
this.executor = Executors.newVirtualThreadPerTaskExecutor();所有 HTTP 请求都在虚拟线程中执行,资源占用极低,适合高并发场景。
-
StructuredTaskScope
try (var scope = StructuredTaskScope.newInstance()) { … }通过
fork()方法启动子任务,join()等待全部完成。若子任务抛出异常,容器会自动取消其他任务,并把异常聚合为CompletionException。 -
子任务返回值
scope.fork(() -> { … })里返回的值可以通过t.result()取得,保证了结果的顺序与 URL 顺序对应。 -
异常处理
当某个 URL 返回 404 时,client.send()会抛出java.net.http.HttpTimeoutException或CompletionException,整个fetchAll()方法会直接抛出。这样调用方可以统一捕获并做出错误处理。
4. 与传统 ThreadPool 的比较
| 维度 | 传统 ThreadPool + Future | Structured Concurrency |
|---|---|---|
| 代码结构 | 需要显式管理 Future、取消、异常聚合 | 通过 StructuredTaskScope 自动完成 |
| 资源管理 | 手动 shutdown 线程池 | 容器自动关闭资源 |
| 并发粒度 | 线程池线程数受限 | 虚拟线程可按需创建 |
| 错误传播 | 手动检查 Future.get() | 任何子任务异常立即取消其余任务 |
5. 小结
Java 21 的 Structured Concurrency 让并发代码更像顺序代码,显著降低了线程管理的复杂度。通过虚拟线程配合 StructuredTaskScope,我们可以轻松实现一个简洁、高效且安全的协程调度器。无论是网络 I/O、文件读写还是 CPU 密集型计算,使用这一新特性都能获得更好的可维护性和性能。

发表回复