莫方教程网

专业程序员编程教程与实战案例分享

Java响应式编程:Flow API

Flow API

Java Flow API 是 Java 9 引入的原生响应式编程框架,基于 Reactive Streams 规范实现,主要用于处理异步数据流并支持背压机制,适合构建高并发、非阻塞的应用场景。

一、核心组件

Java Flow API 包含四个关键接口:

Publisher(发布者)

负责发布数据项给一个或多个订阅者。通过调用订阅者的 onNext 方法来传递数据项。作为数据生产者,通过 subscribe() 方法绑定订阅者,并发送数据流。

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

Subscriber(订阅者)

接收来自发布者的数据项。定义数据处理生命周期方法(如 onSubscribe, onNext, onError, 和 onComplete)接收数据并处理事件。

public interface Subscriber<T> {
    void onSubscribe(Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}

Subscription(订阅关系)

连接发布者和订阅者之间的桥梁。通过这个接口,订阅者可以请求数据项或者取消订阅。管理背压机制,通过 request() 方法控制数据请求速率,避免订阅者过载。

Processor(处理器)

同时实现了 Publisher 和 Subscriber 接口,可以在数据流中作为中间节点,用于转换或者过滤数据。兼具 Publisher 和 Subscriber 的双重角色,用于数据转换或中间处理。

二、使用示例

1、简单的发布-订阅模型,请求无限数据:

import java.util.concurrent.Flow.*;

public class FlowAPITest {
    public static void main(String[] args) {
        // 创建同步发布者
        Publisher<Integer> publisher = subscriber -> {
            for (int i = 0; i < 5; i++) {
                subscriber.onNext(i);  // 发送数据
            }
            subscriber.onComplete();   // 标记完成
        };

        // 创建订阅者
        Subscriber<Integer> subscriber = new Subscriber<>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE);  // 请求无限数据(简化示例)
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("Received: " + item);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Processing completed");
            }
        };

        publisher.subscribe(subscriber);  // 绑定关系
    }
}

执行结果

Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Processing completed

2、处理完一个,请求一个数据:

import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;

public class FlowApiExample {

    public static void main(String[] args) throws InterruptedException {
        // 创建一个发布者
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // 创建一个订阅者
        Subscriber<String> subscriber = new Subscriber<>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                // 请求一个数据项
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println("接收到: " + item);
                // 处理完当前数据项后再次请求下一个
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("完成");
            }
        };

        // 注册订阅者
        publisher.subscribe(subscriber);

        // 发布一些数据项
        publisher.submit("Hello");
        publisher.submit("World");

        // 关闭发布者
        publisher.close();

        // 等待一段时间确保所有消息都被处理
        Thread.sleep(1000);
    }
}

执行结果

接收到: Hello
接收到: World
完成

三、与其他响应式框架对比

标准化支持

Java Flow 是官方标准 API,而 Reactor 或 RxJava 是第三方库,提供更丰富的操作符和扩展功能。

应用场景

Flow API 适合轻量级异步流处理,如 HTTP 客户端响应解析。

复杂数据流处理(如过滤、转换链)更适合使用 Reactor 或 RxJava。

四、优势与限制

优势:简化异步编程模型,背压控制机制避免资源耗尽。

限制:原生 API 功能较基础,需结合其他库实现复杂逻辑。

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言

    滇ICP备2024046894号-1