6

I have an IObservable<Item> inside a class and I want to expose a read-only property that provides the last item pushed to the observable at a given time. So it will provide a single value of Item.

If no value has been pushed, then it will have to return a default value.

How can I do this without having to subscribe to the observable and having a "backing field"?

2
  • What do you mean by the "last item pushed to the observable"? The observable's current value? Commented Feb 4, 2017 at 16:15
  • 1
    Yes! If the observable sequence is, {1, 4, 6,... 400 }, reading the property will return 400 Commented Feb 4, 2017 at 16:21

3 Answers 3

6

Just to supplement @Asti's answer a bit here, and perhaps help you with your frustration:

An observable isn't a physical 'thing', it's more a logical concept. Rx is often compared to LINQ, and it's a fair comparison much of the time. It breaks down though when you start talking data structures: LINQ's enumerables are similar enough to Lists for learning purposes.

However, on the Rx side, there's simply no good equivalent to List. An observable is a transient data structure, all operators deal with this transient state. If you're looking for a permanent state, you're leaving Rx.

Having said that, converting an observable to some sort of state is a common problem, and there are some packages that may help you: ReactiveUI is perhaps the most known. ReactiveProperty is another. Both of these packages are flawed, but may help you.

If you're simply looking for an easier way to get a backing field, without boiler-plating out a backing field, this will work:

public static class ReactivePropertyExtensions { public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source) { return new ReactiveProperty<T>(source); } public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, T defaultValue) { return new ReactiveProperty<T>(source, defaultValue); } } public class ReactiveProperty<T> : IDisposable { private IObservable<T> Source { get; } private IDisposable Subscription { get; } public T Value { get; private set; } public ReactiveProperty(IObservable<T> source) : this(source, default(T)) { } public ReactiveProperty(IObservable<T> source, T defaultValue) { Value = defaultValue; Source = source; Subscription = source.Subscribe(t => Value = t); } public void Dispose() { Subscription.Dispose(); } } 

Example use:

var ticker = Observable.Interval(TimeSpan.FromSeconds(1)) .Publish().RefCount(); var latestTickerValue = ticker.ToReactiveProperty(); Console.WriteLine(latestTickerValue.Value); await Task.Delay(TimeSpan.FromSeconds(1)); Console.WriteLine(latestTickerValue.Value); await Task.Delay(TimeSpan.FromSeconds(3)); Console.WriteLine(latestTickerValue.Value); 
Sign up to request clarification or add additional context in comments.

4 Comments

After some time, I'm re-reading your answer. It's great, but there's a statement you say that I don't understand: Talking about ReactiveUI and ReactiveProperty => "Both of these packages are flawed, but may help you". What do you mean? why do you think they're flawed? Thanks in advance!
This is all opinion, so not sure if this is the right forum: ReactiveProperty has a huge number of dependencies because he crams support for all platforms in one package. So if you just want one function or class, you're introducing a good deal of bloat to your project.
ReactiveUI seems to be in a constant state of flux with the occasional breaking changes and not enough solid documentation. The boilerplate required for Reactive Properties is also awkward. That one's the real pity, if you asked me: It's so close to being fantastic, but always seems two steps away.
Talking about boilerplate in ReactiveUI, I highly recommend you this Fody extension: github.com/kswoll/ReactiveUI.Fody
3

Assuming a hot observable.

For observable = source.Replay(1); observable.Connect();

Provide the value with:

public int Value => observable.Take(1).Amb(Observable.Return(defaultValue)).Wait();

This will return a default value in case no values have been pushed.

You want a transition from Reactive to state, so a backing field isn't a terrible option. You mentioned that you don't want to subscribe, but to observe anything: something, somewhere has to subscribe.

5 Comments

OK, but that is async. My property shouldn't be async. Also, LastAsync it returns the last element when the sequence has finished, but I don't want to return the instant last value pushed.
@SuperJMN Do you want to have the last value pushed to the observable while the observable is running without having a backing field? It has nothing to do with the observable completing?
Yes, exactly that!
OK, I must say I don't like the Wait at all, but I think there's no alternative. I finally chose to have a backing field that is set in a subscription: source.Subscribe(v => property = v)
Oh, and it'll never have actually to wait because the Return method or the replay method activates on subscribe.
1

Here is another way to define the Value property, in the spirit of Asti's solution.

private readonly IObservable<Item> _source; private readonly IObservable<Item> _lastValue; public SomeClass() // Constructor { _source = /* Initialize the source observable (hot) */ _lastValue = _source .Catch(Observable.Never<Item>()) .Concat(Observable.Never<Item>()) .Publish(default) .AutoConnect(0) .FirstAsync(); } public Item Value => _lastValue.Wait(); 

The Publish operator that accepts an initialValue parameter...

Returns a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue. This operator is a specialization of Multicast using a BehaviorSubject<T>.

The BehaviorSubject<T> is a specialized ISubject<T> that...

Represents a value that changes over time. Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.

The Catch and Concat operators have been added in order to preserve the last value, even in case that the source sequence completes normally or exceptionally.

Personally I would be hesitant to use this solution, since a volatile field updated in the Do operator would accomplish the same thing more naturally. I am posting it mainly as a demonstration of the Rx capabilities.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.