Module  java.base
软件包  java.util.concurrent

Class Flow



  • public final class Flow
    extends Object
    用于建立流量控制组件的相互关联的接口和静态方法,其中Publishers生成由一个或多个Subscribers消耗的项目,每个由Subscription管理。

    这些接口对应于reactive-streams规范。 它们适用于并发和分布式异步设置:全部(七)方法在void “单向”消息样式中定义。 通信依赖于一种简单的流控制形式(方法Flow.Subscription.request(long) ),可用于避免在“推”式系统中可能出现的资源管理问题。

    例子。 A Flow.Publisher通常定义自己的Flow.Subscription实现; 在方法subscribe构造一个并将其发送到呼叫Flow.Subscriber 它通过异步方式向用户发布项目,通常使用Executor 例如,这是一个非常简单的发行商,只向一个订户发出一个单独的TRUE项目(当请求时)。 因为订户只接收一个项目,所以这个类不使用大多数实现中所需的缓冲和排序控制(例如SubmissionPublisher )。

       class OneShotPublisher implements Publisher<Boolean> { private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based private boolean subscribed; // true after first subscribe public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { if (subscribed) subscriber.onError(new IllegalStateException()); // only one allowed else { subscribed = true; subscriber.onSubscribe(new OneShotSubscription(subscriber, executor)); } } static class OneShotSubscription implements Subscription { private final Subscriber<? super Boolean> subscriber; private final ExecutorService executor; private Future<?> future; // to allow cancellation private boolean completed; OneShotSubscription(Subscriber<? super Boolean> subscriber, ExecutorService executor) { this.subscriber = subscriber; this.executor = executor; } public synchronized void request(long n) { if (n != 0 && !completed) { completed = true; if (n < 0) { IllegalArgumentException ex = new IllegalArgumentException(); executor.execute(() -> subscriber.onError(ex)); } else { future = executor.submit(() -> { subscriber.onNext(Boolean.TRUE); subscriber.onComplete(); }); } } } public synchronized void cancel() { completed = true; if (future != null) future.cancel(false); } } } 

    A Flow.Subscriber安排要求和处理的项目。 项目(调用Flow.Subscriber.onNext(T) )除非另有要求,否则不发出,但可能需要多个项目。 许多订阅者实现可以按照以下示例的风格来排列,其中缓冲区大小为1个单步和较大的大小通常允许更少的通信进行更有效的重叠处理; 例如值为64,这将保持32到64之间的未完成请求。由于给定的Flow.Subscription订阅方法调用是严格排序的,除非订阅者维护多个订阅,否则这些方法不需要使用锁或挥发性最好是定义多个订阅者,每个订阅者都有自己的订阅)。

       class SampleSubscriber<T> implements Subscriber<T> { final Consumer<? super T> consumer; Subscription subscription; final long bufferSize; long count; SampleSubscriber(long bufferSize, Consumer<? super T> consumer) { this.bufferSize = bufferSize; this.consumer = consumer; } public void onSubscribe(Subscription subscription) { long initialRequestSize = bufferSize; count = bufferSize - bufferSize / 2; // re-request when half consumed (this.subscription = subscription).request(initialRequestSize); } public void onNext(T item) { if (--count <= 0) subscription.request(count = bufferSize - bufferSize / 2); consumer.accept(item); } public void onError(Throwable ex) { ex.printStackTrace(); } public void onComplete() {} } 

    默认值defaultBufferSize()可能为基于预期利率,资源和用途的流量组件中选择请求大小和容量提供了有用的起点。 或者,当不需要流量控制时,用户可以最初请求有效无限数量的项目,如:

       class UnboundedSubscriber<T> implements Subscriber<T> { public void onSubscribe(Subscription subscription) { subscription.request(Long.MAX_VALUE); // effectively unbounded } public void onNext(T item) { use(item); } public void onError(Throwable ex) { ex.printStackTrace(); } public void onComplete() {} void use(T item) { ... } } 
    从以下版本开始:
    9
    • 方法详细信息

      • defaultBufferSize

        public static int defaultBufferSize​()
        返回发布者或订阅者缓冲的默认值,可以在没有其他约束的情况下使用。
        Implementation Note:
        返回的当前值为256。
        结果
        缓冲区大小值