
How can I use async/await inside an Observable? With this code, the teardown (unsubscribe) function returned from the Observable is never called, so the interval is not cleared.

    const { Observable } = require("rxjs");

    const test = () =>
      new Observable(async (subscriber) => {
        await Promise.resolve();
        const a = setInterval(() => {
          subscriber.next(Math.random());
          console.log("zz");
        }, 500);
        return () => {
          console.log("asdsad");
          clearInterval(a);
        };
      });

    const xyz = test().subscribe(console.log);

    setTimeout(() => {
      xyz.unsubscribe();
    }, 3000);
    Observable.fromPromise should turn the result of an async function into an observable Commented Jun 29, 2019 at 7:05

2 Answers


RxJS does not support passing an async function as the subscribe function of an Observable. However, a similar result can be achieved with a BehaviorSubject and a nested async function.

Create a BehaviorSubject, expose it as an observable with .asObservable(), start the nested async function, and return the observable. Here's an example:

    import { BehaviorSubject } from 'rxjs';

    function getProgress() {
      // Holds the latest progress value
      const value = new BehaviorSubject('10%');
      const observable = value.asObservable();

      // Async function that performs the work and pushes updates
      const observer = async () => {
        await new Promise(resolve => setTimeout(resolve, 3000));
        value.next('66%');
        await new Promise(resolve => setTimeout(resolve, 3000));
        value.next('100%');
        // Complete the observable
        value.complete();
      };

      // Call the async function and return the observable
      observer();
      return observable;
    }

It's very readable and works like a charm.



First of all, the subscribe function passed to the Observable constructor cannot be an async function. There is no support for that.
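To see why, note that an async function always returns a Promise, so a teardown function returned from it arrives wrapped in that Promise rather than as something that can be called directly. A minimal sketch in plain JavaScript, with no RxJS involved (`asyncSubscribe` and `syncSubscribe` are hypothetical names used only for illustration):

```javascript
// Hypothetical stand-ins for the function passed to `new Observable(...)`.
const asyncSubscribe = async () => {
  await Promise.resolve();
  return () => console.log("teardown");
};

const syncSubscribe = () => {
  return () => console.log("teardown");
};

const fromAsync = asyncSubscribe();
const fromSync = syncSubscribe();

console.log(fromAsync instanceof Promise); // true
console.log(typeof fromAsync);             // "object"
console.log(typeof fromSync);              // "function"
```

Only the synchronous version hands back a function at the moment of subscription, which is when the teardown has to be available.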

If you need to create an observable from a promise, use from:

    import { from } from 'rxjs';

    const observable = from(promise);

But consider your scenario. Because there is no way to cancel a native JS promise, you cannot really unsubscribe from an observable created this way, so:

    const obs = from(new Promise(resolve => {
      setTimeout(() => {
        console.log('gonna resolve');
        resolve('foo');
      }, 1000);
    }));

    const sub = obs.subscribe(console.log);

    setTimeout(() => sub.unsubscribe(), 500);

will print:

    gonna resolve

So yes: gonna resolve is still printed in the console, because the timer inside the promise fires regardless of the unsubscribe, but nothing more. The result passed to resolve is ignored and never logged.

On the other hand, if you remove the unsubscription (setTimeout(() => sub.unsubscribe(), 500);), you will see:

    gonna resolve
    foo
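The timer inside the promise keeps running because a promise's executor starts as soon as the promise is constructed, before from ever receives it; unsubscribing can only stop delivery of the result. A small sketch in plain JavaScript, assuming nothing beyond the language itself (`events` is just an illustrative log):

```javascript
const events = [];

const promise = new Promise((resolve) => {
  // The executor runs synchronously, at construction time.
  events.push("executor started");
  resolve("foo");
});

events.push("promise created");

console.log(events); // [ 'executor started', 'promise created' ]
```

By the time anything subscribes, the work is already under way, and a native promise offers no handle to stop it.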

There is one approach that may help you, defer, but it's not strictly related to your question.

    import { defer } from 'rxjs';

    defer(async () => {
      const a = await Promise.resolve(1);
      const b = a + await Promise.resolve(2);
      return a + b + await Promise.resolve(3);
    }).subscribe(x => console.log(x)); // logs 7
