115

I would like to await the result of BlockingCollection<T>.Take() asynchronously, so that I do not block the thread. I am looking for something like this:

var item = await blockingCollection.TakeAsync(); 

I know I could do this:

var item = await Task.Run(() => blockingCollection.Take()); 

but that kinda kills the whole idea, because another thread (from the ThreadPool) gets blocked instead.

Is there any alternative?

3
  • 3
    I don't get this. If you use await Task.Run(() => blockingCollection.Take()), the task will be performed on another thread and your UI thread won't be blocked. Isn't that the point? Commented Jan 20, 2014 at 2:55
  • 11
    @Selman22, this is not a UI app. It is a library exporting Task-based API. It can be used from ASP.NET, for example. The code in question would not scale well there. Commented Jan 20, 2014 at 3:09
  • Would it still be a problem if ConfigureAwait was used after the Run()? [ed. never mind, I see what you're saying now] Commented Jun 15, 2016 at 12:28

4 Answers

137

There are four alternatives that I know of.

The first is Channels, which provides a threadsafe queue that supports asynchronous Read and Write operations. Channels are highly optimized and optionally support dropping some items if a threshold is reached.

The next is BufferBlock<T> from TPL Dataflow. If you only have a single consumer, you can use OutputAvailableAsync or ReceiveAsync, or just link it to an ActionBlock<T>. For more information, see my blog.
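As a rough illustration of the single-consumer case, the sketch below posts a few items to a BufferBlock<T> and consumes them with OutputAvailableAsync and ReceiveAsync. The class and method names (BufferBlockDemo, SumAsync) are made up for this example, and it assumes the System.Threading.Tasks.Dataflow NuGet package is referenced.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // assumes the System.Threading.Tasks.Dataflow package

public static class BufferBlockDemo // hypothetical name, for illustration only
{
    public static async Task<int> SumAsync()
    {
        var buffer = new BufferBlock<int>();

        // Producer: posts three items, then signals that no more will arrive.
        Task producer = Task.Run(() =>
        {
            for (var i = 1; i <= 3; i++)
                buffer.Post(i);
            buffer.Complete();
        });

        // Single consumer: waits asynchronously (no blocked thread) until an item
        // is available, and leaves the loop once the block is completed and empty.
        var sum = 0;
        while (await buffer.OutputAvailableAsync())
        {
            sum += await buffer.ReceiveAsync();
        }

        await producer;
        return sum;
    }
}
```

The OutputAvailableAsync/ReceiveAsync pair is only safe here because there is exactly one consumer; with several consumers another reader could take the item between the two calls.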

The last two are types that I created, available in my AsyncEx library.

AsyncCollection<T> is the async near-equivalent of BlockingCollection<T>, capable of wrapping a concurrent producer/consumer collection such as ConcurrentQueue<T> or ConcurrentBag<T>. You can use TakeAsync to asynchronously consume items from the collection. For more information, see my blog.

AsyncProducerConsumerQueue<T> is a more portable async-compatible producer/consumer queue. You can use DequeueAsync to asynchronously consume items from the queue. For more information, see my blog.

The last three of these alternatives allow synchronous and asynchronous puts and takes.
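A minimal sketch of the two AsyncEx types, assuming the v5 Nito.AsyncEx package is referenced. TakeAsync and DequeueAsync are the methods named above; AddAsync and EnqueueAsync are taken to be their producer-side counterparts, and the wrapper names (AsyncExDemo, RunAsync) are invented for this example.

```csharp
using System.Threading.Tasks;
using Nito.AsyncEx; // assumes the Nito.AsyncEx NuGet package (v5)

public static class AsyncExDemo // hypothetical name, for illustration only
{
    public static async Task<(int, string)> RunAsync()
    {
        // AsyncCollection<T>: async near-equivalent of BlockingCollection<T>.
        var collection = new AsyncCollection<int>();
        await collection.AddAsync(42);
        int fromCollection = await collection.TakeAsync();

        // AsyncProducerConsumerQueue<T>: async-compatible producer/consumer queue.
        var queue = new AsyncProducerConsumerQueue<string>();
        await queue.EnqueueAsync("hello");
        string fromQueue = await queue.DequeueAsync();

        return (fromCollection, fromQueue);
    }
}
```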


5 Comments

GitHub link for when CodePlex finally shuts down: github.com/StephenCleary/AsyncEx
The API documentation contains the method AsyncCollection.TryTakeAsync, but I can't find it in the downloaded Nito.AsyncEx.Coordination.dll 5.0.0.0 (latest version). The referenced Nito.AsyncEx.Concurrent.dll does not exist in the package. What am I missing?
@TheodorZoulias: That method was removed in v5. The v5 API docs are here.
Oh, thanks. It looks like it was the easiest and safest way to enumerate the collection. while ((result = await collection.TryTakeAsync()).Success) { }. Why was it removed?
@TheodorZoulias: Because "Try" means different things to different people. I'm thinking of adding a "Try" method back in but it would actually have different semantics than the original method. Also looking at supporting async streams in a future version, which would definitely be the best method of consumption when supported.
27

...or you can do this:

    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    public class AsyncQueue<T>
    {
        private readonly SemaphoreSlim _sem;
        private readonly ConcurrentQueue<T> _que;

        public AsyncQueue()
        {
            _sem = new SemaphoreSlim(0);
            _que = new ConcurrentQueue<T>();
        }

        public void Enqueue(T item)
        {
            _que.Enqueue(item);
            _sem.Release();
        }

        public void EnqueueRange(IEnumerable<T> source)
        {
            var n = 0;
            foreach (var item in source)
            {
                _que.Enqueue(item);
                n++;
            }
            _sem.Release(n);
        }

        public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
        {
            for (; ; )
            {
                await _sem.WaitAsync(cancellationToken);

                T item;
                if (_que.TryDequeue(out item))
                {
                    return item;
                }
            }
        }
    }

Simple, fully functional asynchronous FIFO queue.

Note: SemaphoreSlim.WaitAsync was added in .NET 4.5. Before that, this was not all that straightforward.
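To make the mechanism concrete, here is a small self-contained sketch of the same SemaphoreSlim plus ConcurrentQueue<T> pairing, written out inline rather than through the class. The names (SemaphoreQueueSketch, RunAsync) are invented for this example. It is meant to show that a consumer waiting on an empty queue holds only an incomplete task, not a blocked thread.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreQueueSketch // hypothetical name, for illustration only
{
    public static async Task<(bool, int)> RunAsync()
    {
        var sem = new SemaphoreSlim(0);          // count = number of queued items
        var queue = new ConcurrentQueue<int>();

        // The queue is empty, so WaitAsync returns a task that is not yet complete.
        // No thread is parked while this task is pending.
        Task pendingWait = sem.WaitAsync();
        bool completedBeforeEnqueue = pendingWait.IsCompleted;

        // Producer side: add the item first, then signal the semaphore.
        queue.Enqueue(7);
        sem.Release();

        // Consumer side: the wait now completes and the item can be dequeued.
        await pendingWait;
        int item;
        queue.TryDequeue(out item);

        return (completedBeforeEnqueue, item);
    }
}
```

Enqueueing before releasing matters: a woken consumer should always find the item that its signal corresponds to.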

9 Comments

What's the use of infinite for? if semaphore is released, queue has at least one item to dequeue, no?
@Blendester there might be a race condition if multiple consumers are blocked. We cannot know for sure that there aren't at least two competing consumers, and we don't know if both of them manage to wake up before they get to dequeue an item. In the event of a race, if one doesn't manage to dequeue, it will go back to sleep and wait for another signal.
If two or more consumers make it past WaitAsync(), then there are an equivalent number of items in the queue, and thus they will always dequeue successfully. Am I missing something?
This is a blocking collection, the semantics of TryDequeue are, return with a value, or do not return at all. Technically, if you have more than 1 reader, the same reader can consume two (or more) items before any other reader is fully awake. A successful WaitAsync is just a signal that there may be items in the queue to consume, it's not a guarantee.
@AshishNegi it's just a consequence of the TryDequeue API design. This way, if for whatever reason the queue doesn't successfully dequeue, its behavior is still defined. We could just as well throw an exception. I may have adapted this example from code that used a monitor, where we actually could wake up multiple consumers. I think it's a remnant of that. I can't see why it would be absolutely necessary.
12

The asynchronous (non-blocking) alternative to BlockingCollection<T> is the Channel<T> class. It offers almost the same functionality, plus some extra features. You can instantiate a Channel<T> using the static factory methods of the Channel class, as shown below (demonstrating the default values of all available options).

    Channel<Item> channel = Channel.CreateUnbounded<Item>(new UnboundedChannelOptions()
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = false,
    });

    Channel<Item> channel = Channel.CreateBounded<Item>(new BoundedChannelOptions(capacity)
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.Wait,
    });

The most striking difference is that Channel<T> exposes a Writer and a Reader facade. So you can pass the Writer facade to a method that plays the role of the producer, and similarly the Reader facade to a method that plays the role of the consumer. The Writer is only allowed to add items to the channel and mark it as completed. The Reader is only allowed to take items from the channel and await its completion. Both facades expose only non-blocking APIs. For example, ChannelWriter<T> has a WriteAsync method that returns a ValueTask. If you have some reason to block on these APIs, for example if one worker of your producer/consumer pair has to be synchronous, then you can block with .AsTask().GetAwaiter().GetResult(), but this will not be as efficient as using a BlockingCollection<T>. If you want to learn more about the similarities and differences between the Channel<T> and BlockingCollection<T> classes, take a look at this answer.
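The facade split might look roughly like this in practice. This is only a sketch: the names ChannelDemo, ProduceAsync, ConsumeAsync and RunAsync are invented for illustration, and the capacity of 2 is arbitrary.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelDemo // hypothetical name, for illustration only
{
    // The producer only sees the writer facade: it can add items and complete the channel.
    private static async Task ProduceAsync(ChannelWriter<int> writer)
    {
        for (var i = 1; i <= 3; i++)
            await writer.WriteAsync(i); // waits asynchronously while the bounded channel is full
        writer.Complete();
    }

    // The consumer only sees the reader facade: it can take items and observe completion.
    private static async Task<List<int>> ConsumeAsync(ChannelReader<int> reader)
    {
        var items = new List<int>();
        // WaitToReadAsync returns false once the channel is completed and drained.
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out int item))
                items.Add(item);
        }
        return items;
    }

    public static async Task<List<int>> RunAsync()
    {
        Channel<int> channel = Channel.CreateBounded<int>(2); // arbitrary capacity
        Task<List<int>> consumer = ConsumeAsync(channel.Reader);
        await ProduceAsync(channel.Writer);
        return await consumer;
    }
}
```

Because neither method receives the Channel<T> itself, the producer cannot accidentally read and the consumer cannot accidentally write.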

An implementation of a custom AsyncBlockingCollection<T> class, having only the most basic features, can be found in the 3rd revision of this answer.

2 Comments

As an afterthought, I now think that the class name AsyncBlockingCollection is nonsensical. Something cannot be asynchronous and blocking at the same time, since these two concepts are the exact opposites!
But still, it IS an async version of the BlockingCollection :)
-1

This is super-simple, but it serves my needs.

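    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public static class BlockingCollectionEx
    {
        public static async Task<T> TakeAsync<T>(this BlockingCollection<T> bc, CancellationToken token, int inner_delay = 10)
        {
            while (!token.IsCancellationRequested)
            {
                // Non-blocking attempt: returns immediately whether or not an item is available.
                if (bc.TryTake(out T el))
                    return el;

                // Poll again after a short delay; passing the token lets cancellation interrupt the wait.
                await Task.Delay(inner_delay, token);
            }
            throw new OperationCanceledException(token);
        }
    }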

5 Comments

Instead of while (!token.IsCancellationRequested) and throw new OperationCanceledException();, it is simpler and better to just pass the token to the TryTake method, like in Dejisys's answer: TryTake(out T el, 0, token)
As it’s currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center.
Adding to @TheodorZoulias's comment, I think it can be simplified down to this: while (!bc.TryTake(out T el, inner_delay, token)) ; return el;
@Aaron the intention of the TakeAsync is to be non-blocking. The TryTake overload with millisecondsTimeout parameter is blocking, so it defeats the purpose.
@TheodorZoulias Haha! You're right. :-) I guess it's this: while (!bc.TryTake(out T el, 0, token)) { await Task.Delay(inner_delay, token); } return el; (...and yes, I only noticed your response after a year.)
