Can someone explain why the following AsObservable method creates an infinite loop even though the end of stream is reached?

    using System;
    using System.IO;
    using System.Linq; // used by the test program (ToArray)
    using System.Reactive.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public static class StreamExt
    {
        public static IObservable<byte> AsObservable(this Stream stream, int bufferSize)
        {
            return Observable
                .FromAsync(cancel => stream.ReadBytes(bufferSize, cancel))
                .Repeat()
                .TakeWhile(bytes => bytes != null) // EndOfStream
                .SelectMany(bytes => bytes);
        }

        private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel)
        {
            var buf = new byte[bufferSize];
            var bytesRead = await stream
                .ReadAsync(buf, 0, bufferSize, cancel)
                .ConfigureAwait(false);

            if (bytesRead < 1)
                return null; // EndOfStream

            var result_size = Math.Min(bytesRead, bufferSize);
            Array.Resize(ref buf, result_size);
            return buf;
        }
    }

A quick test shows that it produces an infinite loop:

    class Program
    {
        static void Main(string[] args)
        {
            using (var stream = new MemoryStream(new byte[] { 1, 2, 3 }))
            {
                var testResult = stream
                    .AsObservable(1024)
                    .ToEnumerable()
                    .ToArray();

                Console.WriteLine(testResult.Length);
            }
        }
    }

Of course, I could add .SubscribeOn(TaskPoolScheduler.Default), but the infinite loop would still be alive (it blocks a task pool thread and keeps reading from the Stream indefinitely).

[UPDATE 2017-05-09]

Shlomo posted a better example to reproduce this issue:

    int i = 0;
    var testResult = Observable.FromAsync(() => Task.FromResult(i++))
        .Repeat()
        .TakeWhile(l => l < 3);

    testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
    Console.WriteLine("This is never printed.");
  • You have a .Repeat() call in there, which "repeats the observable sequence indefinitely". What exactly are you trying to achieve? Commented May 9, 2017 at 9:05
  • Read the Stream until it ends or the user disposes of their subscription. TakeWhile should stop the observable stream once the predicate evaluates to false (end of stream). At the moment, even calling .Dispose() on the subscription does not stop .Repeat(). Commented May 9, 2017 at 9:14
  • Why would .TakeWhile() stop if you call it after the .Repeat() ? Commented May 9, 2017 at 9:18
  • I do have a solution (using Observable.Create). I would like to understand why the combination of Observable.FromAsync + .Repeat() does not work. Commented May 9, 2017 at 9:19
  • stackoverflow.com/questions/8284300/… Commented May 11, 2017 at 2:55
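
The Observable.Create workaround mentioned in the comments above is not shown in the thread. A minimal sketch of what such a version might look like follows; the class and method names (StreamCreateExt, AsObservableViaCreate) are hypothetical, and this is an assumption about the approach, not the asker's actual code. It relies on the Observable.Create overload that takes an asynchronous subscribe function with a CancellationToken, so the read loop is an ordinary while loop instead of FromAsync + Repeat.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

// Hypothetical class name; not part of the original question.
public static class StreamCreateExt
{
    // Hypothetical method name; sketches an Observable.Create-based reader.
    public static IObservable<byte> AsObservableViaCreate(this Stream stream, int bufferSize)
    {
        return Observable.Create<byte>(async (observer, cancel) =>
        {
            var buf = new byte[bufferSize];

            // Loop until the stream ends or the subscription is disposed
            // (disposal triggers the cancellation token).
            while (!cancel.IsCancellationRequested)
            {
                var bytesRead = await stream
                    .ReadAsync(buf, 0, bufferSize, cancel)
                    .ConfigureAwait(false);

                if (bytesRead < 1)
                    break; // end of stream

                // Emit only the bytes that were actually read.
                for (var i = 0; i < bytesRead; i++)
                    observer.OnNext(buf[i]);
            }
            // When this lambda returns, Rx signals OnCompleted to the observer;
            // an exception thrown here is forwarded as OnError.
        });
    }
}
```

With the three-byte MemoryStream from the question, calling stream.AsObservableViaCreate(1024).ToEnumerable().ToArray() should return instead of looping, because completion is driven by the loop exiting and not by an operator placed downstream of Repeat().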

3 Answers

For anyone who ends up here and needs an answer, not just an explanation: the issue appears to be the default scheduler of FromAsync, as indicated by this self-answered question. If you switch to the "current thread" scheduler, Repeat().TakeWhile(...) behaves more predictably. For example (extract from the question):

    .FromAsync(cancel => stream.ReadBytes(bufferSize, cancel), System.Reactive.Concurrency.Scheduler.CurrentThread)
    .Repeat()
    .TakeWhile(bytes => bytes != null) // EndOfStream

You can confirm that OnCompleted is being produced correctly with this:

    using (var stream = new MemoryStream(new byte[] { 1, 2, 3 }))
    {
        var testResult = stream
            .AsObservable(1024);

        testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
    }

It looks like there's a problem with the .FromAsync + .Repeat combination. The following code acts similarly:

    int i = 0;
    var testResult = Observable.FromAsync(() => Task.FromResult(i++))
        .Repeat()
        .TakeWhile(l => l < 3);

    testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
    Console.WriteLine("This is never printed.");

...whereas this code terminates correctly:

    var testResult = Observable.Generate(0, i => true, i => i + 1, i => i)
        .Repeat()
        .TakeWhile(l => l < 3);

    testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
    Console.WriteLine("This is printed.");

1 Comment

Thanks for testing.
Shane Neuville posted links that contain an explanation for this behaviour:
