I want to use an async producer/consumer queue (AsyncEx lib) to send messages one at a time over a bus. Right now I achieve this simply by async blocking. It's working fine, but I have no control over the queue :(
So I came up with the following solution. The problem is that a canceled task is not removed from the queue. If I limit the queue to, say, 10 entries (because each message takes 1 s to send and the maximum queue time should be about 10 s) and the queue already contains 8 waiting tasks and 2 canceled tasks, then the next queued task would throw an InvalidOperationException, although the two canceled tasks wouldn't be sent anyway.
Maybe there is a better way to do this :D
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using Nito.AsyncEx;

    class Program
    {
        static AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>> s_Queue =
            new AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>>();

        static void Main()
        {
            StartAsync().Wait();
        }

        static async Task StartAsync()
        {
            // Start the single consumer before any message is produced.
            var sendingTask = StartSendingAsync();
            var tasks = new List<Task>();
            using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)))
            {
                for (var i = 0; i < 10; i++)
                {
                    tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token));
                }
                try
                {
                    await Task.WhenAll(tasks);
                    Console.WriteLine("All messages sent.");
                }
                catch (TaskCanceledException)
                {
                    Console.WriteLine("At least one task was canceled.");
                }
            }
            s_Queue.CompleteAdding();
            await sendingTask;
            s_Queue.Dispose();
            Console.WriteLine("Queue completed.");
            Console.WriteLine("Press any key to continue...");
            Console.ReadKey();
        }

        static async Task EnqueueMessageAsync(string message, CancellationToken token)
        {
            var tcs = new TaskCompletionSource();
            // Cancellation only completes the task; the entry stays in the queue.
            using (token.Register(() => tcs.TrySetCanceled()))
            {
                await s_Queue.EnqueueAsync(new Tuple<string, TaskCompletionSource>(message, tcs));
                Console.WriteLine("Thread '{0}' - {1}: {2} queued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
                await tcs.Task;
            }
        }

        static async Task SendMessageAsync(string message)
        {
            // Simulates sending one message over the bus (takes 1 s).
            await Task.Delay(TimeSpan.FromSeconds(1));
            Console.WriteLine("Thread '{0}' - {1}: {2} sent.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
        }

        static async Task StartSendingAsync()
        {
            while (await s_Queue.OutputAvailableAsync())
            {
                var t = await s_Queue.DequeueAsync();
                // Skip entries whose task was canceled or faulted while waiting.
                if (t.Item2.Task.IsCanceled || t.Item2.Task.IsFaulted)
                    continue;
                await SendMessageAsync(t.Item1);
                t.Item2.TrySetResult();
            }
        }
    }

Edit 1:
As svik pointed out, the InvalidOperationException is only thrown if the queue is already completed. So this solution doesn't even solve my initial problem of an unmanaged "queue" of waiting tasks. If there are e.g. more than 10 calls per 10 s, I get a full queue plus an additional unmanaged "queue" of waiting tasks, just like with my async blocking approach (AsyncMonitor). I guess I have to come up with some other solution then...
Edit 2:
I have N different producers of messages (I don't know how many there are because it's not my code) and only one consumer that sends the messages over a bus and checks if they were sent correctly (not really string messages).
The following code simulates a situation where the code should break (queue size is 10):
- Enqueue 10 messages (with a timeout of 5 s)
- Wait 5 s (messages 0-4 were sent and messages 5-9 were canceled)
- Enqueue 11 new messages (without a timeout)
- Messages 10-19 should be enqueued because the queue only contains canceled messages
- Message 20 should throw an exception (e.g. QueueOverflowException) because the queue is full; the producer code may or may not handle it
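
To make the steps above more concrete, the queue behavior I have in mind could be sketched roughly like this. It is only a sketch under my own assumptions, written with plain BCL types instead of AsyncEx: `BoundedSendQueue`, its members and the `QueueOverflowException` class are made-up names, not an existing API. The idea is that a canceled entry is removed right away (so it no longer occupies a slot) and that enqueueing never waits for a free slot but throws when the queue is full.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical exception type, named after the example in the list above.
public sealed class QueueOverflowException : InvalidOperationException
{
    public QueueOverflowException() : base("The send queue is full.") { }
}

// Hypothetical bounded queue in which a canceled entry frees its slot at once.
public sealed class BoundedSendQueue
{
    private sealed class Entry
    {
        public string Message;
        public TaskCompletionSource<bool> Completion;
        public CancellationTokenRegistration Registration;
    }

    private readonly object _gate = new object();
    private readonly LinkedList<Entry> _entries = new LinkedList<Entry>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
    private readonly int _capacity;

    public BoundedSendQueue(int capacity) { _capacity = capacity; }

    // Number of entries that are still waiting to be sent.
    public int Count { get { lock (_gate) { return _entries.Count; } } }

    // Never waits for a free slot: throws QueueOverflowException when full.
    // The returned task completes once the consumer reports the send as done.
    public Task EnqueueAsync(string message, CancellationToken token)
    {
        var entry = new Entry
        {
            Message = message,
            Completion = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously)
        };
        lock (_gate)
        {
            if (_entries.Count >= _capacity) throw new QueueOverflowException();
            var node = _entries.AddLast(entry);
            // On cancellation, drop the entry so it no longer occupies a slot.
            entry.Registration = token.Register(() =>
            {
                lock (_gate) { if (node.List != null) _entries.Remove(node); }
                entry.Completion.TrySetCanceled();
            });
        }
        _signal.Release();
        return entry.Completion.Task;
    }

    // Waits for the next live entry; returns its message and a callback
    // that the consumer invokes after the message has been sent.
    public async Task<(string Message, Action Complete)> DequeueAsync()
    {
        while (true)
        {
            await _signal.WaitAsync();
            Entry entry;
            lock (_gate)
            {
                // A signal may belong to an entry that was canceled meanwhile.
                if (_entries.Count == 0) continue;
                entry = _entries.First.Value;
                _entries.RemoveFirst();
            }
            entry.Registration.Dispose();
            return (entry.Message, () => entry.Completion.TrySetResult(true));
        }
    }
}
```

With a capacity of 10, the five canceled messages 5-9 would be gone from the queue after the 5 s wait, so messages 10-19 should fit and message 20 should throw (unless the consumer has already dequeued one of them in the meantime). Note that the exception is thrown synchronously from `EnqueueAsync`, so the producer sees it at the call site and not only when awaiting the task.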
Producers:
    using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
    {
        // Messages 0-9 share a 5 s timeout.
        for (var i = 0; i < 10; i++)
        {
            tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token));
        }
        await Task.Delay(TimeSpan.FromSeconds(5));
        // Messages 10-20 have no timeout.
        for (var i = 10; i < 21; i++)
        {
            tasks.Add(EnqueueMessageAsync("Message " + i, default(CancellationToken)));
        }
        try
        {
            await Task.WhenAll(tasks);
            Console.WriteLine("All messages sent.");
        }
        catch (TaskCanceledException)
        {
            Console.WriteLine("At least one task was canceled.");
            Console.WriteLine("Press any key to complete queue...");
            Console.ReadKey();
        }
    }

The goal is to have full control over all messages that should be sent. That is not the case in the code I posted before, because I only have control over the messages in the queue, not over the messages that are waiting to be enqueued (there could be 10000 messages asynchronously waiting to be enqueued and I wouldn't know, so the producer code wouldn't work as expected anyway because it would take forever to send all the waiting messages...)
I hope this makes it clearer what I want to achieve ;)