I'm looking for an object like a ConcurrentQueue which will allow me to await the Dequeue operation if the queue is empty so I can do something like the following:
public static async Task ServiceLoop() { var awaitedQueue = new AwaitedQueue<int>(); while (!cancelled) { var item = await awaitableQueue.Dequeue(); Console.WriteLine(item); } } I've written the following class, but if an item is added to the queue between the time Dequeue is called and a new awaiter is Enqueued, terrible things will happen.
public class AwaitedQueue<T> : IDisposable { ConcurrentQueue<TaskCompletionSource<T>> awaiters = new ConcurrentQueue<TaskCompletionSource<T>>(); ConcurrentQueue<T> items = new ConcurrentQueue<T>(); public AwaitedQueue() { } public void Enqueue(T item) { if (!awaiters.TryDequeue(out TaskCompletionSource<T> awaiter)) { this.items.Enqueue(item); } else { awaiter.SetResult(item); } } public async Task<T> Dequeue() { if (items.TryDequeue(out T item)) { return item; } else { // If an item is enqueued between this call to create a new TaskCompletionSource. var awaiter = new TaskCompletionSource<T>(); // And this call to actually enqueue, I believe it will cause me problems. awaiters.Enqueue(awaiter); return await awaiter.Task; } } public void Dispose() { while (awaiters.TryDequeue(out TaskCompletionSource<T> awaiter)) { awaiter.SetCanceled(); awaiter.Task.Wait(); } } } I'm sure a robust and well-tested implementation of this concept already exists, but I don't know which combination of English words I need to type into Google to find it.