Flow API
Java Flow API 是 Java 9 引入的原生响应式编程框架,基于 Reactive Streams 规范实现,主要用于处理异步数据流并支持背压机制,适合构建高并发、非阻塞的应用场景。
一、核心组件
Java Flow API 包含四个关键接口:
Publisher(发布者)
负责发布数据项给一个或多个订阅者。通过调用订阅者的 onNext 方法来传递数据项。作为数据生产者,通过 subscribe() 方法绑定订阅者,并发送数据流。
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
Subscriber(订阅者)
接收来自发布者的数据项。定义数据处理生命周期方法(如 onSubscribe, onNext, onError, 和 onComplete)接收数据并处理事件。
public interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}
Subscription(订阅关系)
连接发布者和订阅者之间的桥梁。通过这个接口,订阅者可以请求数据项或者取消订阅。管理背压机制,通过 request() 方法控制数据请求速率,避免订阅者过载。
Processor(处理器)
同时实现了 Publisher 和 Subscriber 接口,可以在数据流中作为中间节点,用于转换或者过滤数据。兼具 Publisher 和 Subscriber 的双重角色,用于数据转换或中间处理。
二、使用示例
1、简单的发布-订阅模型,请求无限数据:
import java.util.concurrent.Flow.*;
public class FlowAPITest {
public static void main(String[] args) {
// 创建同步发布者
Publisher<Integer> publisher = subscriber -> {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i); // 发送数据
}
subscriber.onComplete(); // 标记完成
};
// 创建订阅者
Subscriber<Integer> subscriber = new Subscriber<>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE); // 请求无限数据(简化示例)
}
@Override
public void onNext(Integer item) {
System.out.println("Received: " + item);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Processing completed");
}
};
publisher.subscribe(subscriber); // 绑定关系
}
}
执行结果
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Processing completed
2、处理完一个,请求一个数据:
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
public class FlowApiExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个发布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// 创建一个订阅者
Subscriber<String> subscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
// 请求一个数据项
this.subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("接收到: " + item);
// 处理完当前数据项后再次请求下一个
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("完成");
}
};
// 注册订阅者
publisher.subscribe(subscriber);
// 发布一些数据项
publisher.submit("Hello");
publisher.submit("World");
// 关闭发布者
publisher.close();
// 等待一段时间确保所有消息都被处理
Thread.sleep(1000);
}
}
执行结果
接收到: Hello
接收到: World
完成
三、与其他响应式框架对比
标准化支持
Java Flow 是官方标准 API,而 Reactor 或 RxJava 是第三方库,提供更丰富的操作符和扩展功能。
应用场景
Flow API 适合轻量级异步流处理,如 HTTP 客户端响应解析。
复杂数据流处理(如过滤、转换链)更适合使用 Reactor 或 RxJava。
四、优势与限制
优势:简化异步编程模型,背压控制机制避免资源耗尽。
限制:原生 API 功能较基础,需结合其他库实现复杂逻辑。