
I'm in a situation where I have a list of Tasks that I'm working through (enable drive, change position, wait for stop, disable).

The 'wait for' monitors an IObservable<Status>, which I want to wait on (so I can thread it through ContinueWith and the other tasks).

I started out by running the tasks that follow inside the subscriber's OnNext handler, but that was just ugly. What I've now come up with is this extension method:

    public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        var tcs = new TaskCompletionSource<T>();
        source
            .Where(pred)
            .DistinctUntilChanged()
            .Take(1) // OnCompletes the observable, subscription will self-dispose
            .Subscribe(val => tcs.TrySetResult(val),
                       ex => tcs.TrySetException(ex),
                       () => tcs.TrySetCanceled());
        return tcs.Task;
    }

(UPDATED with svick's suggestion of handling OnCompleted and OnError)
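For anyone wanting to see what the three handlers do, here is a minimal sketch of the outcomes, assuming System.Reactive is referenced. The Subject<int> instances, the WaitForOutcomes class, and the "drive fault" message are hypothetical stand-ins, not part of my real code; the extension method is repeated only so the snippet compiles on its own.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class ObservableTaskExtensions
{
    // Same extension method as in the question, repeated for self-containment.
    public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        var tcs = new TaskCompletionSource<T>();
        source
            .Where(pred)
            .DistinctUntilChanged()
            .Take(1)
            .Subscribe(val => tcs.TrySetResult(val),
                       ex => tcs.TrySetException(ex),
                       () => tcs.TrySetCanceled());
        return tcs.Task;
    }
}

public static class WaitForOutcomes
{
    // Hypothetical demo: drives three hand-fed subjects through WaitFor.
    public static (TaskStatus matched, TaskStatus failed, TaskStatus ended) Run()
    {
        var ok = new Subject<int>();
        var bad = new Subject<int>();
        var done = new Subject<int>();

        Task<int> matched = ok.WaitFor(x => x > 10);
        Task<int> failed = bad.WaitFor(x => x > 10);
        Task<int> ended = done.WaitFor(x => x > 10);

        ok.OnNext(42);                                             // a matching value arrives
        bad.OnError(new InvalidOperationException("drive fault")); // the source errors out
        done.OnCompleted();                                        // the source ends with no match

        return (matched.Status, failed.Status, ended.Status);
    }
}
```

In the first case the OnCompleted that Take(1) sends after the value also reaches TrySetCanceled, but the Try variant is a no-op once the result has been set, so the task keeps its value.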

Questions:

  • Is this good, bad, or ugly?
  • Did I miss an existing extension which could have done this?
  • Are the Where and DistinctUntilChanged in the right order? (I think they are)
  • Shouldn't you also handle the cases when the source errors out or completes? Commented Jul 30, 2012 at 8:34
  • @svick, hm, good question. In my particular use case, I'm observing a Repeat, so I don't think that will complete. But it's true that if I want this extension to be reusable, I should handle those cases. Commented Jul 30, 2012 at 8:51

2 Answers

13

At the very least I would change this extension method to be this:

    public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        return source
            .Where(pred)
            .DistinctUntilChanged()
            .Take(1)
            .ToTask();
    }

Using .ToTask() is much better than introducing a TaskCompletionSource. To get the .ToTask() extension method, add a using directive for the System.Reactive.Threading.Tasks namespace.

Also, DistinctUntilChanged is redundant in this code. Take(1) only ever lets one value through, so that value is distinct by definition.
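To illustrate the redundancy, here is a minimal sketch that runs the pipeline with and without DistinctUntilChanged against the same source. It assumes System.Reactive is referenced and C# 7 tuples are available; the Subject<int> named xs and the WaitForDemo class are hypothetical stand-ins for your status stream.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks; // brings ToTask() into scope
using System.Threading.Tasks;

public static class WaitForDemo
{
    public static async Task<(int withDistinct, int withoutDistinct)> RunAsync()
    {
        // Hypothetical source; a Subject lets us push values by hand.
        var xs = new Subject<int>();

        // ToTask() subscribes immediately, so both pipelines are listening
        // before any value is pushed.
        Task<int> a = xs.Where(x => x > 10).DistinctUntilChanged().Take(1).ToTask();
        Task<int> b = xs.Where(x => x > 10).Take(1).ToTask();

        xs.OnNext(5);  // filtered out by Where
        xs.OnNext(12); // first match: Take(1) completes both pipelines here
        xs.OnNext(12); // never seen: both subscriptions have already ended

        return (await a, await b); // both tasks carry 12
    }
}
```

Because Take(1) ends the subscription on the first match, DistinctUntilChanged never gets a second value to compare against.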

Now, my next suggestion may be a little controversial. This extension is a bad idea because it hides the true semantics of what is going on.

If I had these two snippets of code:

var t = xs.WaitFor(x => x > 10); 

Or:

var t = xs.Where(x => x > 10).Take(1).ToTask(); 

I would generally prefer the second snippet, as it clearly shows me what is going on. I don't need to remember the semantics of WaitFor.

Unless you made the name of WaitFor more descriptive - perhaps TakeOneAsTaskWhere - then you are taking the clarity of using the operators out of the code that uses it and making the code harder to manage.

Doesn't the following make it easier to remember the semantics?

var t = xs.TakeOneAsTaskWhere(x => x > 10); 

The bottom line for me is that Rx operators are meant to be composed, not encapsulated, but if you're going to encapsulate them then their meaning must be clear.

I hope this helps.


2 Comments

Thanks for this. I'd seen ToTask in various places on the web, but (lazily) assumed that it was lost in some past or previous version. I think you're right that with ToTask the code becomes less ugly, and thus the need for the extension method is reduced.
I think encapsulation is fine where you're applying a set of compositions that have a higher meaning when put together. But I don't think there's enough "higher meaning" here to be worth the encapsulation - the encapsulated name has to mean more than the set of compositions. If you replace the xs.Where(...).Take(1).ToTask() with xs.FirstAsync(...).ToTask(), that's pretty succinct and clear of purpose.
3

Not 100% sure about this, but from reading the Rx 2.0 beta blog post, I would think that if you can use async/await, you could "return await source.FirstAsync(pred)" or without async, "return source.FirstAsync(pred).ToTask()"

http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions-v2-0-beta-available-now.aspx
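A rough sketch of both variants, assuming Rx 2.0 or later is referenced; the class and method names (FirstAsyncSketch, WaitForAsync, WaitForTask) are made up for illustration:

```csharp
using System;
using System.Reactive.Linq;            // FirstAsync, and the awaiter for IObservable<T>
using System.Reactive.Threading.Tasks; // ToTask
using System.Threading.Tasks;

public static class FirstAsyncSketch
{
    // With async/await: the observable returned by FirstAsync is awaited directly.
    public static async Task<T> WaitForAsync<T>(IObservable<T> source, Func<T, bool> pred)
    {
        return await source.FirstAsync(pred);
    }

    // Without async/await: convert the single-element observable to a Task.
    public static Task<T> WaitForTask<T>(IObservable<T> source, Func<T, bool> pred)
    {
        return source.FirstAsync(pred).ToTask();
    }
}
```

Something like FirstAsyncSketch.WaitForTask(Observable.Range(1, 20), x => x > 10) should then hand back a task for the first value above 10.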

[Screenshot: LINQPad using Rx 2.0 and FirstAsync]

1 Comment

I'll bear this in mind when/if I upgrade. Currently I'm trying to see just how hard it is without async/await.
