Hi!
While debugging reactor/reactor-core#2352, we wanted to check whether RxJava has the same issue, given the history of both projects :)
Apparently, with 3.0.7, the same construction in RxJava fails with a very similar issue (although the failure is different):
    final int total = 100;

    Long count = Flowable.range(0, total)
            .groupBy(i -> (i / 2) * 2)
            .flatMapMaybe(Flowable::firstElement, false, 1)
            .observeOn(Schedulers.io())
            .count()
            .blockingGet();

    assertThat(total - count).as("count").isZero();

Gives (not 100% reliably; consider running in "rerun until failure" mode):
    io.reactivex.rxjava3.exceptions.MissingBackpressureException: Unable to emit a new group (#97) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onNext(FlowableGroupBy.java:197)

A few interesting observations:
- Changing `observeOn`'s buffer size to `131` and higher makes it always pass
  - `130` would sometimes fail with `Unable to emit a new group (#99) due to lack of requests`
  - `129` would sometimes fail with `Unable to emit a new group (#98) due to lack of requests`
  - `128` would sometimes fail with `Unable to emit a new group (#97) due to lack of requests`
  - etc.
So it looks like there is a race between cancellation of the group and starting a new one, although I haven't investigated RxJava's issue much.
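
To try the different buffer sizes listed above, the pipeline can be parameterized. This is a minimal sketch, not the exact test we ran: the class name `GroupByBufferRepro` and the helper `countWithBuffer` are made up for illustration, and it assumes RxJava 3.x is on the classpath. It uses the `observeOn(Scheduler, boolean, int)` overload, whose last argument is the buffer size.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

// Hypothetical helper class, named here only for illustration.
public class GroupByBufferRepro {

    // Runs the pipeline from the report with an explicit observeOn buffer size
    // and returns the number of items that reached count().
    static long countWithBuffer(int total, int bufferSize) {
        return Flowable.range(0, total)
                .groupBy(i -> (i / 2) * 2)
                .flatMapMaybe(Flowable::firstElement, false, 1)
                // observeOn(scheduler, delayError, bufferSize)
                .observeOn(Schedulers.io(), false, bufferSize)
                .count()
                .blockingGet();
    }

    public static void main(String[] args) {
        // 131 is the smallest buffer size that always passed for us;
        // 128 (the default), 129 and 130 failed intermittently.
        System.out.println("count = " + countWithBuffer(100, 131));
    }
}
```

Since the failure is intermittent for the smaller sizes, calling `countWithBuffer(100, 128)` in a loop is more likely to surface the `MissingBackpressureException` than a single run.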