2

I'm an old dog trying to learn a new trick. I'm extremely familiar with a language called PowerBuilder and in that language, when you want to do things asynchronously, you spawn an object in a new thread. I'll reiterate that: the entire object is instantiated in a separate thread and has a different execution context. Any and all methods on that object execute in the context of that separate thread.

Well now, I'm trying to implement some asynchronous executing using C# and the threading model in .NET feels completely different to me. It looks like I'm instantiating objects in one thread but that I can specify (on a call-by-call basis) that certain methods execute in a different thread.

The difference seems subtle, but it's frustrating me. My old-school thinking says, "I have a helper named Bob. Bob goes off and does stuff." The new-school thinking, if I understand it right, is "I am Bob. If I need to, I can sometimes rub my belly and pat my head at the same time."

My real-world coding problem: I'm writing an interface engine that accepts messages via TCP, parses them into usable data, then puts that data into a database. "Parsing" a message takes approximately one second. Depending on the parsed data, the database operation may take less than a second or it might take ten seconds. (All times made up to clarify the problem.)

My old-school thinking tells me that my database class should live in a separate thread and have something like a ConcurrentQueue. It would simply spin on that queue, processing anything that might be in there. The Parser, on the other hand, would need to push messages into that queue. These messages would be (delegates?) things like "Create an order based on the data in this object" or "Update an order based on the data in this object". It might be worth noting that I actually want to process the "messages" in the "queue" in a strict, single-threaded FIFO order.

Basically, my database connection can't always keep up with my parser. I need a way to make sure my parser doesn't slow down while my database processes try to catch up. Advice?

-- edit: with code! Everyone and everything is telling me to use BlockingCollection. So here's a brief explanation of the end goal and code to go with it:

This will be a Windows service. When started, it will spawn multiple "environments", with each "environment" containing one "dbworker" and one "interface". The "interface" will have one "parser" and one "listener".

class cEnvironment { private cDBWorker MyDatabase; private cInterface MyInterface; public void OnStart () { MyDatabase = new cDBWorker (); MyInterface = new cInterface (); MyInterface.OrderReceived += this.InterfaceOrderReceivedEventHandler; MyDatabase.OnStart (); MyInterface.OnStart (); } public void OnStop () { MyInterface.OnStop (); MyDatabase.OnStop (); MyInterface.OrderReceived -= this.InterfaceOrderReceivedEventHandler; } void InterfaceOrderReceivedEventHandler (object sender, OrderReceivedEventArgs e) { MyDatabase.OrderQueue.Add (e.Order); } } class cDBWorker { public BlockingCollection<cOrder> OrderQueue = new BlockingCollection<cOrder> (); private Task ProcessingTask; public void OnStart () { ProcessingTask = Task.Factory.StartNew (() => Process (), TaskCreationOptions.LongRunning); } public void OnStop () { OrderQueue.CompleteAdding (); ProcessingTask.Wait (); } public void Process () { foreach (cOrder Order in OrderQueue.GetConsumingEnumerable ()) { switch (Order.OrderType) { case 1: SuperFastMethod (Order); break; case 2: ReallySlowMethod (Order); break; } } } public void SuperFastMethod (cOrder Order) { } public void ReallySlowMethod (cOrder Order) { } } class cInterface { protected cListener MyListener; protected cParser MyParser; public void OnStart () { MyListener = new cListener (); MyParser = new cParser (); MyListener.DataReceived += this.ListenerDataReceivedHandler; MyListener.OnStart (); } public void OnStop () { MyListener.OnStop (); MyListener.DataReceived -= this.ListenerDataReceivedHandler; } public event OrderReceivedEventHandler OrderReceived; protected virtual void OnOrderReceived (OrderReceivedEventArgs e) { if (OrderReceived != null) OrderReceived (this, e); } void ListenerDataReceivedHandler (object sender, DataReceivedEventArgs e) { foreach (string Message in MyParser.GetMessages (e.RawData)) { OnOrderReceived (new OrderReceivedEventArgs (MyParser.ParseMessage (Message))); } } 

It compiles. (SHIP IT!) But does that mean that I'm doing it right?

17
  • So, have you tried implementing that? It sounds like you at least know where to start. On a side note, a BlockingCollection may be more suitable than an explicit concurrent queue (it will use one internally). Commented Nov 7, 2013 at 22:27
  • Also note that threads will virtually always share the same memory space. Threads, at the lowest level, don't "own" an object, but programmers often (and justifiably) apply their own constraints that a given object be only used in a single thread, which makes it easier to reason about said object. While you can use a given object in multiple threads, it's often not a good idea to do so (with the exception of objects built to be used by multiple threads, like say a BlockingCollection), or data that is "read only". Commented Nov 7, 2013 at 22:30
  • For my understanding the Queue already separates the processing - on the one side you are filling it asynchronously and on the other side you are pulling it asynchronously. It seems safe - the two components don't block each other. Commented Nov 7, 2013 at 22:30
  • @pasty Queue most certainly is not safe to use from multiple threads, however ConcurrentQueue is. Commented Nov 7, 2013 at 22:35
  • Everything looks fine to me except that your queue items should probably be message objects, not delegates. Having objects that own threads is a pattern I use from time to time, just make sure you clean up after yourself in dispose and expose a thread safe interface. Commented Nov 7, 2013 at 22:45

1 Answer 1

3

BlockingCollection makes putting this kind of thing together pretty easy:

// the queue private BlockingCollection<Message> MessagesQueue = new BlockingCollection<Message>(); // the consumer private MessageParser() { foreach (var msg in MessagesQueue.GetConsumingEnumerable()) { var parsedMessage = ParseMessage(msg); // do something with the parsed message } } // In your main program // start the consumer var consumer = Task.Factory.StartNew(() => MessageParser(), TaskCreationOptions.LongRunning); // the main loop while (messageAvailable) { var msg = GetMessageFromTcp(); // add it to the queue MessagesQueue.Add(msg); } // done receiving messages // tell the consumer that no more messages will be added MessagesQueue.CompleteAdding(); // wait for consumer to finish consumer.Wait(); 

The consumer does a non-busy wait on the queue, so it's not eating CPU resources when there's nothing available.

Sign up to request clarification or add additional context in comments.

1 Comment

Additional kudos to Servy, Eric Lippert, and Scott Chamberlain! Original post edited with final code.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.