Skip to main content
AI Assist is now on Stack Overflow. Start a chat to get instant answers from across the network. Sign up to save and share your chats.
edited tags
Link
Stephen Cleary
  • 460.4k
  • 78
  • 721
  • 858
added 37 characters in body
Source Link
spender
  • 121k
  • 36
  • 245
  • 369

Awaitable task awaitable Task based queue

I'm wondering if there exists an implementation/wrapper for ConcurrentQueue, similar to BlockingCollection where taking from the collection does not block, but is instead asynchronous and will cause an async await until an item is placed in the queue.

I've come up with my own implementation, but it does not seem to be performing as expected. I'm wondering if I'm reinventing something that already exists.

Here's my implementation:

public class MessageQueue<T> { ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = new ConcurrentQueue<TaskCompletionSource<T>>(); object queueSyncLock = new object(); public void Enqueue(T item) { queue.Enqueue(item); ProcessQueues(); } public async Task<T> Dequeue() { TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); waitingQueue.Enqueue(tcs); ProcessQueues(); return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; } private void ProcessQueues() { TaskCompletionSource<T> tcs=null; T firstItem=default(T); while (true) { bool ok; lock (queueSyncLock) { ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem); if (ok) { waitingQueue.TryDequeue(out tcs); queue.TryDequeue(out firstItem); } } if (!ok) break; tcs.SetResult(firstItem); } } } 

Awaitable task based queue

I'm wondering if there exists an implementation/wrapper for ConcurrentQueue, similar to BlockingCollection where taking from the collection does not block, but is instead asynchronous and will cause an async await until an item is placed in the queue.

I've come up with my own implementation, but it does not seem to be performing as expected. I'm wondering if I'm reinventing something that already exists.

Here's my implementation:

public class MessageQueue<T> { ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = new ConcurrentQueue<TaskCompletionSource<T>>(); object queueSyncLock = new object(); public void Enqueue(T item) { queue.Enqueue(item); ProcessQueues(); } public async Task<T> Dequeue() { TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); waitingQueue.Enqueue(tcs); ProcessQueues(); return tcs.Task.IsCompleted?tcs.Task.Result:await tcs.Task; } private void ProcessQueues() { TaskCompletionSource<T> tcs=null; T firstItem=default(T); while (true) { bool ok; lock (queueSyncLock) { ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem); if (ok) { waitingQueue.TryDequeue(out tcs); queue.TryDequeue(out firstItem); } } if (!ok) break; tcs.SetResult(firstItem); } } } 

awaitable Task based queue

I'm wondering if there exists an implementation/wrapper for ConcurrentQueue, similar to BlockingCollection where taking from the collection does not block, but is instead asynchronous and will cause an async await until an item is placed in the queue.

I've come up with my own implementation, but it does not seem to be performing as expected. I'm wondering if I'm reinventing something that already exists.

Here's my implementation:

public class MessageQueue<T> { ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = new ConcurrentQueue<TaskCompletionSource<T>>(); object queueSyncLock = new object(); public void Enqueue(T item) { queue.Enqueue(item); ProcessQueues(); } public async Task<T> Dequeue() { TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); waitingQueue.Enqueue(tcs); ProcessQueues(); return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; } private void ProcessQueues() { TaskCompletionSource<T> tcs=null; T firstItem=default(T); while (true) { bool ok; lock (queueSyncLock) { ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem); if (ok) { waitingQueue.TryDequeue(out tcs); queue.TryDequeue(out firstItem); } } if (!ok) break; tcs.SetResult(firstItem); } } } 

awaitable Task Awaitable task based queue

added 37 characters in body
Source Link
spender
  • 121k
  • 36
  • 245
  • 369
Loading
Source Link
spender
  • 121k
  • 36
  • 245
  • 369
Loading