5

My problem is that I don't know how synchronise multiple threads using Ruby. The task is to create six threads and start them immediately. All of them should do some work (for example puts "Thread 1" Hi") one after another in the order I need it to work.

I've tried to work with Mutex, Monitor and Condition Variable, but all of them worked in random order. Could anybody explain how to achieve my goal?

After some time of struggling with Mutex and Condition Variable I've achieved my goal. This code is a little bit messy, and I intentionally did't use cycles for "clearer view".

cv = ConditionVariable.new mutex = Mutex.new mutex2 = Mutex.new cv2 = ConditionVariable.new mutex3 = Mutex.new cv3 = ConditionVariable.new mutex4 = Mutex.new cv4 = ConditionVariable.new mutex5 = Mutex.new cv5 = ConditionVariable.new mutex6 = Mutex.new cv6 = ConditionVariable.new Thread.new do mutex.synchronize { puts 'First: Hi' cv.wait(mutex) puts 'First: Bye' #cv.wait(mutex) cv.signal puts 'First: One more time' } end Thread.new do mutex.synchronize { puts 'Second: Hi' cv.signal cv.wait(mutex) puts 'Second:Bye' cv.signal } mutex2.synchronize { puts 'Second: Starting third' cv2.signal } end Thread.new do mutex2.synchronize { cv2.wait(mutex2) puts 'Third: Hi' } mutex3.synchronize { puts 'Third: Starting forth' cv3.signal } end Thread.new do mutex3.synchronize { cv3.wait(mutex3) puts 'Forth: Hi' } mutex4.synchronize { puts 'Forth: Starting fifth' cv4.signal } end Thread.new do mutex4.synchronize { cv4.wait(mutex4) puts 'Fifth: Hi' } mutex5.synchronize { puts 'Fifth: Starting sixth' cv5.signal } end Thread.new { mutex5.synchronize { cv5.wait(mutex5) puts 'Sixth:Hi' } } sleep 2 
9
  • 3
    If you want them to work serially, why do you use threads? Commented May 20, 2014 at 18:06
  • 1
    Please, learn how to use threads properly before you even try to make them work bizarrely. Commented May 20, 2014 at 18:11
  • 2
    NO! Synchronization is not about order, it is about critical sections where you don't want more than one thread touching the same resource at same time, so it won't create inconsistencies Commented May 20, 2014 at 18:18
  • 2
    No. The purpose of synchronization is to permit concurrent operation. Ordering should be handled other ways. (For example, don't arrange to have work done at all until that work needs to be done.) Commented May 20, 2014 at 18:18
  • 1
    Most of the commentators have too narrow a definition of what synchronization is. While synchronization is often about critical sections, it isn't always. Sometimes it is about making sure that B happens after A. Commented May 20, 2014 at 20:53

3 Answers 3

6

Using Queue as a PV Semaphore

You can abuse Queue, using it like a traditional PV Semaphore. To do this, you create an instance of Queue:

require 'thread' ... sem = Queue.new 

When a thread needs to wait, it calls Queue#deq:

# waiting thread sem.deq 

When some other thread wants to unblock the waiting thread, it pushes something (anything) onto the queue:

# another thread that wants to unblock the waiting thread sem.enq :go 

A Worker class

Here's a worker class that uses Queue to synchronize its start and stop:

class Worker def initialize(worker_number) @start = Queue.new Thread.new do @start.deq puts "Thread #{worker_number}" @when_done.call end end def start @start.enq :start end def when_done(&block) @when_done = block end end 

When constructed, a worker creates a thread, but that thread then waits on the @start queue. Not until #start is called will the thread unblock.

When done, the thread will execute the block that was called to #when_done. We'll see how this is used in just a moment.

Creating workers

First, let's make sure that if any threads raise an exception, we get to find out about it:

Thread.abort_on_exception = true 

We'll need six workers:

workers = (1..6).map { |i| Worker.new(i) } 

Telling each worker what to do when it's done

Here's where #when_done comes into play:

workers.each_cons(2) do |w1, w2| w1.when_done { w2.start } end 

This takes each pair of workers in turn. Each worker except the last is told, that when it finishes, it should start the worker after it. That just leaves the last worker. When it finishes, we want it to notify this thread:

all_done = Queue.new workers.last.when_done { all_done.enq :done } 

Let's Go!

Now all that remains is to start the first thread:

workers.first.start 

and wait for the last thread to finish:

all_done.deq 

The output:

Thread 1 Thread 2 Thread 3 Thread 4 Thread 5 Thread 6 
Sign up to request clarification or add additional context in comments.

6 Comments

thank you, very nice solution! But do you know how to achieve the same result using monitor or semaphore?
@RomaBugaian Ruby does not, as far as I know, have a semaphore. That's why I used Queue, which can behave like a semaphore. I don't know how to do this using a monitor--monitors don't seem well suited for serialization.
Ruby has class Mutex that implements semaphore, so it should behave like classic semaphore
@Roma, Good point. Mutex is a little like a classic PV semaphore, but not a whole lot. Mutex is only locked or unlocked; this is equivalent to a PV semaphore which count can be only 0 or 1. Also, a Mutex starts out unlocked (if a PV semaphore, it would have a count of 1).
@Roma I was just trying to rewrite this example using Mutex, and it's worse than I thought: A locked Mutex can only be unlocked by the same thread that locked it. You can't use a Mutex as a PV semaphore.
|
1

If you're just getting started with threads, you might want to try something simple. Let the 1st thread sleep for 1 second, the 2nd for 2 seconds, the 3rd for 3 seconds and so on:

$stdout.sync = true threads = [] (1..6).each do |i| threads << Thread.new { sleep i puts "Hi from thread #{i}" } end threads.each(&:join) 

Output (takes 6 seconds because the threads run in parallel):

Hi from thread 1 Hi from thread 2 Hi from thread 3 Hi from thread 4 Hi from thread 5 Hi from thread 6 

Comments

0

You can assign each a number, which will denote its place in the queue, and check it to see whose turn it is:

class QueuedWorker def initialize(mutex, condition_variable, my_turn) @mutex = mutex @my_turn = my_turn @condition_variable = condition_variable end def self.turn @turn ||= 0 end def self.done @turn = turn + 1 end def run loop do @mutex.synchronize do if QueuedWorker.turn == @my_turn # do actual work QueuedWorker.done @condition_variable.signal return end @condition_variable.signal @condition_variable.wait(@mutex) end end end end mutex = Mutex.new cv = ConditionVariable.new (0..10).each do |i| Thread.new do QueueWorker.new(mutex, cv, i).run end end 

That being said, the implementation is awkward, since threading are specifically not built for serial work. If you need something to work serially, do it in a single thread.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.