Can someone explain why the following AsObservable method creates an infinite loop even though the end of stream is reached?
    using System;
    using System.IO;
    using System.Linq;
    using System.Reactive.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public static class StreamExt
    {
        public static IObservable<byte> AsObservable(this Stream stream, int bufferSize)
        {
            return Observable
                .FromAsync(cancel => stream.ReadBytes(bufferSize, cancel))
                .Repeat()
                .TakeWhile(bytes => bytes != null) // EndOfStream
                .SelectMany(bytes => bytes);
        }

        private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel)
        {
            var buf = new byte[bufferSize];
            var bytesRead = await stream
                .ReadAsync(buf, 0, bufferSize, cancel)
                .ConfigureAwait(false);

            if (bytesRead < 1)
                return null; // EndOfStream

            // Trim the buffer to the number of bytes actually read.
            var result_size = Math.Min(bytesRead, bufferSize);
            Array.Resize(ref buf, result_size);
            return buf;
        }
    }

A quick test shows that it produces an infinite loop:
    class Program
    {
        static void Main(string[] args)
        {
            using (var stream = new MemoryStream(new byte[] { 1, 2, 3 }))
            {
                var testResult = stream
                    .AsObservable(1024)
                    .ToEnumerable()
                    .ToArray();

                Console.WriteLine(testResult.Length);
            }
        }
    }

Of course I could add a .SubscribeOn(TaskPoolScheduler.Default), but the infinite loop stays alive (it blocks a task pool scheduler and keeps reading from the Stream indefinitely).
[UPDATE 2017-05-09]
Shlomo posted a better example to reproduce this issue:
    int i = 0;
    var testResult = Observable.FromAsync(() => Task.FromResult(i++))
        .Repeat()
        .TakeWhile(l => l < 3);

    testResult.Subscribe(
        b => Console.WriteLine(b),
        e => { },
        () => Console.WriteLine("OnCompleted"));

    Console.WriteLine("This is never printed.");
.Repeat() call in there, which 'repeats the observable sequence indefinitely'. What is it exactly that you are trying to achieve?
Stream until it ends or the user disposes her/his subscription. TakeWhile should stop the observable stream if the predicate evaluates to true (end of stream). At the moment even .Dispose() on the subscription does not stop .Repeat().
.TakeWhile() stop if you call it after the .Repeat()?
Observable.Create). I would like to understand why the combination of Observable.FromAsync + .Repeat() does not work.
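For comparison, the Observable.Create alternative mentioned in the comments could look roughly like the sketch below. It does not explain the FromAsync + Repeat behaviour; it only illustrates a read loop that ends explicitly at end of stream. The class name StreamExtSketch and the method name AsObservableViaCreate are hypothetical, and the sketch assumes the System.Reactive package and its Task-returning Observable.Create overload.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical names, for illustration only; not part of the original question.
public static class StreamExtSketch
{
    public static IObservable<byte> AsObservableViaCreate(this Stream stream, int bufferSize)
    {
        // Task-returning overload of Observable.Create: the sequence completes
        // when the returned task completes, and the token is cancelled when the
        // subscription is disposed.
        return Observable.Create<byte>(async (observer, cancel) =>
        {
            var buf = new byte[bufferSize];

            while (!cancel.IsCancellationRequested)
            {
                var bytesRead = await stream
                    .ReadAsync(buf, 0, bufferSize, cancel)
                    .ConfigureAwait(false);

                if (bytesRead < 1)
                    break; // EndOfStream: leave the loop so the task finishes

                // Emit only the bytes that were actually read.
                for (var i = 0; i < bytesRead; i++)
                    observer.OnNext(buf[i]);
            }
        });
    }
}
```

In the test program above, replacing stream.AsObservable(1024) with stream.AsObservableViaCreate(1024) should let ToEnumerable().ToArray() return, because the loop exits on its own at end of stream instead of relying on TakeWhile to cut off a repeated source.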