15

I want to take the last 3 elements from an observable. Let's say that my timeline looks like this:

--a---b-c---d---e---f-g-h-i------j->

where: a, b, c, d, e, f, g, h, i, j are emitted values

Whenever a new value is emitted I want to get it immediately, so it can look like this:

[a] [a, b] [a, b, c] [b, c, d] [c, d, e] [d, e, f] [e, f, g] [f, g, h] ... and so on 

I think this is super useful. Imagine building a chat where you want to display the last 10 messages. Whenever a new message arrives, you want to update your view.

My attempt: demo

2 Answers

30

You can use scan for this:

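    import { from } from 'rxjs';
    import { scan } from 'rxjs/operators';

    from(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u'])
      .pipe(
        scan((acc, val) => {
          acc.push(val);        // append the newly emitted value
          return acc.slice(-3); // keep only the last 3 values
        }, []),
      )
      .subscribe(console.log);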

This will print:

    [ 'a' ]
    [ 'a', 'b' ]
    [ 'a', 'b', 'c' ]
    [ 'b', 'c', 'd' ]
    [ 'c', 'd', 'e' ]
    ...
    [ 's', 't', 'u' ]
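For the chat case from the question, the window size can be made a parameter. Here is a minimal sketch in plain JavaScript with no RxJS import, so it runs on its own. Note that `lastN` and `scanAll` are hypothetical helper names, not RxJS APIs; `scanAll` only mimics the way scan emits every intermediate accumulator.

```javascript
// Hypothetical helper: builds a reducer that keeps only the last `n` values.
// The returned function has the (acc, val) shape that scan expects.
// Assumes n >= 1 (slice(-0) would return the whole array).
function lastN(n) {
  return (acc, val) => [...acc, val].slice(-n);
}

// Hypothetical stand-in for scan: collects every intermediate accumulator.
function scanAll(values, reducer, seed) {
  const emissions = [];
  let acc = seed;
  for (const val of values) {
    acc = reducer(acc, val);
    emissions.push(acc);
  }
  return emissions;
}

const windows = scanAll(['a', 'b', 'c', 'd', 'e'], lastN(3), []);
console.log(windows);
```

With RxJS, the same reducer could be passed straight to the operator, e.g. `scan(lastN(10), [])` for the last 10 messages.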

bufferCount won't do what you want. It emits only once a buffer holds exactly 3 items, which means you won't get any emission until at least 3 messages have been posted.
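To make the difference concrete, here is a rough simulation in plain JavaScript (no RxJS import). Both function names are hypothetical, and `slidingFullWindows` only models what `bufferCount(3, 1)` emits while the source is still open; as far as I know, the real operator also flushes leftover partial buffers when the source completes, which this sketch ignores.

```javascript
// Hypothetical model of bufferCount(size, 1) on a source that has not completed:
// a window is emitted only once `size` values have arrived.
function slidingFullWindows(values, size) {
  const emissions = [];
  for (let i = size; i <= values.length; i++) {
    emissions.push(values.slice(i - size, i));
  }
  return emissions;
}

// Emissions of the scan-based approach: one window per incoming value.
function growingWindows(values, size) {
  const emissions = [];
  let acc = [];
  for (const val of values) {
    acc = [...acc, val].slice(-size);
    emissions.push(acc);
  }
  return emissions;
}

const twoMessages = ['a', 'b'];
console.log(slidingFullWindows(twoMessages, 3)); // []
console.log(growingWindows(twoMessages, 3));     // [ [ 'a' ], [ 'a', 'b' ] ]
```

So with only two chat messages posted, the bufferCount-style view would still be empty, while the scan-style view already shows both.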

Jan 2019: Updated for RxJS 6


8 Comments

Upvote for correct answer. I have a bit another with low case
Thank you! It does exactly what I wanted to achieve
Won't the accumulated array just keep growing though? Seems like a memory leak.
This is a great answer, but a more succinct and functional version could use scan((acc, val) => [...acc, val].slice(-3), [])
@adam0101 no, it won't. The accumulator array will never grow past size 4. What you return from the scan function is what gets passed as the acc parameter on the next run. So for a run when it's already size 3, it will .push(val) making it size 4, then it will get cut down to size 3 again via .slice(-3). It's not leaking.
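The size argument above can be checked with a small sketch. It runs the answer's push-then-slice reducer in a plain loop (no RxJS), and the `maxSizeSeen` counter and `pushAndSlice` name are instrumentation added only for this illustration.

```javascript
// Same push-then-slice logic as the answer's reducer, instrumented to record
// the largest size the accumulator reaches right after push().
let maxSizeSeen = 0;
function pushAndSlice(acc, val) {
  acc.push(val);
  maxSizeSeen = Math.max(maxSizeSeen, acc.length);
  return acc.slice(-3); // a new array of at most 3 items becomes the next acc
}

let acc = [];
for (const val of 'abcdefghij'.split('')) {
  acc = pushAndSlice(acc, val);
}

console.log(maxSizeSeen); // 4
console.log(acc);         // [ 'h', 'i', 'j' ]
```

One thing to keep in mind: `push` mutates the array that was emitted on the previous step, so a subscriber that holds on to an old emission would see it grow to 4 items. The spread-based variant from the earlier comment avoids that.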
5

You can look at the Observable#bufferCount function. One difference is that it needs at least 3 values (the first parameter, in this example) before it emits.

    const source = Rx.Observable.interval(1000);
    const example = source.bufferCount(3, 1);
    const subscribe = example.subscribe(val => console.log(val));
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

3 Comments

Thank you for the answer! Just like @martin has mentioned, bufferCount emits only when each buffer is exactly === 3
Thanks. I have also mentioned it
You can do source.next(null); source.next(null); source.next(null); [sic]
