0

I try to access a stream every 2 seconds and get the first value within this time. Example:

Values: -1--2-3---4-----5--6-7--8 Result: -1-------3--------5------ 

I tried the code like int this thread (quite similar problem):

subject.AsObservable().Window(TimeSpan.FromSeconds(2)) .SelectMany(f => f.Take(1)) .Subscribe(f => { Console.WriteLine("Counter: " + counter + " Time:" + DateTime.Now.Millisecond); counter++; }); 

However, the 2 seconds do not work, the counter updates too fast even after 200 milliseconds.

What am I missing?

Data are added to the subject by using this code:

Task.Run(async () => { while (await call.ResponseStream.MoveNext(default(CancellationToken))) { foreach (var result in call.ResponseStream.Current.Results) { subject.OnNext(result); } } }); 
2
  • Could you provide a minimal reproducible example so that your code can be tested? Commented Aug 17, 2018 at 14:27
  • And what's the point of using DateTime.Now.Millisecond? Commented Aug 17, 2018 at 14:29

1 Answer 1

1

Your query seems to be perfectly fine. Let's test this by giving it a good set of source data:

var rnd = new Random(); var source = Observable .Generate(0, x => true, x => x + 1, x => x, x => TimeSpan.FromSeconds(rnd.NextDouble() / 10.0)); 

This is going to produce a value every 0.0 to 100.0 milliseconds. So if the query is correct we should expect to see a value produced within 100 milliseconds of each 2,0 second window (give or take Windows OS timing issues).

Here's a slightly better version of the subscription of the query:

var sw = Stopwatch.StartNew(); source .Window(TimeSpan.FromSeconds(2.0)) .SelectMany(f => f.Take(1)) .Subscribe(f => { Console.WriteLine($"Counter: {f} Time: {sw.Elapsed.TotalMilliseconds}"); }); 

These results I get are like:

 Counter: 0 Time: 110.8073 Counter: 33 Time: 2124.7605 Counter: 67 Time: 4061.8636 Counter: 101 Time: 6061.1922 Counter: 134 Time: 8090.158 Counter: 169 Time: 10173.0396 Counter: 197 Time: 12153.0229 Counter: 233 Time: 14138.7718 Counter: 265 Time: 16144.8861 Counter: 296 Time: 18122.042 Counter: 337 Time: 20141.1837 Counter: 373 Time: 22115.0944 Counter: 410 Time: 24162.0706 

It has a bit of timer drift, but it's following the expected pattern. The query works.

Sign up to request clarification or add additional context in comments.

2 Comments

Thanks for your answer. Can you explain me why - even if I don't use any TPL classes, the threads switch during the windowing? If I add Trace.WriteLine($"Counter: {f} Time: {sw.Elapsed.TotalMilliseconds}" + " on Thread: " +Thread.CurrentThread.ManagedThreadId); The thread changes, but this should not be the case i think..?
@TobiasvonFalkenhayn - The query is using Scheduler.Default which uses threads from the thread-pool. Rx's usual behaviour is to let threads return to the pool if there is a gap between the values being produced - this is to prevent thread-pool starvation. So because this query has gaps it uses different threads. If you want to prevent that you could introduce an EventLoopScheduler to ensure only one thread is used.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.