I'm attempting to create an IObservable<T> from two arrays (IEnumerables). I'm trying to avoid explicitly iterating over the arrays and calling observer.OnNext. I came across the Observable.Subscribe extension method, which at first glance appears to be what I need. However, it does not work as I expected and I'm at a loss as to why.

The following code is an example:

    using System;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;

    class Program
    {
        static void Main(string[] args)
        {
            var observable = Observable.Create<char>(observer =>
            {
                var firstBytes = new[] {'A'};
                var secondBytes = new[] {'Z', 'Y'};

                // Both enumerables push into the same observer.
                firstBytes.Subscribe(observer);
                secondBytes.Subscribe(observer);

                return Disposable.Empty;
            });

            observable.Subscribe(b => Console.Write(b));
        }
    }

The output of this is "AZ", not "AZY" as I expected. Now, if I subscribe to secondBytes before firstBytes, the output is "ZAY"! This seems to suggest it is enumerating the two arrays in-step - which kind of explains the "AZ" output.

Anyhow, I'm at a complete loss as to why it behaves like this and would appreciate any insight people may be able to provide.

  • Still trying to figure out why this behaves the way it does, but in case you just need a fix: return firstBytes.Concat(secondBytes).Subscribe(observer);. Commented Oct 16, 2016 at 21:29
  • Just a small hint - if ever you find yourself returning Disposable.Empty then you are doing something wrong. Commented Oct 16, 2016 at 23:16
  • @Enigmativity - thanks for the advice - in this case, it's more a by-product of the simplicity of the example (or is there a more suitable return). Commented Oct 17, 2016 at 8:25
  • @cristobalito - Yes, you would return new CompositeDisposable(firstBytes.Subscribe(observer), secondBytes.Subscribe(observer)). Commented Oct 17, 2016 at 12:17
  • @Enigmativity nice one - thanks Commented Oct 18, 2016 at 15:16

2 Answers

The lock-step iteration is explained by the implementation of Observable.Subscribe(IEnumerable source), which uses a "recursive" algorithm: it calls e.MoveNext in a scheduler action. If that succeeds, the value is emitted and a new scheduler action is queued to read the next value from the enumerable.

As you are subscribing to two enumerables without specifying a scheduler for the subscription, the default iteration scheduler is used for these operations (defined by SchedulerDefaults.Iteration), which defaults to running on the current thread. This means that the enumeration actions are queued to run after your current subscription action completes. As a result, the enumeration actions are interleaved, something like this:

  1. firstBytes.Subscribe() -> queue enumeration action
  2. secondBytes.Subscribe() -> queue enumeration action
  3. call firstBytes.MoveNext() -> OnNext('A') -> queue next enumeration action
  4. call secondBytes.MoveNext() -> OnNext('Z') -> queue next enumeration action
  5. call firstBytes.MoveNext() -> OnCompleted()
  6. call secondBytes.MoveNext() -> OnNext('Y') -> queue next enumeration action
  7. call secondBytes.MoveNext() -> OnCompleted()

The observer receives the OnCompleted() notification at step 5 so the remaining secondBytes enumeration steps are ignored. If you had returned your subscription disposables then the second enumeration would have been cancelled at that point instead.
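The queueing described above can be modelled without Rx at all. The following is only a simplified sketch, not the actual Rx implementation: the names TrampolineSketch, Pending and Run are hypothetical, a plain Queue<Action> stands in for the current-thread scheduler, and a boolean flag stands in for the observer ignoring notifications after OnCompleted.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

static class TrampolineSketch
{
    // Hypothetical stand-in for the current-thread scheduler's work queue.
    static readonly Queue<Action> Pending = new Queue<Action>();
    static readonly StringBuilder Output = new StringBuilder();

    // Models the observer ignoring everything after the first OnCompleted.
    static bool completed;

    static void OnNext(char c) { if (!completed) Output.Append(c); }
    static void OnCompleted() { completed = true; }

    // Models Subscribe: read one element per queued action, then re-queue.
    static void Subscribe(IEnumerable<char> source)
    {
        var e = source.GetEnumerator();
        Action step = null;
        step = () =>
        {
            if (e.MoveNext())
            {
                OnNext(e.Current);
                Pending.Enqueue(step); // queue the next enumeration action
            }
            else
            {
                OnCompleted();
            }
        };
        Pending.Enqueue(step);
    }

    public static string Run()
    {
        Subscribe(new[] { 'A' });      // firstBytes
        Subscribe(new[] { 'Z', 'Y' }); // secondBytes

        // Drain the queue in FIFO order, as a trampoline would.
        while (Pending.Count > 0) Pending.Dequeue()();
        return Output.ToString();
    }

    static void Main() => Console.WriteLine(Run()); // prints "AZ"
}
```

In this model the two enumerations alternate because each step goes to the back of the same queue, and 'Y' is dropped because the first enumeration has already signalled completion.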

1 Comment

As it appears to have been missed in an answer below, but I think your question implies it, all you need is firstBytes.Concat(secondBytes). While this will just give you an IEnumerable<char> you can still just subscribe to IEnumerables. If you really want, you can whack .ToObservable() on the end of it. But the Observable.Create in the OP is not necessary.

Because you are subscribing to two sequences separately, as opposed to a single sequence that is the concatenation of the two, there are two possible sources that can invoke the observer's OnCompleted method. Since the first array is shorter, it completes after its only item is emitted, and the observer stops listening once it has received that completion notification.

The correct way to do this is to combine the two sequences into a single sequence, then subscribe to that:

    var observable = Observable.Create<char>(observer =>
    {
        var firstBytes = new[] { 'A' };
        var secondBytes = new[] { 'Z', 'Y' };

        return firstBytes.Concat(secondBytes).Subscribe(observer);
    });

    observable.Subscribe(Console.Write);
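For comparison, here is a sketch of the same idea kept entirely in IObservable terms, without Observable.Create. It assumes the System.Reactive package is referenced, and it deliberately omits System.Linq so that Concat resolves to the Rx operator rather than Enumerable.Concat.

```csharp
using System;
using System.Reactive.Linq;

class Program
{
    static void Main()
    {
        var firstBytes = new[] { 'A' };
        var secondBytes = new[] { 'Z', 'Y' };

        // Concat subscribes to the second sequence only after the first
        // has completed, so the observer sees one OnCompleted at the end.
        IObservable<char> observable =
            firstBytes.ToObservable().Concat(secondBytes.ToObservable());

        observable.Subscribe(b => Console.Write(b));
    }
}
```

This is expected to write the three characters in order, since only the concatenated sequence ever signals completion to the observer.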

7 Comments

Makes sense - could have sworn I tried the Concat approach (in my more complicated use case). Thanks for the help.
Although - that Concat is an IEnumerable.Concat. In order to stay in IObservable land, it should probably be firstBytes.ToObservable().Concat(secondBytes.ToObservable())
@cristobalito It doesn't really make any difference functionally, at least in this case.
mmm, I wouldn't say "correct way to do it" The whole example in the OP is bogus and is Rx for Rx sake. It would be great to know why the OP wants to use Rx when the source is IEnumerable<T>?
Hi Lee - agree example is bogus but it was a simplified example compared to the problem I was trying to solve. In any case, I think you are correct and trying to shoe-horn the solution into Rx is not the way forward.