8

How can I make the following observable repeat until stream.DataAvailable is false? Currently it looks like it never stops.

AsyncReadChunk and Observable.Return inside the Defer section make OnNext call then OnCompleted call. When Repeat receives the OnNext call it passes it to TakeWhile. When TakeWhile's is not satisfied it completes the observable but I think the OnCompleted that comes right after the OnNext is so fast that it makes Repeat to re-subscribes to the observable and causes the infinite loop.

How can I correct this behaviour?

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) { return Observable.Defer(() => { try { return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]); } catch (Exception) { return Observable.Return(new byte[0]); } }) .Repeat() .TakeWhile((dataChunk, index) => dataChunk.Length > 0); } 
2
  • 4
    Well done for figuring out how to solve your problem, and thanks for sharing your solution. However, could you please post the solution to your question as an answer rather than editing your question? Commented Nov 27, 2011 at 9:46
  • Samet, I've moved your self-answer out of the question and into a separate answer, marked as community wiki. Commented Nov 29, 2011 at 0:35

1 Answer 1

3

SELF ANSWER: (Below is an answer posted by Samet, the author of the question. However, he posted the answer as part of the question. I'm moving it into a separate answer, marking as community wiki, since the author hasn't moved it himself.)


I discovered by refactoring that it is a problem with schedulers. The Return uses Immediate scheduler while Repeat uses CurrentThread. The fixed code is below.

 public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) { return Observable.Defer(() => { try { return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0], Scheduler.CurrentThread); } catch (Exception) { return Observable.Return(new byte[0], Scheduler.CurrentThread); } }) .Repeat() .TakeWhile((dataChunk, index) => dataChunk.Length > 0); } 
Sign up to request clarification or add additional context in comments.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.