Skip to main content
AI Assist is now on Stack Overflow. Start a chat to get instant answers from across the network. Sign up to save and share your chats.
edited to implement BufferBlock<> over AsyncQueue<>
Source Link
bmiller
  • 1.8k
  • 1
  • 17
  • 17

Well 8 years later I hit this very question looking for an Async Queue object myself. I discovered thatand was about to implement the MS created an AsyncQueue<T> class found in nuget package/namespace: Microsoft.VisualStudio.Threading

Docs here: https://docs.microsoft.com/en-us/dotnet/api/microsoft.visualstudio.threading.asyncqueue-1?view=visualstudiosdk-2019Thanks to @Theodor Zoulias for mentioning this api may be outdated and the DataFlow lib would be a good alternative.

So I edited my AsyncQueue<> implementation to use BufferBlock<>. Almost the same but works better.

I use this in an AspNet Core background thread like soand it runs fully async.

protected async Task RunMyRun() { AsyncQueue<MyObj>BufferBlock<MyObj> queue = new AsyncQueue<MyObj>BufferBlock<MyObj>(); Task enqueueTask = StartDataIteration(queue); while (!await queue.IsCompletedOutputAvailableAsync()) { var dequeueTaskmyObj = queue.DequeueAsync(); await Task.WhenAnyReceive(dequeueTask, queue.Completion);     if (dequeueTask.IsCompletedSuccessfully)  { // do something with dequeueTask.Result  }myObj } } public async Task StartDataIteration(AsyncQueue<MyObj>BufferBlock<MyObj> queue) { var cursor = await RunQuery(); while(await cursor.Next()) { queue.EnqueuePost(cursor.Current); } queue.Complete(); // <<< signals the consumer when queue.Count reaches 0 } 

NoteI found that if you are awaiting DequeueAsync whenusing the queue is empty and Complete() is called, a TaskCanceledException will be thrown.

This is why I await both the DequeueAsyncOutputAvailableAsync() Task ANDfixed the queue.Completion Task property, thenissue that I check the dequeue taskhad with AsyncQueue<> IsCompletedSuccessfully prop before touching dequeueTask.Result

You may also want-- trying to throttledetermine when the Enqueue loop (by checking queue.Count). Could run you out of memory fast - but this is true for any async queue scenariowas complete and not having to inspect the dequeue task.

Well 8 years later I hit this very question looking for an Async Queue object myself. I discovered that MS created an AsyncQueue<T> class found in nuget package/namespace: Microsoft.VisualStudio.Threading

Docs here: https://docs.microsoft.com/en-us/dotnet/api/microsoft.visualstudio.threading.asyncqueue-1?view=visualstudiosdk-2019

I use this in an AspNet Core background thread like so.

protected async Task Run() { AsyncQueue<MyObj> queue = new AsyncQueue<MyObj>(); Task enqueueTask = StartDataIteration(queue); while (!queue.IsCompleted) { var dequeueTask = queue.DequeueAsync(); await Task.WhenAny(dequeueTask, queue.Completion);     if (dequeueTask.IsCompletedSuccessfully)  { // do something with dequeueTask.Result  } } } public async Task StartDataIteration(AsyncQueue<MyObj> queue) { var cursor = await RunQuery(); while(await cursor.Next()) { queue.Enqueue(cursor.Current); } queue.Complete(); // <<< signals the consumer when queue.Count reaches 0 } 

Note that if you are awaiting DequeueAsync when the queue is empty and Complete() is called, a TaskCanceledException will be thrown.

This is why I await both the DequeueAsync() Task AND the queue.Completion Task property, then I check the dequeue task IsCompletedSuccessfully prop before touching dequeueTask.Result

You may also want to throttle the Enqueue loop (by checking queue.Count). Could run you out of memory fast - but this is true for any async queue scenario.

Well 8 years later I hit this very question and was about to implement the MS AsyncQueue<T> class found in nuget package/namespace: Microsoft.VisualStudio.Threading

Thanks to @Theodor Zoulias for mentioning this api may be outdated and the DataFlow lib would be a good alternative.

So I edited my AsyncQueue<> implementation to use BufferBlock<>. Almost the same but works better.

I use this in an AspNet Core background thread and it runs fully async.

protected async Task MyRun() { BufferBlock<MyObj> queue = new BufferBlock<MyObj>(); Task enqueueTask = StartDataIteration(queue); while (await queue.OutputAvailableAsync()) { var myObj = queue.Receive(); // do something with myObj } } public async Task StartDataIteration(BufferBlock<MyObj> queue) { var cursor = await RunQuery(); while(await cursor.Next()) { queue.Post(cursor.Current); } queue.Complete(); // <<< signals the consumer when queue.Count reaches 0 } 

I found that using the queue.OutputAvailableAsync() fixed the issue that I had with AsyncQueue<> -- trying to determine when the queue was complete and not having to inspect the dequeue task.

Source Link
bmiller
  • 1.8k
  • 1
  • 17
  • 17

Well 8 years later I hit this very question looking for an Async Queue object myself. I discovered that MS created an AsyncQueue<T> class found in nuget package/namespace: Microsoft.VisualStudio.Threading

Docs here: https://docs.microsoft.com/en-us/dotnet/api/microsoft.visualstudio.threading.asyncqueue-1?view=visualstudiosdk-2019

I use this in an AspNet Core background thread like so.

protected async Task Run() { AsyncQueue<MyObj> queue = new AsyncQueue<MyObj>(); Task enqueueTask = StartDataIteration(queue); while (!queue.IsCompleted) { var dequeueTask = queue.DequeueAsync(); await Task.WhenAny(dequeueTask, queue.Completion); if (dequeueTask.IsCompletedSuccessfully) { // do something with dequeueTask.Result } } } public async Task StartDataIteration(AsyncQueue<MyObj> queue) { var cursor = await RunQuery(); while(await cursor.Next()) { queue.Enqueue(cursor.Current); } queue.Complete(); // <<< signals the consumer when queue.Count reaches 0 } 

Note that if you are awaiting DequeueAsync when the queue is empty and Complete() is called, a TaskCanceledException will be thrown.

This is why I await both the DequeueAsync() Task AND the queue.Completion Task property, then I check the dequeue task IsCompletedSuccessfully prop before touching dequeueTask.Result

You may also want to throttle the Enqueue loop (by checking queue.Count). Could run you out of memory fast - but this is true for any async queue scenario.