3

I'm learning Rx in .NET and I've got the following requirements:

  • A sequence of strings comes from an API. They arrive at irregular intervals that I don't know in advance: sometimes 5 strings arrive within one second, sometimes only 1 string arrives in 5 seconds.
  • The strings are basically five commands: "start", "stop", "left", "right", "back". Other commands also come in, but they can be filtered out.
  • The program should execute a command whenever one comes in.
  • If the same command comes in again within a given period of time (let's say 2 seconds), it should only be executed once. If a different command comes in within this period, it should be executed immediately. If the same command as the previously executed one comes in after the given period of time, it should be executed.
  • There is no timestamp attached to the incoming data (but one can be added if needed).

So given the example data:

    using System;
    using System.Reactive.Linq;

    namespace ConsoleApp1
    {
        class Program
        {
            static void Main(string[] args)
            {
                var results = new[]
                {
                    "right", //1
                    "left",  //2
                    "right", //3
                    "right", //4
                    "left",  //5
                    "right", //6
                    "right", //7
                    "right", //8
                    "left"   //9
                };

                var observable = Observable.Generate(
                    0,
                    x => x < results.Length,
                    x => x + 1,
                    x => results[x],
                    x => TimeSpan.FromSeconds(1)).Timestamp();

                observable.Subscribe(...);

                Console.ReadKey();
            }
        }
    }

The result should be:

    right //1
    left //2
    right //3
    left //5
    right //6
    right //8
    left //9

String 4 has been skipped because it arrives only 1 second after the last "right", and so has string 7. However, string 8 has not been skipped because it arrives 2 seconds after string 6.
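To pin the rule down independently of Rx, here is a minimal plain-C# sketch of it. The helper name `Executed` and the representation of arrival times as whole seconds are assumptions made for illustration; the key point is that each command is compared against the last command that was actually executed, and that a gap of exactly 2 seconds counts as "after the period" (which is what the example with string 8 implies).

```csharp
using System;
using System.Collections.Generic;

static class HoldOffRule
{
    // Hypothetical helper: returns the 1-based positions of the commands
    // that would be executed under the rule described in the question.
    public static List<int> Executed(
        IReadOnlyList<(string Command, double Seconds)> items, double holdOffSeconds)
    {
        var executed = new List<int>();
        bool hasLast = false;
        string lastCommand = "";
        double lastTime = 0;

        for (int i = 0; i < items.Count; i++)
        {
            var (command, seconds) = items[i];

            // A repeat is the same command as the last EXECUTED one,
            // arriving less than holdOffSeconds after it.
            bool isRepeat = hasLast
                && command == lastCommand
                && seconds - lastTime < holdOffSeconds;
            if (isRepeat) continue;

            executed.Add(i + 1);
            hasLast = true;
            lastCommand = command;
            lastTime = seconds;
        }
        return executed;
    }

    public static void Main()
    {
        var commands = new[]
        {
            "right", "left", "right", "right", "left",
            "right", "right", "right", "left"
        };

        // One command per second, as in the Observable.Generate example.
        var items = new List<(string, double)>();
        for (int i = 0; i < commands.Length; i++)
            items.Add((commands[i], i + 1));

        // prints "1, 2, 3, 5, 6, 8, 9"
        Console.WriteLine(string.Join(", ", Executed(items, 2.0)));
    }
}
```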

Possible solutions:

I tried to use a windowing function to skip entries, but this skips all strings inside the window, even if they don't have the same value:

    observable
        .Publish(ps => ps
            .Window(() => ps.Delay(TimeSpan.FromSeconds(2)))
            .SelectMany(x => x.Take(1)))
        .Subscribe(f => Console.WriteLine(f.Value));

I also tried adding a timestamp to each value and checking it in an EqualityComparer passed to DistinctUntilChanged(), but this doesn't seem to work as expected either.

3
  • You can use .Scan to record the last value and timestamp, and compare each value with that last value. So for example you could maintain a Tuple of time+value for both the last value and the current value. I don't have time to write a full answer, but that will work. Commented Aug 28, 2018 at 9:15
  • If you have right, left, right, left all .1 seconds apart, should all 4 messages get emitted, or just 2? Commented Aug 28, 2018 at 14:48
  • all should be emitted Commented Aug 28, 2018 at 15:13

3 Answers 3

2

This is trickier than I thought because of the triple case (right, right, right all one second apart). Using a straight .Zip won't work here.

This is similar to Sentinel's answer, and correctly handles the triple case:

    source
        .Timestamp()
        .Scan((state, item) =>
            state == null ||
            item.Timestamp - state.Timestamp > TimeSpan.FromSeconds(2) ||
            state.Value != item.Value
                ? item
                : state)
        .DistinctUntilChanged()
        .Select(t => t.Value);

Explanation:

  • .Timestamp() wraps each message with the timestamp at which it arrives.
  • .Scan (the one-argument overload) keeps the last accepted message as its state: if a duplicate arrives within 2 seconds, it emits that previous message again; otherwise it emits the new message.
  • .DistinctUntilChanged() strips out the duplicate messages that occur because .Scan emits the old message a second time.
  • .Select removes the timestamp.
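As a rough way to check the steps above without waiting on real timers, the sketch below pushes the question's sample data through the same Scan / DistinctUntilChanged pipeline using hand-made timestamps. It assumes the System.Reactive package; the names `ScanSketch`, `SuppressRepeats` and `Run`, the start date, and the exact one-second spacing are all illustrative assumptions. One deliberate difference: with timestamps exactly 2 seconds apart, a strict `>` comparison would still treat item 8 as a repeat, so this sketch uses `>=` to match the expected output in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class ScanSketch
{
    // Hypothetical helper wrapping the pipeline; takes already-timestamped items
    // so that the timing is fully controlled by the caller.
    public static IObservable<string> SuppressRepeats(
        IObservable<Timestamped<string>> source, TimeSpan holdOff)
    {
        return source
            // State is the last accepted item; a repeat inside the hold-off
            // re-emits that same item instead of the new one.
            .Scan((state, item) =>
                item.Value != state.Value || item.Timestamp - state.Timestamp >= holdOff
                    ? item
                    : state)
            // The re-emitted item is equal to its predecessor, so it is dropped here.
            .DistinctUntilChanged()
            .Select(t => t.Value);
    }

    public static List<string> Run()
    {
        var commands = new[]
        {
            "right", "left", "right", "right", "left",
            "right", "right", "right", "left"
        };
        var start = new DateTimeOffset(2018, 8, 28, 0, 0, 0, TimeSpan.Zero); // arbitrary
        var subject = new Subject<Timestamped<string>>();
        var output = new List<string>();

        using (SuppressRepeats(subject, TimeSpan.FromSeconds(2))
            .Subscribe(x => output.Add(x)))
        {
            // Simulate one command per second with explicit timestamps.
            for (int i = 0; i < commands.Length; i++)
                subject.OnNext(new Timestamped<string>(commands[i], start.AddSeconds(i + 1)));
            subject.OnCompleted();
        }
        return output;
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

With the sample data this should yield right, left, right, left, right, right, left, i.e. items 1, 2, 3, 5, 6, 8 and 9. With live data stamped by .Timestamp(), gaps are rarely exactly 2 seconds, so the choice between `>` and `>=` only matters at the boundary.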

2 Comments

Worked like a charm so far, thank you! Didn't know about that handy .Timestamp() function. :)
If you haven't already, I suggest running through introtorx.com. It is a fantastic resource, written for Rx 1.0. About 98% of it is still current and applicable.
2

I have not tested this code, but you get the general idea.

    source
        .Select(x => (str: x, timestamp: DateTime.Now))
        .Scan(
            (last: (str: "", timestamp: DateTime.MinValue), next: (str: "", timestamp: DateTime.MinValue)),
            (state, val) => (
                last: (str: state.next.str, timestamp: state.next.timestamp),
                next: (str: val.str, timestamp: val.timestamp)))
        .Where(x => (x.next.str != x.last.str) ||
                    (x.next.timestamp - x.last.timestamp) > TimeSpan.FromSeconds(2))
        .Select(x => x.next.str);

3 Comments

This doesn't emit item 8. in the question.
@Shlomo I didn't test it, but it should do I think. Why does it not, I must be missing something.
If you tested it you would find out it didn't. :) It doesn't because in the case of three consecutive rights, the second gets filtered out by the where, but still updates the state in the scan. So when the third arrives, you're comparing the 2nd right against the third right, which are only one second apart.
0

Hm.. this sounds like observable.DistinctUntilChanged to detect distinct events, merged via CombineLatest with observable.Debounce (the equivalent operator is called Throttle in Rx.NET) to also catch the items repeated after some time.

That would cover the basics, but I'm not sure what would happen if a different-than-previous item came after a time longer than the debounce period. Both the DistinctUntilChanged and Debounce operators would pass the item "at the same time", and I'm not sure what CombineLatest would do at that point. There's a chance you would get such an event twice (the same event within a very short period of time), so you'd need to deduplicate once more.

If you are willing to add some kind of timestamp, then there's also a pretty explicit way to do it, although I'm not sure it's really easier:

  • take source events - stream of {command}
  • GroupBy the type of the item - stream of substream of {command}, each substream contains ONLY one type of command
  • apply TimeInterval to each of those sub-streams, you get stream of substream of {command, time-since-last-seen}, each substream contains ONLY one type of command
  • CombineLatest them all, you get stream of {command, time-since-THIS-TYPE-was-last-seen}
  • transform it into {command, null-or-ObjectThatIsAlwaysDifferent} depending on the 'time', if time is less than the hold-off period then NULL, if time is larger than hold-off, then use some magic value that IsAlwaysDifferent
  • DistinctUntilChanged that

You should be able to implement the magic ObjectThatIsAlwaysDifferent by simply making a class whose GetHashCode returns 0 and whose Equals returns false.

This should return events that either have a different command than the previous one, or that are the same as before but occurred after a time longer than the hold-off.

Now that I think about it, it should be possible to do this very simply by zipping the current and previous values:

  • take source stream of {command}
  • add timestamps - {command, timestamp}
  • delay it by 1, remember both source and delayed
  • zip them together, now you get stream of [{command, timestamp}, {prevcommand, prevtimestamp}]
  • filter it with your code:
    • pass when command != prevcommand
    • pass when command == prevcommand && timestamp-prevtimestamp > holdoff

And that should be it. As usual, there's more than one way to do the same thing.
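The zip steps above could be sketched roughly as follows, assuming the System.Reactive package. The names `ZipSketch`, `PairwiseFilter` and `Run` are made up for illustration, "delay it by 1" is read here as pairing each item with the one before it, and a sentinel "previous" item (an assumption of this sketch) is prepended so that the very first command is passed too. Timestamps are supplied by hand to keep the run deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class ZipSketch
{
    // Hypothetical helper: pairs each item with the previously RECEIVED item.
    public static IObservable<string> PairwiseFilter(
        IObservable<Timestamped<string>> source, TimeSpan holdOff)
    {
        // Sentinel "previous" value so the first real item forms a pair and passes.
        var sentinel = new Timestamped<string>(null, DateTimeOffset.MinValue);

        // Publish shares a single subscription between both sides of the Zip.
        return source.Publish(shared => shared
            .StartWith(sentinel)                       // stream shifted by one item
            .Zip(shared, (prev, cur) => (prev, cur))   // [{prev}, {cur}] pairs
            .Where(p => p.cur.Value != p.prev.Value
                     || p.cur.Timestamp - p.prev.Timestamp > holdOff)
            .Select(p => p.cur.Value));
    }

    public static List<string> Run()
    {
        var commands = new[]
        {
            "right", "left", "right", "right", "left",
            "right", "right", "right", "left"
        };
        var start = new DateTimeOffset(2018, 8, 28, 0, 0, 0, TimeSpan.Zero); // arbitrary
        var subject = new Subject<Timestamped<string>>();
        var output = new List<string>();

        using (PairwiseFilter(subject, TimeSpan.FromSeconds(2))
            .Subscribe(x => output.Add(x)))
        {
            for (int i = 0; i < commands.Length; i++)
                subject.OnNext(new Timestamped<string>(commands[i], start.AddSeconds(i + 1)));
            subject.OnCompleted();
        }
        return output;
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

Note that on the question's sample data this pairwise comparison filters out item 8, because it is compared with item 7 (one second earlier) rather than with the last item that was actually passed. That is the triple case the other answer warns about, so this version only fits if that behaviour is acceptable.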
