1

I got an observable of KeyValuePair<int,double>:

--(7|45.2)--(3|11.1)--(5|13.2)--(6|36.2)--(3|57.4) 

I got a list of consumers defined at runtime. They are only interested in values produced for a single key (myKey).

For example:

  • the consumer 7, is only interested in the value 45.2.
  • the consumer 3, is only interested in the values 11.1 and 57.4
  • the consumer 1, is only interested in values with myKey = 1, so none here.

Here is my consumer subscription code (one per consumer):

myObservable.Where(t => t.Key == myKey).Subscribe(t => /* DoSomething*/); 

Let's take:

  • N = number of messages being produced by myObservable
  • M = number of consumers

Let's call Comparison the code t.Key == myKey

For every new message being published, Comparison will be executed M times (once per consumer). In the case of N messages, Comparison will be executed N * M

Is RX Extension offering another way to do to avoid executing that many comparisons?

Do I need to make it myself? (using the dictionary of (mykey, consumers) for example, and forwarding the messages to the right consumer(s))

Thanks

7
  • 2
    You could avoid the dictionary with a GroupBy and each subscribers is only subscribing to the group their interested in. Commented Oct 23, 2018 at 22:23
  • Not sure how I could do :"each subscribers is only subscribing to the group their interested in"? Commented Oct 23, 2018 at 22:26
  • Maybe something along these lines source.GroupBy(kvp => kvp.Key).Select(grp => grp.Subscribe(kvp => /*each group do something*/)); Commented Oct 23, 2018 at 22:40
  • JSteward you are right, if I apply a .GroupBy(kvp => kvp.Key) on myObservable and the consumer do: myObservable.Where(gp => gp.Key == myKey).SelectMany(gp=>gp).Select(kvp=>kvp.Value).Subscribe(v => /* DoSomething*/) it will give a different result. The complexity of this code, if I take K = the number of myKey possible (myKey in the [1...K] interval), will now be at worst K * M times. As for any new myKey an observable will be created and pushed through every new consumer that will execute the comparison on it. Do you agree on it? Commented Oct 23, 2018 at 23:33
  • Yes, I think I'm following you. The comparison will happen once per group per subscriber. There's of course other measures of complexity for that code but this would seem to lessen the number of key comparisons. Commented Oct 23, 2018 at 23:58

2 Answers 2

1

So you want to optimize this setup:

IObservable<KeyValuePair<int, double>> myObservable; myObservable.Where(e => e.Key == 1).Select(e => e.Value).Subscribe(x => { /* DoSomething*/ }); myObservable.Where(e => e.Key == 3).Select(e => e.Value).Subscribe(x => { /* DoSomething*/ }); myObservable.Where(e => e.Key == 7).Select(e => e.Value).Subscribe(x => { /* DoSomething*/ }); 

This setup will indeed cause multiple comparisons for each element emitted by the myObservable. A simple way to optimize it is to group the myObservable with the GroupBy operator, in order to created a nested observable structure:

IObservable<IGroupedObservable<int, double>> myGroupedObservable = myObservable .GroupBy(e => e.Key, e => e.Value); 

...and then select the group that you want to observe, and Merge the nested structure to a flat sequence:

myGroupedObservable.Where(g => g.Key == 1).Merge().Subscribe(x => { /* DoSomething*/ }); myGroupedObservable.Where(g => g.Key == 3).Merge().Subscribe(x => { /* DoSomething*/ }); myGroupedObservable.Where(g => g.Key == 7).Merge().Subscribe(x => { /* DoSomething*/ }); 

This way the comparison will occur only once per key and consumer. You are now comparing IGroupedObservable<int, double> subsequences, not KeyValuePair<int, double> elements. These subsequences are emitted by the GroupBy operator only when it receives the first element that has a distinct key. So the total number of comparisons will be equal to NxM, where N is the number of consumers and M is the number of the distinct keys. Unless you are dealing with millions of consumers, this number should be fairly small, and should not cause noticeable overhead.

A more complex approach that is based on a combination of the GroupBy operator and a ConcurrentDictionary<TKey, Subject<TSource>>, can be found in the 5th revision of this answer. It is probably less efficient than the simple approach above, because there are two dictionaries involved (the GroupBy also uses a dictionary internally), so each key is hashed twice.

Sign up to request clarification or add additional context in comments.

3 Comments

@PiPo I updated the answer with a simpler approach.
Theodor, what you think of the implementation at the link below? It is a variant of your 5th implementation where we rely only on Rx GroupBy dictionary github.com/plop44/ObservableLookup/blob/main/ObservableLookup/…
@PiPo sorry, it's a long time since I've done anything Rx-related, so I'm not sharp enough for doing a code-review.
0

You could give this a go:

var subject = new Subject<KeyValuePair<int, double>>(); var interests = new Dictionary<int, Func<double, string>>() { { 7, v => $"Seven:{v}" }, { 3, v => $"Three:{v}" }, { 1, v => $"One:{v}" }, }; IDisposable subscription = subject .GroupBy(x => x.Key, x => x.Value) .Select(gx => interests.ContainsKey(gx.Key) ? gx.Select(x => interests[gx.Key](x)) : Observable.Empty<string>()) .Merge() .Subscribe(x => Console.WriteLine(x)); 

When I test with this code:

subject.OnNext(new KeyValuePair<int, double>(7, 45.2)); subject.OnNext(new KeyValuePair<int, double>(3, 11.1)); subject.OnNext(new KeyValuePair<int, double>(5, 13.2)); subject.OnNext(new KeyValuePair<int, double>(6, 36.2)); subject.OnNext(new KeyValuePair<int, double>(3, 57.4)); 

I get this output:

Seven:45.2 Three:11.1 Three:57.4 

The comparison is done once per key.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.