I'm looking for an object like a ConcurrentQueue which will allow me to await the Dequeue operation if the queue is empty so I can do something like the following:

    public static async Task ServiceLoop()
    {
        var awaitedQueue = new AwaitedQueue<int>();
        while (!cancelled)
        {
            // Asynchronously waits here while the queue is empty.
            var item = await awaitedQueue.Dequeue();
            Console.WriteLine(item);
        }
    }

I've written the following class, but if an item is added to the queue between the time Dequeue is called and a new awaiter is Enqueued, terrible things will happen.

    public class AwaitedQueue<T> : IDisposable
    {
        ConcurrentQueue<TaskCompletionSource<T>> awaiters = new ConcurrentQueue<TaskCompletionSource<T>>();
        ConcurrentQueue<T> items = new ConcurrentQueue<T>();

        public AwaitedQueue()
        {
        }

        public void Enqueue(T item)
        {
            if (!awaiters.TryDequeue(out TaskCompletionSource<T> awaiter))
            {
                this.items.Enqueue(item);
            }
            else
            {
                awaiter.SetResult(item);
            }
        }

        public async Task<T> Dequeue()
        {
            if (items.TryDequeue(out T item))
            {
                return item;
            }
            else
            {
                // If an item is enqueued between this call to create a new TaskCompletionSource.
                var awaiter = new TaskCompletionSource<T>();
                // And this call to actually enqueue, I believe it will cause me problems.
                awaiters.Enqueue(awaiter);
                return await awaiter.Task;
            }
        }

        public void Dispose()
        {
            while (awaiters.TryDequeue(out TaskCompletionSource<T> awaiter))
            {
                awaiter.SetCanceled();
                awaiter.Task.Wait();
            }
        }
    }

I'm sure a robust and well-tested implementation of this concept already exists, but I don't know which combination of English words I need to type into Google to find it.

  • What is the context of it? You could use RabbitMQ if it fits the context. Commented May 15, 2019 at 6:36

1 Answer

There is a modern solution for this: Channels. A "Channel" is an asynchronous producer/consumer queue.

Channels also have the concept of "completion", so you can complete the channel rather than having a cancelled flag.

Usage:

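    public static async Task ServiceLoop()
    {
        var awaitedQueue = Channel.CreateUnbounded<int>();
        var queueReader = awaitedQueue.Reader;

        // WaitToReadAsync returns false once the channel is completed and drained.
        while (await queueReader.WaitToReadAsync())
        {
            // Drain everything that is currently available without awaiting.
            while (queueReader.TryRead(out var item))
            {
                Console.WriteLine(item);
            }
        }
    }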
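
The loop above only shows the reading side. As a rough sketch of how the writing side and completion might fit together (the `ChannelSketch` class and its `ProduceAsync`, `ConsumeAsync`, and `RunAsync` helpers are hypothetical names invented for illustration; only the `System.Threading.Channels` calls are real API):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelSketch
{
    // Hypothetical producer: writes `count` integers, then completes the channel.
    public static async Task ProduceAsync(ChannelWriter<int> writer, int count)
    {
        for (var i = 0; i < count; i++)
        {
            // An unbounded channel has no capacity limit to wait on.
            await writer.WriteAsync(i);
        }

        // Signals that no more items will arrive; this replaces a "cancelled" flag.
        writer.Complete();
    }

    // Hypothetical consumer: same loop shape as ServiceLoop, but collects the items.
    public static async Task<List<int>> ConsumeAsync(ChannelReader<int> reader)
    {
        var seen = new List<int>();

        // Returns false only after Complete() was called and all items were read.
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var item))
            {
                seen.Add(item);
            }
        }

        return seen;
    }

    // Starts the consumer first so it is already waiting when items arrive.
    public static async Task<List<int>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        var consumer = ConsumeAsync(channel.Reader);
        await ProduceAsync(channel.Writer, 3);
        return await consumer;
    }
}
```

If you prefer something closer to the `await Dequeue()` shape from the question, `ChannelReader<T>.ReadAsync()` awaits a single item; note that it throws `ChannelClosedException` once the channel has been completed and emptied, so the `WaitToReadAsync`/`TryRead` loop is usually the simpler way to exit cleanly. The types live in the `System.Threading.Channels` namespace, which on older target frameworks is available as a NuGet package of the same name.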

1 Comment

Thank you for the simple explanation and implementation example for Channels. Async reads and writes against a concurrent collection that is consumed by a Task are the perfect application for this concept. `while (true)` loops waste huge CPU resources in low-bandwidth messaging buffers. This approach uses zero CPU unless it is processing a message.
