I am working on a concurrency system based on message queues with an underlying thread pool. A queue can target other queues. Also a queue can be targeted by multiple queues (i.e. children) to create a tree hierarchy and to prioritise some of the queues.
I want to avoid signalling threads each time a message is queued while the queue (and its children) is (are) already being drained.
So the queue knows if it's sleeping or running. Transition from sleeping to running is easy. That can be done when a new message is queued.
I am more concerned about the other way around. How can I avoid missing messages/events when I transition from running to sleeping? To clarify: this is not an atomic step. The queue also can't go to sleep when its children still need attention.
At the moment in the sleep function I set the state to sleeping, then check if the current queue or its children have any pending events that were missed. If so, I wake the queue back up. But doesn't that potentially cause an infinite recursion of awake - sleep - awake - sleep - ... ?
How can I avoid this, or what's the proper way to setup such a queue system?
In pseudo code:
atomic state = sleeping target = anotherQueue children = queues that target this queue func Async(event) { get lock push event onto queue (i.e. array/fifo/...) release lock this.wakeup() } func wakeup() { If (state == running) { return } state = running !target ? drain() : target.wakeup() } func drain() { while (event = next() ) { event.run() } sleep() } func sleep() { state = sleeping *** what if an event is posted during the transition from running to sleeping? *** //check if events have been missed If (event.count > 0) { this.wakeup() } //check children: don't sleep if they have pending events If (children.haveEvents) { this.wakeup } } func next() { get lock nextEvent = pop event from queue release lock If (!nextEvent) { //check each child queue for an event children.nextEvent } return nextEvent }
while(true) {drain();}and thensleepdoesn't need to callwakeup, it can just return without sleeping. To avoid the potential deadlock I'd use a condition variable forstate.