In a C# console application, using System.Reactive.Linq, I'm trying to make an observable, where each item is the string result of some processing by another observable. I've created a simple repro using strings and characters. Warning, this example is completely CONTRIVED, and the point is that the nested .Wait() hangs.

    class Program
    {
        static void Main(string[] args)
        {
            string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" };
            IObservable<string> files = fileNames.ToObservable();

            string[] extensions = files.Select(fn =>
                {
                    var extension = fn.ToObservable()
                        .TakeLast(4)
                        .ToArray()
                        .Wait(); // <<<<<<<<<<<<< HANG HERE
                    return new string(extension);
                })
                .ToArray()
                .Wait();
        }
    }

Again, this is not how I would find the suffix of many filenames. The question is how I can produce an Observable of strings, where the strings are computed from a completed observable.

If I pull out this code and run it alone, it completes fine.

    var extension = fn.ToObservable()
        .TakeLast(4)
        .ToArray()
        .Wait();

There is something about the nested Wait() on the async methods, which I don't understand.

How can I code the nested async observables, so I can produce a simple array of string?

Thanks

-John

3 Answers

Your code blocks because you are using ToObservable() without specifying a scheduler, in which case it uses the CurrentThreadScheduler.

So the files observable issues its first OnNext() [A] (sending "file1.doxc") on the current thread. It can't continue iterating until that OnNext() returns. However, the inner fn observable also uses ToObservable(), and the Wait() blocks until fn completes. The inner observable queues its first OnNext() (sending "f") on the current thread scheduler, but that item can never be sent because the first OnNext() [A] will now never return.
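The queuing behaviour described above can be seen in isolation with a minimal sketch that schedules work directly on the CurrentThreadScheduler. The class name TrampolineDemo is made up for illustration, and the sketch assumes the System.Reactive package is referenced. Work scheduled from inside an action that is already running on this scheduler is queued and only runs once the outer action returns, so an outer action that blocks waiting for the inner one can never finish.

```csharp
using System;
using System.Reactive.Concurrency;

// Hypothetical demo class, not part of the original question.
class TrampolineDemo
{
    static void Main()
    {
        CurrentThreadScheduler.Instance.Schedule(() =>
        {
            Console.WriteLine("outer: start");

            // This is queued rather than run inline. It executes only
            // after the outer action has returned to the scheduler.
            CurrentThreadScheduler.Instance.Schedule(
                () => Console.WriteLine("inner: runs"));

            // If the outer action blocked here until the inner work was
            // done (as the nested Wait() does), it would wait forever.
            Console.WriteLine("outer: end");
        });
    }
}
```

Running this should print "outer: start", then "outer: end", then "inner: runs", which is the ordering that makes the nested Wait() in the question hang.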

Two simple fixes:

Either change the files observable like this:

IObservable<string> files = fileNames.ToObservable(NewThreadScheduler.Default); 

Or, avoid the use of the inner Wait() with a SelectMany (which is definitely more idiomatic Rx):

    string[] extensions = files.SelectMany(fn =>
        {
            return fn.ToObservable()
                .TakeLast(4)
                .ToArray()
                .Select(x => new string(x));
        })
        .ToArray()
        .Wait();

    // display results etc.

Each approach will have quite different execution semantics - the first will run much like a nested loop, with each inner observable completing before the next outer iteration. The second will be much more interleaved since the blocking behaviour of the Wait() is removed. If you use the Spy method I wrote and attach it after both ToObservable() calls, you'll see this behaviour quite clearly.
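To watch the ordering yourself without the Spy method, a rough stand-in can be built from the Do operator. The Trace helper below is hypothetical and much simpler than Spy; it only logs each notification with a label. The sketch applies it to the SelectMany version and assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;

static class TraceExtensions
{
    // Hypothetical helper for illustration; this is NOT the Spy method
    // mentioned above, just a label-and-log wrapper around Do.
    public static IObservable<T> Trace<T>(this IObservable<T> source, string name) =>
        source.Do(
            x => Console.WriteLine($"{name}: OnNext({x})"),
            ex => Console.WriteLine($"{name}: OnError({ex.Message})"),
            () => Console.WriteLine($"{name}: OnCompleted()"));
}

class Program
{
    static void Main()
    {
        string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" };

        string[] extensions = fileNames.ToObservable().Trace("files")
            .SelectMany(fn => fn.ToObservable().Trace(fn) // one trace per inner stream
                .TakeLast(4)
                .ToArray()
                .Select(chars => new string(chars)))
            .ToArray()
            .Wait(); // only the outermost Wait() remains

        Console.WriteLine(string.Join(", ", extensions));
    }
}
```

Comparing the log from this version with one where files is created using NewThreadScheduler.Default and the inner Wait() is kept should make the difference between nested and interleaved execution visible.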

1 Comment

Thank you James, great answer, to the point and informative. I didn't have a good reason for not using SelectMany; in fact, that's my ultimate solution now. I tried adding the NewThreadScheduler.Default just for a test, and that worked as well, of course. I get the concept that the observables are effectively deadlocked on the OnNext, but I think I'll play around with Spy a bit and try to make this clearer for myself. Thanks again!

Wait is a blocking call, which doesn't mix well with Rx. I'm not sure why the nested one is failing.

Assuming an async function, this works:

    IObservable<string> files = fileNames.ToObservable();

    string[] extensions = await files.SelectMany(async fn =>
        {
            var extension = await fn.ToObservable()
                .TakeLast(4)
                .ToArray();
            return new string(extension);
        })
        .ToArray();

James has nailed the issue, but I would suggest that your code boils down to just doing this:

    string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" };

    string[] extensions =
    (
        from fn in fileNames.ToObservable()
        from extension in fn.ToObservable().TakeLast(4).ToArray()
        select new string(extension)
    )
        .ToArray()
        .Wait();

Now, that still has a .Wait() in it. Ideally you'd do something like this:

    IDisposable subscription =
    (
        from fn in fileNames.ToObservable()
        from extension in fn.ToObservable().TakeLast(4).ToArray()
        select new string(extension)
    )
        .ToArray()
        .Subscribe(extensions =>
        {
            /* Do something with the `extensions` */
        });

You should avoid all waiting.
