1

I want to use an async producer/consumer queue (AsyncEx lib) to send messages one at a time over a bus. Right now I achieve this simply by async blocking. It's working fine, but I have no control over the queue :(

So I came up with following solution, problem is that a canceled task is not removed from the queue. If I limit the queue to say 10 (because each message takes 1s to send and max queue time shall be 10s or so) and the queue contains already 8 waiting tasks and 2 canceled tasks, than the next queued task would throw an InvalidOperationException although the two canceled task wouldn't be sent anyway.

Maybe there is a better way to do this :D

 class Program { static AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>> s_Queue = new AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>>(); static void Main() { StartAsync().Wait(); } static async Task StartAsync() { var sendingTask = StartSendingAsync(); var tasks = new List<Task>(); using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8))) { for (var i = 0; i < 10; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token)); } try { await Task.WhenAll(tasks); Console.WriteLine("All messages sent."); } catch (TaskCanceledException) { Console.WriteLine("At least one task was canceled."); } } s_Queue.CompleteAdding(); await sendingTask; s_Queue.Dispose(); Console.WriteLine("Queue completed."); Console.WriteLine("Press any key to continue..."); Console.ReadKey(); } static async Task EnqueueMessageAsync(string message, CancellationToken token) { var tcs = new TaskCompletionSource(); using (token.Register(() => tcs.TrySetCanceled())) { await s_Queue.EnqueueAsync(new Tuple<string, TaskCompletionSource>(message, tcs)); Console.WriteLine("Thread '{0}' - {1}: {2} queued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message); await tcs.Task; } } static async Task SendMessageAsync(string message) { await Task.Delay(TimeSpan.FromSeconds(1)); Console.WriteLine("Thread '{0}' - {1}: {2} sent.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message); } static async Task StartSendingAsync() { while (await s_Queue.OutputAvailableAsync()) { var t = await s_Queue.DequeueAsync(); if (t.Item2.Task.IsCanceled || t.Item2.Task.IsFaulted) continue; await SendMessageAsync(t.Item1); t.Item2.TrySetResult(); } } } 

Edit 1:

As svik pointed out the InvalidOperationException is only thrown if the queue is already completed. So this solution doesn't even solve my initial problem of an unmanaged "queue" of waiting tasks. If there are e.g. more than 10 calls/10s I got a full queue and an additional unmanaged "queue" of waiting tasks like with my async blocking approach (AsyncMonitor). I guess I have to come up with some other solution then...

Edit 2:

I have N different producers of messages (I don't know how many there are because it's not my code) and only one consumer that sends the messages over a bus and checks if they were sent correctly (not really string messages).

The following code simulates a situation where the code should break (queue size is 10):

  1. Enqueue 10 messages (with an timeout of 5sec)
  2. Wait 5sec (message 0-4 were sent and message 5-9 were cancelled)
  3. Enqueue 11 new messages (w/o timeout)
  4. Message 10 - 19 should be enqueued because the queue only contains cancelled messages
  5. Message 20 should throw an exception (e.g. QueueOverflowException) because the queue is full, this would be handled or not by the producer code

Producers:

using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5))) { for (var i = 0; i < 10; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token)); } await Task.Delay(TimeSpan.FromSeconds(5)); for (var i = 10; i < 21; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, default(CancellationToken))); } try { await Task.WhenAll(tasks); Console.WriteLine("All messages sent."); } catch (TaskCanceledException) { Console.WriteLine("At least one task was canceled."); Console.WriteLine("Press any key to complete queue..."); Console.ReadKey(); } } 

The goal is, I want to have full control over all messages that should be send, but this is not the case in the code I've posted before, because I only have control over the messages in the queue but not the messages that are waiting to be enqueued (there could be 10000 messages asynchronously waiting to be enqueued and I wouldn't know => producer code wouldn't work as expected anyway because it would take forever to send all the messages that are waiting...)

I hope this makes it clearer what I want to achieve ;)

5
  • 1
    I don't understand. If the queue is full (but not completed), enqueuing another item shouldn't throw InvalidOperationException. Commented May 1, 2015 at 14:58
  • Yep you are right, no exception EnqueueAsync() is "waiting" until queue isn't full anymore.... Commented May 1, 2015 at 18:40
  • Thats a bummer because I really wanted an exception (well if too many calls are done, cancelled calls shouldn't count :D). So this approach doesn't even solve my problem. If there are more than e.g. 10 calls every 10 seconds. I still got an uncontrolled queue of waiting tasks, like with my current async blocking approach... Commented May 1, 2015 at 18:51
  • Could you maybe step back and explain what is the underlying problem that you're trying to solve? In the code you showed, you're canceling Tasks after 8 seconds, is that representative of your real code? And why do you need to throw when there's more than 10 calls per 10 seconds? Commented May 1, 2015 at 23:47
  • I hope Edit 2 makes it clearer what I want to achieve with this code Commented May 2, 2015 at 8:21

1 Answer 1

0

I'm not sure if answering my own questing is OK, so I won't flag it as answer, maybe someone comes up with a better solution :P

First of all here is the producer code:

static async Task StartAsync() { using (var queue = new SendMessageQueue(10, new SendMessageService())) using (var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(4.5))) { var tasks = new List<Task>(); for (var i = 0; i < 10; i++) { tasks.Add(queue.SendAsync(i.ToString(), timeoutTokenSource.Token)); } await Task.Delay(TimeSpan.FromSeconds(4.5)); for (var i = 10; i < 25; i++) { tasks.Add(queue.SendAsync(i.ToString(), default(CancellationToken))); } await queue.CompleteSendingAsync(); for (var i = 0; i < tasks.Count; i++ ) { try { await tasks[i]; Console.WriteLine("Message '{0}' send.", i); } catch (TaskCanceledException) { Console.WriteLine("Message '{0}' canceled.", i); } catch (QueueOverflowException ex) { Console.WriteLine(ex.Message); } } } Console.WriteLine("Press any key to continue..."); Console.ReadKey(); } 
  • 25 messages are enqueued over 5sec
  • 16 messages are sent
  • 3 messages are not sent (queue is full)
  • 6 messages get canceled

And here is the "Queue" class that is based upon a List. It's a combination of the queue and the consumer. Synchronisation is done with the AsyncMonitor class (AsyncEx by Stephen Cleary).

class SendMessageQueue : IDisposable { private bool m_Disposed; private bool m_CompleteSending; private Task m_SendingTask; private AsyncMonitor m_Monitor; private List<MessageTaskCompletionSource> m_MessageCollection; private ISendMessageService m_SendMessageService; public int Capacity { get; private set; } public SendMessageQueue(int capacity, ISendMessageService service) { Capacity = capacity; m_Monitor = new AsyncMonitor(); m_MessageCollection = new List<MessageTaskCompletionSource>(); m_SendMessageService = service; m_SendingTask = StartSendingAsync(); } public async Task<bool> SendAsync(string message, CancellationToken token) { if (m_Disposed) { throw new ObjectDisposedException(GetType().Name); } if (message == null) { throw new ArgumentNullException("message"); } using (var messageTcs = new MessageTaskCompletionSource(message, token)) { await AddAsync(messageTcs); return await messageTcs.Task; } } public async Task CompleteSendingAsync() { if (m_Disposed) { throw new ObjectDisposedException(GetType().Name); } using (m_Monitor.Enter()) { m_CompleteSending = true; } await m_SendingTask; } private async Task AddAsync(MessageTaskCompletionSource message) { using (await m_Monitor.EnterAsync(message.Token)) { if (m_CompleteSending) { throw new InvalidOperationException("Queue already completed."); } if (Capacity < m_MessageCollection.Count) { m_MessageCollection.RemoveAll(item => item.IsCanceled); if (Capacity < m_MessageCollection.Count) { throw new QueueOverflowException(string.Format("Queue overflow; '{0}' couldn't be enqueued.", message.Message)); } } m_MessageCollection.Add(message); } m_Monitor.Pulse(); // signal new message Console.WriteLine("Thread '{0}' - {1}: '{2}' enqueued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message.Message); } private async Task<MessageTaskCompletionSource> TakeAsync() { using (await m_Monitor.EnterAsync()) { var message = m_MessageCollection.ElementAt(0); m_MessageCollection.RemoveAt(0); return message; } } private async Task<bool> OutputAvailableAsync() { using (await m_Monitor.EnterAsync()) { if (m_MessageCollection.Count > 0) { return true; } else if (m_CompleteSending) { return false; } await m_Monitor.WaitAsync(); return true; } } private async Task StartSendingAsync() { while (await OutputAvailableAsync()) { var message = await TakeAsync(); if (message.IsCanceled) continue; try { var result = await m_SendMessageService.SendMessageAsync(message.Message, message.Token); message.TrySetResult(result); } catch (TaskCanceledException) { message.TrySetCanceled(); } } } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected void Dispose(bool disposing) { if (m_Disposed) return; if (disposing) { if (m_MessageCollection != null) { var tmp = m_MessageCollection; m_MessageCollection = null; tmp.ForEach(item => item.Dispose()); tmp.Clear(); } } m_Disposed = true; } #region MessageTaskCompletionSource Class class MessageTaskCompletionSource : TaskCompletionSource<bool>, IDisposable { private bool m_Disposed; private IDisposable m_CancellationTokenRegistration; public string Message { get; private set; } public CancellationToken Token { get; private set; } public bool IsCompleted { get { return Task.IsCompleted; } } public bool IsCanceled { get { return Task.IsCanceled; } } public bool IsFaulted { get { return Task.IsFaulted; } } public MessageTaskCompletionSource(string message, CancellationToken token) { m_CancellationTokenRegistration = token.Register(() => TrySetCanceled()); Message = message; Token = token; } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected void Dispose(bool disposing) { if (m_Disposed) return; if (disposing) { TrySetException(new ObjectDisposedException(GetType().Name)); if (m_CancellationTokenRegistration != null) { var tmp = m_CancellationTokenRegistration; m_CancellationTokenRegistration = null; tmp.Dispose(); } } m_Disposed = true; } } #endregion } 

For now I'am OK with this solution; it's gets the job done :D

Sign up to request clarification or add additional context in comments.

1 Comment

Please mark your answer as accepted. If you don't it will get auto-promoted to the front page every few moths as a question with no accepted answer.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.