Module  java.base
软件包  java.util.concurrent

Class SubmissionPublisher<T>

  • 参数类型
    T - 已发布的项目类型
    All Implemented Interfaces:
    AutoCloseableFlow.Publisher<T>


    public class SubmissionPublisher<T>
    extends Object
    implements Flow.Publisher<T>, AutoCloseable
    A Flow.Publisher异步地将提交的(非空)项目发送给当前订户,直到关闭。 每个当前用户以相同的顺序接收新提交的项目,除非遇到丢失或异常。 使用提交发布器允许项目生成器作为兼容性reactive-streams发布者依赖于丢弃处理和/或阻止流控制。

    提交发布者使用其构造函数中提供的Executor来传递给订阅者。 执行人员的最佳选择取决于预期用途。 如果提交的项目的生成器以不同的线程运行,并且可以估计订户的数量,请考虑使用Executors.newFixedThreadPool(int) 否则考虑使用默认值,通常是ForkJoinPool.commonPool()

    缓冲允许生产者和消费者以不同的速率暂时运作。 每个用户使用独立的缓冲区。 缓冲区是在首次使用时创建的,并根据需要扩展到给定的最大值。 (强制执行的容量可以四舍五入为最接近的权力和/或由此实施支持的最大值限制。) request调用不直接导致缓冲区扩展,但如果未填充的请求超过最大容量,那么风险将会饱和。 默认值Flow.defaultBufferSize()可能为基于预期利率,资源和用途选择容量提供了有用的起点。

    出版方法支持关于缓冲区饱和时要做什么的不同策略。 方法submit块,直到资源可用。 这是最简单的,但响应不大。 offer方法可能会丢弃项目(立即或有限超时),但提供插入处理程序然后重试的机会。

    如果任何订阅者方法抛出异常,其订阅将被取消。 如果处理程序是作为一个构造函数的参数,它会被取消之前在方法中的异常调用onNext ,但在方法例外onSubscribeonErroronComplete不入账或取消之前办理。 如果提供的Executor在尝试执行任务时抛出RejectedExecutionException (或任何其他RuntimeException或Error),或者在处理丢弃的项目时,丢弃处理程序会引发异常,则会重新引发异常。 在这些情况下,并非所有订阅者都将发布已发布的项目。 在这些情况下通常是closeExceptionally良好做法。

    方法consume(Consumer)简化了对常见情况的支持,其中订户的唯一动作是使用所提供的功能请求和处理所有项目。

    此类也可以作为生成项目的子类的便利基础,并使用此类中的方法发布它们。 例如,这里是定期发布从供应商生成的项目的类。 (实际上,您可以添加独立启动和停止生成的方法,在发布者之间共享Executors等等,或者使用SubmissionPublisher作为组件而不是超类)。

       class PeriodicPublisher<T> extends SubmissionPublisher<T> { final ScheduledFuture<?> periodicTask; final ScheduledExecutorService scheduler; PeriodicPublisher(Executor executor, int maxBufferCapacity, Supplier<? extends T> supplier, long period, TimeUnit unit) { super(executor, maxBufferCapacity); scheduler = new ScheduledThreadPoolExecutor(1); periodicTask = scheduler.scheduleAtFixedRate( () -> submit(supplier.get()), 0, period, unit); } public void close() { periodicTask.cancel(false); scheduler.shutdown(); super.close(); } } 

    这是一个Flow.Processor实现的例子。 为了简化说明,它向其发布者使用单步请求。 更自适应的版本可以使用从submit返回的滞后估计以及其他实用方法来监视流量。

       class TransformProcessor<S,T> extends SubmissionPublisher<T> implements Flow.Processor<S,T> { final Function<? super S, ? extends T> function; Flow.Subscription subscription; TransformProcessor(Executor executor, int maxBufferCapacity, Function<? super S, ? extends T> function) { super(executor, maxBufferCapacity); this.function = function; } public void onSubscribe(Flow.Subscription subscription) { (this.subscription = subscription).request(1); } public void onNext(S item) { subscription.request(1); submit(function.apply(item)); } public void onError(Throwable ex) { closeExceptionally(ex); } public void onComplete() { close(); } } 
    从以下版本开始:
    9
    • 构造方法详细信息

      • SubmissionPublisher

        public SubmissionPublisher​(Executor executor,
                                   int maxBufferCapacity,
                                   BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
        使用给定的Executor创建一个新的SubmissionPublisher,为订户提供异步传递,给定每个订户的最大缓冲区大小,如果非空,则在任何Subscriber在方法 onNext引发异常时调用给定的处理程序。
        参数
        executor - 执行器用于异步传递,支持创建至少一个独立线程
        maxBufferCapacity - 每个用户缓冲区的最大容量(强制容量可以舍入到最接近的两个幂并且/或由该实现支持的最大值限定);方法 getMaxBufferCapacity()返回实际值)
        handler - 如果非空,则在方法 onNext抛出异常时调用的过程
        异常
        NullPointerException - 如果executor为null
        IllegalArgumentException - 如果maxBufferCapacity IllegalArgumentException
      • SubmissionPublisher

        public SubmissionPublisher​(Executor executor,
                                   int maxBufferCapacity)
        使用给定的Executor创建一个新的SubmissionPublisher,以便为订户提供异步传递,给定每个订户的最大缓冲区大小,并且在方法 onNext没有用于订阅者异常的处理程序。
        参数
        executor - 执行器用于异步传递,支持创建至少一个独立线程
        maxBufferCapacity - 每个用户缓冲区的最大容量(强制容量可以舍入到最接近的两个幂并且/或由该实现支持的最大值限定);方法 getMaxBufferCapacity()返回实际值)
        异常
        NullPointerException - 如果executor为null
        IllegalArgumentException - 如果maxBufferCapacity不为正
      • SubmissionPublisher

        public SubmissionPublisher​()
        创建一个新的SubmissionPublisher,使用 ForkJoinPool.commonPool()进行异步传递给用户(除非它不支持至少两个并行性级别,在这种情况下,创建一个新的线程来运行每个任务),最大缓冲容量为 Flow.defaultBufferSize() ,否则方法 onNext订阅者异常的处理程序。
    • 方法详细信息

      • submit

        public int submit​(T item)
        通过异步调用其onNext方法将给定项目发布给每个当前用户,阻止不间断,而任何用户的资源不可用。 此方法返回所有当前订阅者中最大滞后(已提交但尚未消费的项目数)的估计。 如果有任何用户,此值至少为一(占该提交的项目),否则为零。

        如果此发布者的Executor在尝试异步通知订阅者时引发了RejectedExecutionException(或任何其他RuntimeException或Error),则会重新抛出此异常,在这种情况下,并非所有订阅者都将被发出此项。

        参数
        item - 要发布的(非空)项
        结果
        用户估计最大滞后
        异常
        IllegalStateException - 如果关闭
        NullPointerException - 如果item为null
        RejectedExecutionException - 如果由Executor抛出
      • offer

        public int offer​(T item,
                         BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
        如果可能,通过异步调用其onNext方法将给定项目发布给每个当前用户。 如果超出资源限制,该项目可能被一个或多个订户丢弃,在这种情况下调用给定的处理程序(如果非空),如果返回true,则重试一次。 在处理程序被调用时,阻止其他线程在此类中对方法的其他调用。 除非确保恢复,否则选项通常限于记录错误和/或向用户发出onError信号。

        此方法返回一个状态指示器:如果为负,则表示(负)数量的丢弃(尝试将订单发送给订户失败)。 否则,是所有当前订阅者中最大滞后(已提交但尚未消费的项目数)的估计。 如果有任何用户,此值至少为一(占该提交的项目),否则为零。

        如果此发布者的Executor在尝试异步通知订阅者时抛出了RejectedExecutionException(或任何其他RuntimeException或Error),或者在处理丢弃的项目时,drop handler抛出异常,则会重新抛出此异常。

        参数
        item - 要发布的(非空)项
        onDrop - 如果非空,则在删除订户时调用该处理程序,其参数为订阅者和项目; 如果返回true,则重新尝试(一次)
        结果
        如果为负,则(负)滴数; 否则估计最大滞后
        异常
        IllegalStateException - 如果关闭
        NullPointerException - 如果项目为空
        RejectedExecutionException - 如果由Executor抛出
      • offer

        public int offer​(T item,
                         long timeout,
                         TimeUnit unit,
                         BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
        如果可能,通过异步调用其onNext方法将给定项目发布给每个当前订户,阻止任何订阅的资源不可用,直到指定的超时或直到调用者线程中断,此时给定的处理程序(如果非-null)被调用,如果返回true,则重试一次。 (丢弃处理程序可以通过检查当前线程是否中断来区分超时与中断)。在调用处理程序时,阻止其他线程在此类中对方法的其他调用。 除非确定恢复,否则选项通常限于记录错误和/或向用户发出onError信号。

        此方法返回一个状态指示器:如果为负,则表示(负)数量的丢弃(尝试将订单发送给订户失败)。 否则,是所有当前订阅者中最大滞后(已提交但尚未消费的项目数)的估计。 如果有任何用户,此值至少为一(占该提交的项目),否则为零。

        如果此发布者的Executor在尝试异步通知订阅者时抛出了RejectedExecutionException(或任何其他RuntimeException或Error),或者在处理丢弃的项目时,drop handler抛出异常,则会重新抛出此异常。

        参数
        item - 要发布的(非空)项
        timeout -等待多久资源对于任何用户放弃,在单位前 unit
        unit - a TimeUnit确定如何解释 timeout参数
        onDrop - 如果非空,则在删除订户时调用该处理程序,其参数为订阅者和项目; 如果返回true,则重新尝试(一次)
        结果
        如果为负,则(负)滴数; 否则估计最大滞后
        异常
        IllegalStateException - 如果关闭
        NullPointerException - 如果item为null
        RejectedExecutionException - 如果由Executor抛出
      • close

        public void close​()
        除非已经关闭,否则问题onComplete向当前用户发出信号,并且不允许随后的尝试发布。 返回时,这种方法并不能保证所有的用户都尚未完成。
        Specified by:
        close在接口 AutoCloseable
      • closeExceptionally

        public void closeExceptionally​(Throwable error)
        除非已经关闭,否则问题onError以给定的错误向当前订户发出信号,并且不允许随后的尝试发布。 未来订阅者也会收到给定的错误。 返回时,这种方法并不能保证所有的用户都尚未完成。
        参数
        error - 发送给订阅者的 onError参数
        异常
        NullPointerException - 如果错误为null
      • isClosed

        public boolean isClosed​()
        如果此发布商不接受提交,则返回true。
        结果
        如果关闭则为true
      • getClosedException

        public Throwable getClosedException​()
        返回与 closeExceptionally相关联的异常,如果未关闭或正常关闭,则为null。
        结果
        异常,如果没有则为null
      • hasSubscribers

        public boolean hasSubscribers​()
        如果此发布者有任何订阅者,则返回true。
        结果
        如果此发布商有任何订阅者,则为true
      • getNumberOfSubscribers

        public int getNumberOfSubscribers​()
        返回当前订阅者的数量。
        结果
        当前订阅者的数量
      • getExecutor

        public Executor getExecutor​()
        返回用于异步传递的执行程序。
        结果
        Executor用于异步传送
      • getMaxBufferCapacity

        public int getMaxBufferCapacity​()
        返回最大每用户缓冲区容量。
        结果
        最大每用户缓冲区容量
      • getSubscribers

        public List<Flow.Subscriber<? super T>> getSubscribers​()
        返回用于监视和跟踪目的的当前用户列表,而不是在订阅者上调用Flow.Subscriber方法。
        结果
        当前订阅者列表
      • isSubscribed

        public boolean isSubscribed​(Flow.Subscriber<? super T> subscriber)
        如果给定订阅者当前订阅,则返回true。
        参数
        subscriber - 用户
        结果
        如果当前订阅,则为true
        异常
        NullPointerException - 如果用户为空
      • estimateMinimumDemand

        public long estimateMinimumDemand​()
        在所有当前订阅者中,返回所要求的最小数量(通过 request )但尚未生成的估计。
        结果
        估计,如果没有订阅者,则为零
      • estimateMaximumLag

        public int estimateMaximumLag​()
        返回所有当前订阅者中生成但尚未消费的最大项目数量的估计值。
        结果
        估计
      • consume

        public CompletableFuture<Void> consume​(Consumer<? super T> consumer)
        使用给定的Consumer功能处理所有已发布的项目。 返回一个CompletableFuture,当发布商发信号为onComplete ,或者在任何错误时异常地完成,或者消费者抛出异常,或者退回的CompletableFuture被取消,在这种情况下,没有进一步的处理。
        参数
        consumer - 应用于每个onNext项目的函数
        结果
        一个CompletedFuture,当发布商发出信号onComplete时正常完成,特别是在任何错误或取消时
        异常
        NullPointerException - 如果消费者为空