ReactiveStreams : QueueBasedSubscriber

johnmcclean-aol edited this page Nov 23, 2016 · 1 revision

QueueBasedSubscriber

QueueBasedSubscriber is a more advanced (or more complicated, depending on your point of view) ReactiveStreams subscriber. It allows multiple subscriptions to be backed by a single cyclops-react async.Queue (or topic), with demand signalled concurrently across those subscriptions (the level of concurrency is configurable). By default errors are not propagated, but users can plug in their own error handler. Connected streams remain connected until the Queue is closed.

For most use cases, please use SeqSubscriber instead.

In this simple use case the connected Stream prints 1, 2, 3 but then remains connected to the QueueBasedSubscriber, because the connection only ends when the Queue is closed.

    QueueBasedSubscriber<Integer> sub = QueueBasedSubscriber.subscriber(QueueFactories.boundedNonBlockingQueue(1000),
                                                                        new Counter(), 10);
    ReactiveSeq.of(1, 2, 3)
               .subscribe(sub);

    sub.reactiveSeq()
       .forEachWithError(System.out::println, System.err::println);
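The "stays connected until the Queue is closed" behaviour can be illustrated without cyclops-react. The following is a minimal JDK-only sketch of the idea, not the library's implementation: the class `ClosableQueueSketch`, its `CLOSED` sentinel and the `offer` / `close` / `drain` methods are all hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// JDK-only analogy for a queue-backed stream; NOT the cyclops-react async.Queue.
public class ClosableQueueSketch {
    // Hypothetical sentinel: its presence in the queue means "closed".
    private static final Object CLOSED = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    // Producer side: what a subscriber's onNext would do.
    public void offer(Integer value) {
        queue.add(value);
    }

    // Closing the queue is the only thing that ends the consumer's loop.
    public void close() {
        queue.add(CLOSED);
    }

    // Consumer side: blocks until close() has been called, then returns
    // every value that was offered before the close.
    public List<Integer> drain() {
        List<Integer> out = new ArrayList<>();
        try {
            for (Object next = queue.take(); next != CLOSED; next = queue.take()) {
                out.add((Integer) next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // give up and return what we have
        }
        return out;
    }

    public static void main(String[] args) {
        ClosableQueueSketch q = new ClosableQueueSketch();
        q.offer(1);
        q.offer(2);
        q.offer(3);
        q.close(); // without this call, drain() would block forever
        System.out.println(q.drain()); // prints [1, 2, 3]
    }
}
```

In the sketch, the consumer sees 1, 2, 3 straight away but only terminates once `close()` is called, which mirrors why the connected Stream above keeps waiting after printing its three values.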

The input parameters to QueueBasedSubscriber are:

  • The Queue that backs the subscriptions
  • A Counter, which should be set to the number of connected subscriptions
  • The maximum concurrency across subscriptions

Multiple subscriptions with a single data Queue

Allowing multiple subscriptions for a single subscriber would break the reactive-streams TCK, so with QueueBasedSubscriber we create multiple subscribers backed by the same async.Queue.

    final Counter c = new Counter();
    c.active.set(publishers.size() + 1); // set the number of publishers we will connect to
    final QueueBasedSubscriber<T> mainSub = QueueBasedSubscriber.subscriber(factory, c, publishers.size());

    // define an initializing Supplier
    final Supplier<Continuation> sp = () -> {
        subscribe(mainSub); // when the supplier is run, subscribe
        for (final Publisher<T> next : publishers) {
            // for each Publisher generate a new Subscriber backed by the same Queue
            next.subscribe(QueueBasedSubscriber.subscriber(mainSub.getQueue(), c, publishers.size()));
        }
        init.close(); // finish the initialization of the QueueBasedSubscriber
        return Continuation.empty();
    };
    final Continuation continuation = new Continuation(sp);
    init.addContinuation(continuation); // configure the Queue to initialize
    return ReactiveSeq.fromStream(init.jdkStream());
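The pattern of one subscriber per publisher, all writing to the same queue and sharing one counter of active subscriptions, can also be sketched with the JDK's own `java.util.concurrent.Flow` API (Java 9+). This is an analogy under stated assumptions, not cyclops-react code: `SharedQueueSketch`, `QueueBackedSubscriber`, `merge` and the `CLOSED` sentinel are hypothetical names. Unlike the snippet above, the counter here counts only the source publishers (there is no extra "main" subscription), and errors simply end the subscription without being propagated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only analogy: several subscribers, one shared queue, one shared counter.
public class SharedQueueSketch {
    private static final Object CLOSED = new Object(); // hypothetical "queue closed" marker

    // Each instance accepts exactly one subscription, so the one-subscription-per-subscriber
    // rule is respected; fan-in happens through the shared queue instead.
    public static final class QueueBackedSubscriber<T> implements Flow.Subscriber<T> {
        private final BlockingQueue<Object> queue;
        private final AtomicInteger active;

        public QueueBackedSubscriber(BlockingQueue<Object> queue, AtomicInteger active) {
            this.queue = queue;
            this.active = active;
        }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(T item) { queue.add(item); }
        @Override public void onError(Throwable t) { finish(); } // error not propagated
        @Override public void onComplete() { finish(); }

        // The last subscription to finish closes the shared queue.
        private void finish() {
            if (active.decrementAndGet() == 0) {
                queue.add(CLOSED);
            }
        }
    }

    // Publishes each source list through its own publisher and collects everything
    // that arrives on the shared queue. Ordering across sources is not deterministic.
    public static List<Integer> merge(List<List<Integer>> sources) {
        List<Integer> out = new ArrayList<>();
        if (sources.isEmpty()) {
            return out; // nothing would ever close the queue
        }
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        AtomicInteger active = new AtomicInteger(sources.size());
        List<SubmissionPublisher<Integer>> publishers = new ArrayList<>();
        for (int i = 0; i < sources.size(); i++) {
            SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
            p.subscribe(new QueueBackedSubscriber<>(queue, active)); // new subscriber, same queue
            publishers.add(p);
        }
        for (int i = 0; i < sources.size(); i++) {
            sources.get(i).forEach(publishers.get(i)::submit);
            publishers.get(i).close(); // triggers onComplete after the submitted items
        }
        try {
            for (Object next = queue.take(); next != CLOSED; next = queue.take()) {
                out.add((Integer) next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of(List.of(1, 2), List.of(3, 4), List.of(5))));
    }
}
```

Because each publisher gets its own subscriber instance, no subscriber ever receives more than one subscription, while the consumer still sees a single merged stream that ends only after every source has completed.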
