- java.lang.Object
-
- java.util.concurrent.Flow
-
public final class Flow extends Object
用于建立流量控制组件的相互关联的接口和静态方法,其中Publishers
生成由一个或多个Subscribers
消耗的项目,每个由Subscription
管理。这些接口对应于reactive-streams规范。 它们适用于并发和分布式异步设置:全部(七)方法在
void
“单向”消息样式中定义。 通信依赖于一种简单的流控制形式(方法Flow.Subscription.request(long)
),可用于避免在“推”式系统中可能出现的资源管理问题。例子。 A
Flow.Publisher
通常定义自己的Flow.Subscription
实现; 在方法subscribe
构造一个并将其发送到呼叫Flow.Subscriber
。 它通过异步方式向用户发布项目,通常使用Executor
。 例如,这是一个非常简单的发行商,只向一个订户发出一个单独的TRUE
项目(当请求时)。 因为订户只接收一个项目,所以这个类不使用大多数实现中所需的缓冲和排序控制(例如SubmissionPublisher
)。class OneShotPublisher implements Publisher<Boolean> { private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based private boolean subscribed; // true after first subscribe public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { if (subscribed) subscriber.onError(new IllegalStateException()); // only one allowed else { subscribed = true; subscriber.onSubscribe(new OneShotSubscription(subscriber, executor)); } } static class OneShotSubscription implements Subscription { private final Subscriber<? super Boolean> subscriber; private final ExecutorService executor; private Future<?> future; // to allow cancellation private boolean completed; OneShotSubscription(Subscriber<? super Boolean> subscriber, ExecutorService executor) { this.subscriber = subscriber; this.executor = executor; } public synchronized void request(long n) { if (n != 0 && !completed) { completed = true; if (n < 0) { IllegalArgumentException ex = new IllegalArgumentException(); executor.execute(() -> subscriber.onError(ex)); } else { future = executor.submit(() -> { subscriber.onNext(Boolean.TRUE); subscriber.onComplete(); }); } } } public synchronized void cancel() { completed = true; if (future != null) future.cancel(false); } } }
A
Flow.Subscriber
安排要求和处理的项目。 项目(调用Flow.Subscriber.onNext(T)
)除非另有要求,否则不发出,但可能需要多个项目。 许多订阅者实现可以按照以下示例的风格来排列,其中缓冲区大小为1个单步和较大的大小通常允许更少的通信进行更有效的重叠处理; 例如值为64,这将保持32到64之间的未完成请求。由于给定的Flow.Subscription
的订阅方法调用是严格排序的,除非订阅者维护多个订阅,否则这些方法不需要使用锁或挥发性最好是定义多个订阅者,每个订阅者都有自己的订阅)。class SampleSubscriber<T> implements Subscriber<T> { final Consumer<? super T> consumer; Subscription subscription; final long bufferSize; long count; SampleSubscriber(long bufferSize, Consumer<? super T> consumer) { this.bufferSize = bufferSize; this.consumer = consumer; } public void onSubscribe(Subscription subscription) { long initialRequestSize = bufferSize; count = bufferSize - bufferSize / 2; // re-request when half consumed (this.subscription = subscription).request(initialRequestSize); } public void onNext(T item) { if (--count <= 0) subscription.request(count = bufferSize - bufferSize / 2); consumer.accept(item); } public void onError(Throwable ex) { ex.printStackTrace(); } public void onComplete() {} }
默认值
defaultBufferSize()
可能为基于预期利率,资源和用途的流量组件中选择请求大小和容量提供了有用的起点。 或者,当不需要流量控制时,用户可以最初请求有效无限数量的项目,如:class UnboundedSubscriber<T> implements Subscriber<T> { public void onSubscribe(Subscription subscription) { subscription.request(Long.MAX_VALUE); // effectively unbounded } public void onNext(T item) { use(item); } public void onError(Throwable ex) { ex.printStackTrace(); } public void onComplete() {} void use(T item) { ... } }
- 从以下版本开始:
- 9
-
-
Nested Class Summary
Nested Classes Modifier and Type Class 描述 static interface
Flow.Processor<T,R>
作为订阅者和发布者的组件。static interface
Flow.Publisher<T>
订阅者接收的项目(和相关控制消息)的生产者。static interface
Flow.Subscriber<T>
消息的接收者。static interface
Flow.Subscription
消息控件链接Flow.Publisher
和Flow.Subscriber
。
-