6,961 questions
0 votes
1 answer
47 views
What is the equivalent to doOnComplete for mono
At some point in our code using rxjava we have public Maybe<Event> verifyFoo(Event event){ return Maybe.just(event) .flatMap(this::foo) .doOnComplete(() -> ...
0 votes
1 answer
88 views
RxJava Maybe.doFinally() executing much later than doOnSuccess() in high-throughput multithreaded environment
I'm experiencing a confusing timing issue with RxJava's Maybe operators in a high-throughput service (40 TPS) with concurrent execution. I have two classes that log execution times, but they're ...
1 vote
0 answers
103 views
Not using side effects, immediately emit State based on condition, then debounce the user input and make a server request for new State
Given: an Observable that emits the user input; 3 states: Success, Error, Idle (shows nothing, may be treated as some progress). I want the following behavior: as soon as the user types, check if ...
1 vote
1 answer
31 views
In RxJava, is subscribeOn(mainThread).subscribe() guaranteed to run synchronously in mainThread?
In RxJava, is subscribeOn(mainThread).subscribe() guaranteed to run synchronously in mainThread? Or is it possible that another piece of work already scheduled to run on the main thread may run first? ...
0 votes
0 answers
43 views
RxJava BehaviorSubject.onNext() is called but subscriber is not receiving updates
I am trying to implement background polling in an RxJava-based Android application where: A repository periodically fetches data from an API every 10 seconds and emits updates via a BehaviorSubject. ...
0 votes
0 answers
23 views
Vert.x - streaming HTTP request body with rxJava blocks
I am attempting to stream a large HTTP request between my Vert.x services. For this purpose, I am using io.vertx.reactivex.ext.web.client.WebClient, which takes a Flowable as input for the ...
0 votes
0 answers
23 views
Combine multiple observables based on id
Let's say I have two observables Observable<A> with values like (1, Ametadata), (2, Ametadata) 1, 2 ... are ids and Ametadata is additional attributes of A Observable<B> with values like (...
0 votes
1 answer
73 views
RxJava3 groupBy emitting individual groups instead of grouping by key
Working on updating my app from RxJava2 to RxJava3. I know there are some updates to how groupBy works from reading the documentation, but I'm having trouble understanding why I'm seeing this ...
0 votes
1 answer
45 views
Merge Flowable with filtered elements updated by batch anyc operation
I have rxjava3 Flowable having Item objects and want to run async operation in batches only for elements matching a condition. Then I want to allow downstream operators to iterate over original Item ...
0 votes
1 answer
35 views
rxSend vs send in Rxified API for Http Client in Vert.x
I am trying to understand a difference between below two snippets using rxRequest/rxSend vs request/send. They seem to behave in the same way. RecordParser parser = RecordParser.newDelimited("\n&...
1 vote
1 answer
42 views
How to handle multiple subscriptions in a fair manner?
I need to handle different types of events in a strict one by one manner, but in a background thread. According to the documentation, the next code, Schedulers.from(executor, false, true);, should ...
1 vote
2 answers
70 views
Reactive streams (monix) operator combination for buffering for a timespan with overlapping elements
I have an Observable[T] which emits continuously and I want an Observable[List[T]] which emits the last elements in a specified duration for each element the source emits. Example Observable. range(0, ...
0 votes
2 answers
95 views
Unbounded backpressure in Flowable rxjava3
Reactive Streams is built around back pressure what is great and I would like to better understand how to use RxJava3 API. The approach to drop overflow data for GUI events is perfectly fine but if I ...
0 votes
2 answers
89 views
RxJava Iterate over list, call API for each item, and return a list as result?
I have to call an API which returns an object, and inside this object, there is a list. For each item in this list, I have to retrieve item details. To retrieve details, I have to make async API calls ...
1 vote
0 answers
242 views
io.reactivex.exceptions.UndeliverableException:The exception could not be delivered to the consumer because it has already canceled/disposed the flow
I am using RxJava in my app. Upon making changes to the version code > clearing cache > clearing data > uninstalling > reinstalling the app. The app goes straight to the HomeFragment after ...