6

I'm implementing a data link layer using a producer/consumer pattern. The data link layer has its own thread and state machine to communicate the data link protocol over the wire (Ethernet, RS-232...). The interface to the physical layer is represented as a System.IO.Stream. Another thread writes messages to and reads messages from the data link object.

The data link object has an idle state that must wait for one of four conditions:

  1. A byte is received
  2. A message is available from the network thread
  3. The keep-alive timer has expired
  4. All communication was cancelled by the network layer

I'm having a difficult time figuring out the best way to do this without splitting up communication into a read/write thread (thereby significantly increasing the complexity). Here's how I can get 3 out of 4:

// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token. stream.ReadTimeout = 10000; await stream.ReadAsync(buf, 0, 1, cts.Token); 

or

BlockingCollection<byte[]> SendQueue = new ...; ... // Check for a message from network layer. Timeout after 10 seconds. // Monitor cancellation token. SendQueue.TryTake(out msg, 10000, cts.Token); 

What should I do to block the thread, waiting for all four conditions? All recommendations are welcome. I'm not set on any architecture or data structures.

EDIT: ******** Thanks for the help everyone. Here's my solution ********

First I don't think there was an asynchronous implementation of the producer/consumer queue. So I implemented something similar to this stackoverflow post.

I needed an external and internal cancellation source to stop the consumer thread and cancel the intermediate tasks, respectively, similar to this article.

byte[] buf = new byte[1]; using (CancellationTokenSource internalTokenSource = new CancellationTokenSource()) { CancellationToken internalToken = internalTokenSource.Token; CancellationToken stopToken = stopTokenSource.Token; using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken)) { CancellationToken ct = linkedCts.Token; Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct); Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct); Task keepAliveTask = Task.Delay(m_keepAliveTime, ct); // Wait for at least one task to complete await Task.WhenAny(readTask, msgTask, keepAliveTask); // Next cancel the other tasks internalTokenSource.Cancel(); try { await Task.WhenAll(readTask, msgTask, keepAliveTask); } catch (OperationCanceledException e) { if (e.CancellationToken == stopToken) throw; } if (msgTask.IsCompleted) // Send the network layer message else if (readTask.IsCompleted) // Process the byte from the physical layer else Contract.Assert(keepAliveTask.IsCompleted); // Send a keep alive message } } 
1

3 Answers 3

3

I would go with your option two, waiting for any of the 4 conditions to happen. Assuming you have the 4 tasks as awaitable methods already:

var task1 = WaitForByteReceivedAsync(); var task2 = WaitForMessageAvailableAsync(); var task3 = WaitForKeepAliveTimerAsync(); var task4 = WaitForCommunicationCancelledAsync(); // now gather them IEnumerable<Task<bool>> theTasks = new List<IEnumerable<Task<bool>>>{ task1, task2, task3, task4 }; // Wait for any of the things to complete var result = await Task.WhenAny(theTasks); 

The code above will resume immediately after the first task completes, and ignore the other 3.

Note:

In the documentation for WhenAny, it says:

The returned task will always end in the RanToCompletion state with its Result set to the first task to complete. This is true even if the first task to complete ended in the Canceled or Faulted state.

So make sure to do that final check before trusting what happened:

if(result.Result.Result == true) ... // First Result is the task, the second is the bool that the task returns 
Sign up to request clarification or add additional context in comments.

8 Comments

I think you meant WhenAny()
No. WhenAny returns the moment any of the tasks returns. WhenAll awaits for every task given to complete, and returns in itself a Task (as opposed to WaitAll()
This doesn't work because the result of the byte read might be discarded. I assume he cares about that data not being lost.
So after a Cancel you still want to wait for a byte or a message to come in? And vice versa?
The original question says " The data link object has an idle state that must wait for four conditions" - in my head, that means "wait for all 4 things to happen". If you mean any of those 4 can happen, then of course, WaitAny() is what you want :)
|
2

In this case, I would only use cancellation tokens for cancellation. A repeated timeout like a keep-alive timer is better represented as a timer.

So, I would model this as three cancelable tasks. First, the cancellation token:

All communication was cancelled by the network layer

CancellationToken token = ...; 

Then, three concurrent operations:

A byte is received

var readByteTask = stream.ReadAsync(buf, 0, 1, token); 

The keep-alive timer has expired

var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); 

A message is available from the network thread

This one is a bit trickier. Your current code uses BlockingCollection<T>, which is not async-compatible. I recommend switching to TPL Dataflow's BufferBlock<T> or my own AsyncProducerConsumerQueue<T>, either of which can be used as async-compatible producer/consumer queues (meaning that the producer can be sync or async, and the consumer can be sync or async).

BufferBlock<byte[]> SendQueue = new ...; ... var messageTask = SendQueue.ReceiveAsync(token); 

Then you can use Task.WhenAny to determine which of these tasks completed:

var completedTask = await Task.WhenAny(readByteTask, keepAliveTimerTask, messageTask); 

Now, you can retrieve results by comparing completedTask to the others and awaiting them:

if (completedTask == readByteTask) { // Throw an exception if there was a read error or cancellation. await readByteTask; var byte = buf[0]; ... // Continue reading readByteTask = stream.ReadAsync(buf, 0, 1, token); } else if (completedTask == keepAliveTimerTask) { // Throw an exception if there was a cancellation. await keepAliveTimerTask; ... // Restart keepalive timer. keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); } else if (completedTask == messageTask) { // Throw an exception if there was a cancellation (or the SendQueue was marked as completed) byte[] message = await messageTask; ... // Continue reading messageTask = SendQueue.ReceiveAsync(token); } 

2 Comments

Wish I had seen this earlier as I went and implemented my own AsyncQueue, albeit not as general as yours. Why do you await each individual task? Shouldn't it already be completed?
@BrianHeilig: It's necessary to retrieve results. These tasks have completed but their results haven't been observed. Results can be data, e.g., messageTask, but results can also be exceptions. keepAliveTimerTask can have an exception if it's canceled, and readByteTask can have an exception if it's canceled or if there's some I/O read error. await will (re-)raise those exceptions if present.
1

Cancelling a read leaves you no way to know whether data was read or not. Cancelling and reading are not atomic with respect to each other. That approach only works if you close the stream after cancellation.

The queue approach is better. You can create a linked CancellationTokenSource that becomes cancelled whenever you want it. Instead of passing cts.Token you pass a token that you control.

You can then signal that token based on time, another token and any other event that you like. If you use the built-in timeout the queue will internally do the same thing to link the incoming token with a timeout.

6 Comments

Sorry, I was a little short on detail. The cts is a CancellationTokenSource which is owned by the data link layer. Cancellation is also controlled by the data link layer; the network thread calls stop which cancels all communication and waits for the thread to complete. The goal is to gracefully and quickly stop the data link layer.
And why would this answer not work? You can make TryTake abort on any condition you like.
I think you link the token, cancel the linked cts when "message is available from the network thread" and also provide a timeout. That should get you all 4 conditions and never lose data from the queue.
After cancellation you can check which one happened by examining the IsCancellationRequested property of all tokens involved.
Frankly, I don't fully understand the scenario but I'm confident that you can make any cancellation scheme work using linked tokens and TryTake.
|

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.