I have this working: when a socket message is received, a value is emitted immediately and then every second thereafter (incrementing the age), until the next socket message arrives.

However, I want it to emit every second regardless of whether a socket message has been received. It should start off emitting every second right away; when a socket message arrives, the properties should change to the new ones, and those should then be emitted every second.

Can't quite figure out what to do.

    updated: Observable<TargetDevice>;

    this.updated = socketService.onMessage.pipe(
      filter(
        message =>
          message.messageType === SocketIoMessageType.Device &&
          message.data.id === this.id
      ),
      map((message: SocketIoMessage) => <Device>message.data),
      tap(d => this.setProps(d)),
      switchMap(d =>
        timer(0, 1000).pipe(
          tap(tick => (this.age = d.age + tick)),
          map(() => this)
        )
      )
    );
  • So you want a starting value which does not originate from the socket and just starts your one second interval or am I missing something? Commented Dec 31, 2018 at 7:21

3 Answers

You'd want to use combineLatest and startWith to achieve your desired behaviour:

    combineLatest(
      socketService.onMessage.pipe(
        startWith(DEFAULT_MESSAGE)
      ),
      timer(0, 1000)
    ).pipe(
      //...
    )
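As a rough sketch of how that combination behaves (not the asker's actual service): the `Device` shape, the `latestDeviceWithTick` helper and the two Subjects below are assumptions made for illustration, standing in for `socketService.onMessage` and `timer(0, 1000)`. Note that the tick counter is not reset by a new message, so the age cannot simply be computed as `device.age + tick` with this approach, and a new message also triggers an extra emission between ticks.

```typescript
import { combineLatest, Observable, Subject } from 'rxjs';
import { map, startWith } from 'rxjs/operators';

// Hypothetical device shape; the real Device type is not shown in the question.
interface Device {
  id: string;
  age: number;
}

// Hypothetical helper: pairs the most recent device (or the default) with each tick.
function latestDeviceWithTick(
  message$: Observable<Device>,
  tick$: Observable<number>,
  defaultDevice: Device
): Observable<{ device: Device; tick: number }> {
  return combineLatest([message$.pipe(startWith(defaultDevice)), tick$]).pipe(
    map(([device, tick]) => ({ device, tick }))
  );
}

// Demo: Subjects stand in for the socket stream and for timer(0, 1000).
const messages = new Subject<Device>();
const ticks = new Subject<number>();
const seen: Array<{ device: Device; tick: number }> = [];

latestDeviceWithTick(messages, ticks, { id: 'a', age: 0 }).subscribe(v => seen.push(v));

ticks.next(0);                       // emits the default device with tick 0
ticks.next(1);                       // emits the default device with tick 1
messages.next({ id: 'a', age: 10 }); // emits the new device, tick is still 1
ticks.next(2);                       // emits the new device with tick 2
```

In real use, `tick$` would be `timer(0, 1000)` and `message$` the filtered socket stream.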

One possible solution to your problem is to create a beforeSocketMessage$ stream and limit it with the takeUntil operator.

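    // Emits every second until the first socket message arrives.
    // Note: takeUntil stops on any socket message, including ones the filter below would drop.
    const beforeSocketMessage$ = interval(1000).pipe(
      mapTo('beforeSocketMessage'),
      takeUntil(socketService.onMessage)
    );

    const message$ = socketService.onMessage.pipe(
      filter(/* your filter */),
      map((message: SocketIoMessage) => <Device>message.data),
      tap(d => this.setProps(d)),
      // timer(0, 1000) emits immediately and then every second; interval() only takes a period.
      switchMap(d =>
        timer(0, 1000).pipe(
          tap(tick => (this.age = d.age + tick)),
          mapTo(this)
        )
      )
    );

    this.updated = merge(beforeSocketMessage$, message$);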

Simplified version at stackblitz.com: https://stackblitz.com/edit/rxjs-jwg2cb?devtoolsheight=60

I think you want something like this. You were right to use switchMap. The nice thing about it is that it unsubscribes from the previous inner observable as soon as the source emits its next value.

    socketService.onMessage.pipe(
      switchMap(value => interval(1000).pipe(map(() => value)))
    );

Check it out on RxJs Playground here
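Building on the switchMap idea, one way to get emissions before the first socket message is to seed the stream with startWith, so the timer starts from a default device right away and restarts whenever a message arrives. This is a minimal sketch, assuming a simplified `Device` shape and a hypothetical `tickingDevice` helper (neither is from the question). It returns new objects instead of mutating `this`, and it takes the tick source as a parameter so the demo can run without waiting.

```typescript
import { Observable, of, timer } from 'rxjs';
import { map, startWith, switchMap } from 'rxjs/operators';

// Hypothetical device shape; the real Device type is not shown in the question.
interface Device {
  id: string;
  age: number;
}

// Hypothetical helper: starts ticking from `initial` immediately and restarts
// the ticks from each new device received on `message$`.
function tickingDevice(
  message$: Observable<Device>,
  initial: Device,
  makeTicks: () => Observable<number> = () => timer(0, 1000)
): Observable<Device> {
  return message$.pipe(
    startWith(initial),
    // switchMap drops the previous inner tick stream when a new device arrives.
    switchMap(device =>
      makeTicks().pipe(map(tick => ({ ...device, age: device.age + tick })))
    )
  );
}

// Demo with synchronous stand-ins for the socket stream and the timer.
const ages: number[] = [];
tickingDevice(of({ id: 'a', age: 10 }), { id: 'a', age: 0 }, () => of(0, 1, 2))
  .subscribe(d => ages.push(d.age));
// ages is now [0, 1, 2, 10, 11, 12]
```

With the default `makeTicks`, this emits on subscription and then once per second; in the component, `message$` would be the filtered and mapped socket stream.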

1 Comment

It's actually a bit more complex than that, see edit.
