
I have a solution for receiving Observable notifications until either a specified count has arrived or a time threshold has elapsed. I also need to know which of the two happened.

Is there a simpler way (perhaps something other than GroupByUntil) to achieve this?

_values
    .GroupByUntil(_ => true, i => Observable.Timer(Threshold, _scheduler)
        .Amb(i.Buffer(SpecifiedCount).Select(_ => SpecifiedCount)))
    // this is for figuring out which one happened: interval elapsed or count filled
    .SelectMany(g => g.Count())
    // Let's say if count filled first, call Foo()
    .Where(i => i == SpecifiedCount)
    .Subscribe(_ => Foo())
  • Both Buffer and Window accept either count or TimeSpan, or both. Have you tried them? Commented Sep 23, 2015 at 9:18
  • I have now, and their functionality differs from the GroupByUntil solution. Trying to figure out why. Commented Sep 25, 2015 at 9:11
  • What functionality do you want? What is different? Commented Sep 25, 2015 at 9:55
  • @PanagiotisKanavos After testing with trivial Observables, I now believe the differing functionality comes from my test environment and/or my misuse of TestScheduler. I will close this question. Commented Sep 25, 2015 at 10:45

1 Answer

Are you looking for the Observable.Buffer(IObservable, TimeSpan, Int32) overload? According to the docs:

Indicates each element of an observable sequence into a buffer that’s sent out when either it’s full or a given amount of time has elapsed.

You should be able to write:

var myObservable = _values.Buffer(Threshold, SpecifiedCount);

There's a similar overload for Window as well.
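The question also asks which condition closed the buffer. One possible way to tell, sketched here under assumptions, is to look at the size of each emitted buffer: a buffer that reached the count holds exactly that many items, while, as far as I can tell from Rx.NET's behavior, a buffer closed by the timer holds fewer (possibly zero). The class name BufferDemo, the helper Describe, the Subject standing in for _values, and the concrete values for the count and threshold are all hypothetical, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BufferDemo
{
    // Hypothetical helper: classifies a buffer by its size.
    // Assumption: a timer-closed buffer never reaches specifiedCount items.
    public static string Describe(IList<int> buffer, int specifiedCount) =>
        buffer.Count == specifiedCount ? "count filled" : "time elapsed";

    public static void Main()
    {
        const int specifiedCount = 3;                   // stands in for SpecifiedCount
        TimeSpan threshold = TimeSpan.FromSeconds(1);   // stands in for Threshold
        var values = new Subject<int>();                // stands in for _values

        IDisposable subscription = values
            .Buffer(threshold, specifiedCount)
            .Subscribe(buffer =>
                // This is where Foo() would be called for the "count filled" case.
                Console.WriteLine($"{Describe(buffer, specifiedCount)}: {buffer.Count} item(s)"));

        // Three items arrive well inside the threshold, so the count closes the buffer.
        values.OnNext(1);
        values.OnNext(2);
        values.OnNext(3);

        subscription.Dispose();
    }
}
```

Note that when the source completes, any partially filled buffer is flushed as well, so this size check would report it as "time elapsed". If that distinction matters, it would need separate handling.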


2 Comments

Thanks! This is pretty much what I want. In some cases the behavior differs slightly from the GroupByUntil version: sometimes the first notification does not get buffered, and the first buffer actually takes SpecifiedCount + 1 notifications before it is sent out. Can you think of a reason why this would happen?
Btw, this happens while running it on TestScheduler, using the overload Buffer(IObservable, TimeSpan, int, IScheduler).
