This is a rather educational, out-of-curiosity question. Consider the following snippet:

```csharp
using System.Linq;
using System.Reactive.Linq;

var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();
```

Where SubscribeDebug subscribes a simple observer:

```csharp
using System;
using System.Diagnostics;

public class DebugObserver<T> : IObserver<T>
{
    public void OnCompleted()
    {
        Debug.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Debug.WriteLine("Error");
    }

    public void OnNext(T value)
    {
        Debug.WriteLine("Value: {0}", value);
    }
}
```

The output of this is:

```
Value: 0
Value: 1
Value: 2
Value: 3
Value: 4
```

And then it blocks. Can someone help me understand why this happens and why the observable never completes? I have noticed that it completes without the Concat call, but blocks with it.

  • Does this behavior also exist when you concat a different observable which has already completed? Commented Apr 5, 2020 at 11:12
  • Your code is creating a deadlock due to the scheduler used. Try this instead: .ToObservable(Scheduler.Default). That works with your code. I'll need to spend more time on it to give you the reason why. Commented Apr 5, 2020 at 11:26
  • @Progman, you're on the wrong track. Each subscription to enumerable.ToObservable() starts the enumerable again, just as two foreach loops over an enumerable each start it again. The problem here is a deadlock caused by the Scheduler.Immediate scheduler. Commented Apr 5, 2020 at 11:29
  • The problem does not seem to be Scheduler.Immediate: when I pass it to ToObservable(), both enumerations are iterated. When ToObservable() is called without a scheduler, however, the code blocks. Commented Apr 5, 2020 at 12:06
  • @OguzOzgul Of all the static schedulers, it only deadlocks with Scheduler.CurrentThread, so I guess that is the default when ToObservable is called without an argument. Commented Apr 6, 2020 at 11:29

1 Answer


I've looked at the source of ToObservable and distilled a minimal implementation. It does reproduce the behavior we're seeing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ToObservableExtensions
{
    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
        ToObservableEx(enumerable, CurrentThreadScheduler.Instance);

    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
        Observable.Create<T>(observer =>
        {
            IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
            {
                if (enumerator.MoveNext())
                {
                    observer.OnNext(enumerator.Current);
                    inner.Schedule(enumerator, loopRec); // <-- culprit
                }
                else
                {
                    observer.OnCompleted();
                }

                // ToObservable.cs Line 117
                // We never allow the scheduled work to be cancelled.
                return Disposable.Empty;
            }

            return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
        });
}
```

With that out of the way, the crux of the problem lies in the behavior of CurrentThreadScheduler, which is the default scheduler ToObservable uses when none is passed.

The behavior of CurrentThreadScheduler is that if Schedule is called while scheduled work is already running on the current thread, the new work is queued and only runs after the current work item returns.

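```csharp
using System;
using System.Reactive.Concurrency;

CurrentThreadScheduler.Instance.Schedule(() =>
{
    // Work is already running on this thread, so this is queued, not run immediately.
    CurrentThreadScheduler.Instance.Schedule(() => Console.WriteLine(1));
    Console.WriteLine(2);
});
```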

This prints 2, then 1. This queuing behavior is our undoing.
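To make the queuing concrete without depending on Rx, here is a minimal sketch of a current-thread trampoline. `MiniTrampoline` and `TrampolineDemo` are hypothetical names, and this is a simplification, not Rx's actual CurrentThreadScheduler (which also supports delayed, due-time scheduling). It reproduces the same 2-then-1 ordering:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for CurrentThreadScheduler's trampoline.
public static class MiniTrampoline
{
    // Per-thread queue of pending work, and whether this thread is draining it.
    [ThreadStatic] private static Queue<Action>? _pending;
    [ThreadStatic] private static bool _running;

    public static void Schedule(Action action)
    {
        _pending ??= new Queue<Action>();
        _pending.Enqueue(action);

        // Work is already running on this thread: leave the new item queued.
        if (_running) return;

        // Otherwise this call becomes the trampoline and drains the queue in order.
        _running = true;
        try
        {
            while (_pending.Count > 0)
            {
                _pending.Dequeue()();
            }
        }
        finally
        {
            _running = false;
            _pending.Clear(); // drop leftovers if an action threw
        }
    }
}

public static class TrampolineDemo
{
    public static List<int> Run()
    {
        var log = new List<int>();
        MiniTrampoline.Schedule(() =>
        {
            MiniTrampoline.Schedule(() => log.Add(1)); // queued behind the outer action
            log.Add(2);                                // runs first
        });
        return log;
    }
}
```

`TrampolineDemo.Run()` returns `[2, 1]`: the inner action only runs once the outer one has returned.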

When observer.OnCompleted() is called, it causes Concat to subscribe to the second sequence and start the next enumeration. However, things are not the same as when we started out: we are still inside the observer => { } block when we try to schedule the next one. So instead of executing immediately, the next schedule gets queued.

Now enumerator.MoveNext() is caught in a deadlock. It can't move to the next item: MoveNext blocks until the next item arrives, and that item can only arrive once it is scheduled by the ToObservable loop.

But the scheduler can only run that queued work (which would notify ToEnumerable and, in turn, unblock MoveNext()) once the current loopRec exits, and loopRec can't exit because it is blocked by that very MoveNext() call.
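The shape of this deadlock can be reproduced with nothing more than a queue and a semaphore. In this hypothetical sketch (`DeadlockDemo` is an illustrative name, not part of Rx), the running work item queues the work that would release the gate and then waits on that gate itself. A timeout stands in for the infinite wait so the demo terminates:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical model: a single-threaded work queue (like the trampoline) whose
// current work item blocks on a signal that only a later queued item can produce.
public static class DeadlockDemo
{
    public static bool Run(TimeSpan timeout)
    {
        var queue = new Queue<Action>();
        var gate = new SemaphoreSlim(0); // plays the role of ToEnumerable's gate
        bool signalled = true;

        // The work item currently running (stands in for loopRec).
        queue.Enqueue(() =>
        {
            // Stands in for Concat subscribing to the second sequence:
            // the work that would produce the next item is only queued.
            queue.Enqueue(() => gate.Release());

            // Stands in for MoveNext(): waits for an item that cannot arrive
            // until this work item returns. The timeout replaces "forever".
            signalled = gate.Wait(timeout);
        });

        // Drain the queue one item at a time, like a trampoline.
        while (queue.Count > 0)
        {
            queue.Dequeue()();
        }

        return signalled;
    }
}
```

`DeadlockDemo.Run(TimeSpan.FromMilliseconds(100))` returns `false`: the queued `Release()` only runs after the waiting work item has given up.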

Addendum

This is approximately what ToEnumerable (from GetEnumerator.cs) does (not a valid implementation):

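```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class ToEnumerableSketch
{
    public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
    {
        var gate = new SemaphoreSlim(0);
        var queue = new ConcurrentQueue<T>();

        using (observable.Subscribe(
            value => { queue.Enqueue(value); gate.Release(); }, // one release per item
            () => gate.Release()))                               // one extra release on completion
        {
            while (true)
            {
                gate.Wait(); // this is where it blocks
                if (queue.TryDequeue(out var current))
                    yield return current;
                else
                    break;
            }
        }
    }
}
```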

Enumerators are expected to block in MoveNext() until the next item is available, and that's why there's a gating implementation. It's not Enumerable.Range that blocks, but ToEnumerable.
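For contrast, here is a sketch of the same gate pattern working when the producer runs on a different thread, which is roughly what passing a concurrent scheduler such as Scheduler.Default (as suggested in the comments) arranges. `GatedSequence<T>` and `GatedDemo` are hypothetical names, and this is a simplified model, not Rx's ToEnumerable:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical gated sequence modelled on the ToEnumerableEx sketch:
// producers push items, and the consumer blocks on the gate between items.
public sealed class GatedSequence<T>
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(0);
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    public void Push(T value) { _queue.Enqueue(value); _gate.Release(); }
    public void Complete() => _gate.Release();

    public IEnumerable<T> Consume()
    {
        while (true)
        {
            _gate.Wait(); // blocks until an item or completion is signalled
            if (_queue.TryDequeue(out var item)) yield return item;
            else yield break;
        }
    }
}

public static class GatedDemo
{
    public static List<int> Run()
    {
        var seq = new GatedSequence<int>();

        // The producer runs on a thread-pool thread, so the consumer blocking
        // in Consume() cannot stop it from making progress.
        var producer = Task.Run(() =>
        {
            for (var i = 0; i < 5; i++) seq.Push(i);
            seq.Complete();
        });

        var result = new List<int>();
        foreach (var x in seq.Consume()) result.Add(x);
        producer.Wait();
        return result;
    }
}
```

`GatedDemo.Run()` returns the items 0 through 4 and completes, because the thread blocked in `Consume()` is not the thread that has to produce the items.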

11 Comments

But I implemented a custom IEnumerable<int> that returns a custom IEnumerator<int>, and what I see is that when the iteration over the first enumerator completes, GetEnumerator() is invoked again (and I return a new enumerator), but MoveNext() is never called on it.
I should clarify: it's not your own IEnumerable that matters, it's the one returned by Observable.ToEnumerable(). That's where it blocks.
@TheodorZoulias Thank you! The problem became so much clearer once I implemented ToObservable; Rx stack traces are fairly unreadable. Writing simplified Rx operators to understand their behavior is a good learning experience. :)
Incredible! Thank you for such a detailed answer!
Fabulous answer!