Skip to content

Unexpected behavior with async/await on IObservable #16

@slorion

Description

@slorion

In the code below, I would expect the result to be always 1, but when using async/await keywords with IObservable (in function AsyncWithObservable), it somehow ignores the first value produced and prints 2. Furthermore, if I remove the 100ms delay in ConnectAndProduceFirstValue(), then AsyncWithTaskRun() and WithTaskRun() never return. I am not entirely sure if those results are to be expected, but that behavior is surprising.

using System; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Reactive.Threading.Tasks; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication1 { class Program { static void Main(string[] args) { var s = new Subject<int>(); Task<int> t; for (int i = 0; i < 1000; i++) { t = AsyncWithObservable(s); s.OnNext(2); t.Wait(); Console.WriteLine("AsyncWithObservable: " + t.Result); // prints 2 t = AsyncWithTask(s); s.OnNext(2); t.Wait(); Console.WriteLine("AsyncWithTask: " + t.Result); // prints 1 t = AsyncWithTaskRun(s); s.OnNext(2); t.Wait(); Console.WriteLine("AsyncWithTaskRun: " + t.Result); // prints 1, blocks if Thread.Sleep is removed t = WithTask(s); s.OnNext(2); t.Wait(); Console.WriteLine("WithTask: " + t.Result); // prints 1 t = WithTaskRun(s); s.OnNext(2); t.Wait(); Console.WriteLine("WithTaskRun: " + t.Result); // prints 1, blocks if Thread.Sleep is removed } Console.ReadLine(); } static void ConnectAndProduceFirstValue(Subject<int> s) { Thread.Sleep(100); s.OnNext(1); } static async Task<int> AsyncWithObservable(Subject<int> s) { var t = s.FirstAsync(); ConnectAndProduceFirstValue(s); return await t; } static async Task<int> AsyncWithTask(Subject<int> s) { var t = s.FirstAsync().ToTask(); ConnectAndProduceFirstValue(s); return await t; } static async Task<int> AsyncWithTaskRun(Subject<int> s) { var t = Task.Run(() => s.First()); ConnectAndProduceFirstValue(s); return await t; } static Task<int> WithTask(Subject<int> s) { var t = s.FirstAsync().ToTask(); ConnectAndProduceFirstValue(s); return t; } static Task<int> WithTaskRun(Subject<int> s) { var t = Task.Run(() => s.First()); ConnectAndProduceFirstValue(s); return t; } } }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions