Skip to content

Flowable#groupBy race leads to a back-pressure issue #7100

@bsideup

Description

@bsideup

Hi!

While debugging reactor/reactor-core#2352 we wanted to check whether RxJava has the same issue since, given the history of both projects :)

Apparently, with 3.0.7, the same construction in RxJava fails with a very similar issue (although the failure is different):

final int total = 100; Long count = Flowable.range(0, total) .groupBy(i -> (i / 2) * 2) .flatMapMaybe(Flowable::firstElement, false, 1) .observeOn(Schedulers.io()) .count() .blockingGet(); assertThat(total - count).as("count").isZero();

Gives (not 100% reliably, consider running in "rerun until failure" mode):

io.reactivex.rxjava3.exceptions.MissingBackpressureException: Unable to emit a new group (#97) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.	at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onNext(FlowableGroupBy.java:197) 

A few interesting observations:

  1. Changing observeOn's buffer size to 131 and higher makes it always pass
  2. 130 would sometimes fail with Unable to emit a new group (#99) due to lack of requests
  3. 129 would sometimes fail with Unable to emit a new group (#98) due to lack of requests
  4. 128 would sometimes fail with Unable to emit a new group (#97) due to lack of requests
  5. etc etc

So it looks like there is a race between cancellation of the group and starting a new one, although I haven't investigated RxJava's issue much.

Metadata

Metadata

Assignees

No one assigned

    Type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions