I want to write a function that returns items asynchronously:
    public static async Task<IEnumerable<T>> GetItems()
    {
        while (await ShouldLoopAsync())
        {
            yield return await GetItemAsync();
        }
    }

Of course, this does not work, because yield return is not allowed in a method that returns Task<IEnumerable<T>>. In [1], the following solution is proposed:
    return Observable.Create<T>(
        observer => Task.Run(async () =>
        {
            while (await ShouldLoopAsync())
            {
                observer.OnNext(await getAsync());
            }
            observer.OnCompleted();
        })
    );

I don't like the accepted solution from [1] because it uses Task.Run inside Observable.Create. I want a solution based on Reactive Extensions that runs on the current TaskScheduler and uses no threads other than the existing one. I can write such a solution declaratively using .NET events, but I can't with Rx.
UPDATE
I tried removing Task.Run and wrote the following test:
    [TestClass]
    public class ReactiveWithTaskIEnumerable
    {
        [TestMethod]
        public void ReactibeShouldProcessValuesWithoutnakoplenie()
        {
            Trace.WriteLine($"Start Thread {Thread.CurrentThread.ManagedThreadId}");
            var i = GetIntsAsync()
                .Subscribe(
                    onNext: p =>
                    {
                        Trace.WriteLine($"onNext Thread {Thread.CurrentThread.ManagedThreadId} {p}");
                    },
                    onCompleted: () =>
                    {
                        Trace.WriteLine($"onCompleted Thread {Thread.CurrentThread.ManagedThreadId}");
                    },
                    onError: p =>
                    {
                        Trace.WriteLine($"onError Thread {Thread.CurrentThread.ManagedThreadId} {p}");
                        throw p;
                    }
                );
            Trace.WriteLine($"Finish Thread {Thread.CurrentThread.ManagedThreadId}");
        }

        private IObservable<int> GetIntsAsync()
        {
            return Observable.Create<int>(
                observer => async () =>
                {
                    Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
                    try
                    {
                        foreach (var i in Enumerable.Range(1, 10))
                        {
                            await Task.Delay(1000);
                            observer.OnNext(1);
                        }
                    }
                    catch (Exception e)
                    {
                        observer.OnError(e);
                    }
                    observer.OnCompleted();
                });
        }
    }

And the output is:
    Start Thread 3
    Finish Thread 3

So the inner task doesn't start at all.
After that I changed GetIntsAsync to this:
    private IObservable<int> GetIntsAsync()
    {
        return Observable.Create<int>(
            observer =>
            {
                async Task Lambda()
                {
                    Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
                    try
                    {
                        foreach (var i in Enumerable.Range(1, 10))
                        {
                            await Task.Delay(1000);
                            observer.OnNext(1);
                        }
                    }
                    catch (Exception e)
                    {
                        observer.OnError(e);
                    }
                    observer.OnCompleted();
                }

                return Lambda();
            });
    }

And this gives the following output:
    Debug Trace:
    Start Thread 3
    Produce 3
    Finish Thread 3

This is close to the output with Task.Run:
    Debug Trace:
    Start Thread 3
    Produce 9
    Finish Thread 3
Now I need to wait until the observable completes.
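One possible way to wait for completion, sketched under assumptions rather than offered as a definitive answer: an IObservable<T> can be awaited when System.Reactive.Linq is imported, and the await resumes only after OnCompleted (or throws on OnError). The class name WaitForCompletionSketch, the helper CollectAsync, and the shortened three-item loop are hypothetical and only serve the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class WaitForCompletionSketch
{
    // Hypothetical stand-in for the question's second GetIntsAsync,
    // shortened to three items and a 100 ms delay.
    public static IObservable<int> GetIntsAsync() =>
        Observable.Create<int>(async observer =>
        {
            foreach (var i in Enumerable.Range(1, 3))
            {
                await Task.Delay(100);
                observer.OnNext(i);
            }
            observer.OnCompleted();
        });

    // Awaiting the observable subscribes to it; ToList() buffers every item
    // and emits a single list once the source has completed.
    public static async Task<IList<int>> CollectAsync()
    {
        return await GetIntsAsync()
            .Do(p => Console.WriteLine(
                $"onNext Thread {Thread.CurrentThread.ManagedThreadId} {p}"))
            .ToList();
    }

    public static async Task Main()
    {
        Console.WriteLine($"Start Thread {Thread.CurrentThread.ManagedThreadId}");
        var items = await CollectAsync();
        Console.WriteLine(
            $"Finish Thread {Thread.CurrentThread.ManagedThreadId}, {items.Count} items");
    }
}
```

In a test, the same idea would mean declaring the test method as async Task and awaiting the observable instead of calling Subscribe. Note that without a SynchronizationContext (as in a console app or a typical unit test runner), the continuation after each await Task.Delay is likely to resume on a thread-pool thread, so the onNext lines may show a different thread id than the Start line even though Task.Run is not used.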
Task.Run that you don't want to be there. What's the problem?
Observable.Create overload that accepts an IScheduler, in the package System.Reactive 4.4.1.