Say I want to make an Observable in RxJava that has a feedback coupling, as in the image below.

[Image: Self-dependent stream]

I've managed to achieve that with the use of subjects, like this:

    // Observable<Integer> source = Observable.range(0, 6);
    public Observable<Integer> getFeedbackSum(Observable<Integer> source) {
        UnicastSubject<Integer> feedback = UnicastSubject.create();
        Observable<Integer> feedbackSum = Observable.zip(source, feedback.startWith(0), Pair::create)
            .map(pair -> pair.first + pair.second);
        feedbackSum.subscribe(feedback);
        return feedbackSum;
    }

It looks rather ugly. Is there a better way?

  • There is an operator for it: scan. Commented Jul 27, 2017 at 20:00
  • Yeah, totally overlooked it. Thanks, @akarnokd! Commented Jul 27, 2017 at 20:17

1 Answer

There is an operator for this: scan:

public final <R> Observable<R> scan(R initialValue, BiFunction<R,? super T,R> accumulator)

    Observable.range(0, 6)
        .scan(0, (a, b) -> a + b)
        .test()
        .assertResult(0, 0, 1, 3, 6, 10, 15);
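To make the semantics of scan with an initial value concrete, here is a minimal plain-Java sketch that needs no RxJava dependency. The `ScanSketch` class and its `scan` helper are hypothetical names used for illustration only, not RxJava's implementation, and the sketch uses `java.util.function.BiFunction` rather than RxJava's own `BiFunction`. It shows the two points that matter: the initial value is emitted first, and each later value is the previous accumulator combined with the next source item.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

final class ScanSketch {
    // Hypothetical helper, not RxJava code: returns the initial value
    // followed by every intermediate accumulator value.
    public static <T, R> List<R> scan(List<T> source, R initialValue,
                                      BiFunction<R, ? super T, R> accumulator) {
        List<R> out = new ArrayList<>();
        R acc = initialValue;
        out.add(acc); // the seed itself is the first emitted value
        for (T item : source) {
            acc = accumulator.apply(acc, item); // feed the last result back in
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        // Running sum of 1, 2, 3 starting from 0
        System.out.println(scan(Arrays.asList(1, 2, 3), 0, (a, b) -> a + b)); // [0, 1, 3, 6]
    }
}
```

The feedback loop built with a subject in the question is exactly this "feed the last result back in" step, which is why scan can replace it.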

If your accumulator type is mutable, use scanWith instead, so that each subscriber gets a fresh initial value from the supplier:

public final <R> Observable<R> scanWith(Callable<R> seedSupplier, BiFunction<R,? super T,R> accumulator)
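Why a seed supplier matters for mutable accumulators can be sketched in plain Java as well. The `ScanWithSketch` class and `runOnce` method below are hypothetical stand-ins for one subscription, not RxJava code, and the sketch assumes the accumulator is a mutable `List<Integer>` that collects items. With one shared seed object, a second run sees the state left behind by the first; with a supplier, each run starts clean.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

final class ScanWithSketch {
    // Hypothetical stand-in for one subscription: asks the supplier for a
    // seed, then appends every source item to that mutable accumulator.
    public static List<Integer> runOnce(List<Integer> source,
                                        Supplier<List<Integer>> seedSupplier) {
        List<Integer> acc = seedSupplier.get();
        for (Integer item : source) {
            acc.add(item);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2);

        // One shared mutable seed: the second run sees the first run's items.
        List<Integer> shared = new ArrayList<>();
        runOnce(source, () -> shared);
        System.out.println(runOnce(source, () -> shared)); // [1, 2, 1, 2]

        // A fresh seed per run, which is what a seed supplier allows.
        runOnce(source, ArrayList::new);
        System.out.println(runOnce(source, ArrayList::new)); // [1, 2]
    }
}
```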


1 Comment

scanWith example: static int initial = 1; .... Observable.range(1, 2).scanWith(() -> initial++, (a, b) -> a + b).repeat(2).subscribe(out::println); Result: 1, 2, 4, 2, 3, 5
