1

I have got a class that records eyetracking data asynchronously. There are methods to start and stop the recording process. The data is collected in a collection and the collection can only be accessed if the recording thread has finished its work. It basically encapsulates all the threading and synchronizing so the user of my library doesn't have to do it.

The heavily shortened code (generics and error handling omitted):

public class Recorder { private Collection accumulatorCollection; private Thread recordingThread; private class RecordingRunnable implements Runnable { ... public void run() { while(!Thread.currentThread().isInterrupted()) { // fetch data and collect it in the accumulator synchronized(acc) { acc.add(Eyetracker.getData()) } } } } public void start() { accumulatorCollection = new Collection(); recordingThread = new Thread(new RecordingRunnable(accumulatorCollection)); recordingThread.start(); } public void stop() { recordingThread.interrupt(); } public void getData() { try { recordingThread.join(2000); if(recordingThread.isAlive()) { throw Exception(); } } catch(InterruptedException e) { ... } synchronized(accumulatorCollection) { return accumulatorCollection; } } } 

The usage is quite simple:

recorder.start(); ... recorder.stop(); Collection data = recorder.getData(); 

My problem with the whole thing is how to test it. Currently i am doing it like this:

recorder.start(); Thread.sleep(50); recorder.stop(); Collection data = recorder.getData(); assert(stuff); 

This works, but it is non-deterministic and slows down the test suite quite a bit (i marked these tests as integration tests, so they have to be run separately to circumvent this problem).

Is there a better way?

3
  • Are you sleeping so you can collect data, or to ensure the process has started? Commented Feb 27, 2017 at 21:01
  • I do it to collect data, otherwise i would only get a single data package or so. Commented Feb 27, 2017 at 21:15
  • 1
    Looks like you are reinventing the Future, that you can get by submitting a task to an ExecutorService. Commented Feb 28, 2017 at 12:09

1 Answer 1

2

There is a better way using a CountDownLatch.

The non-deterministic part of the test stems from two variables in time you do not account for:

  • creating and starting a thread takes time and the thread may not have started executing the runnable when Thread.start() returns (the runnable will get executed, but it may be a bit later).
  • the stop/interrupt will break the while-loop in the Runnable but not immediately, it may be a bit later.

This is where a CountDownLatch comes in: it gives you precise information about where another thread is in execution. E.g. let the first thread wait on the latch, while the second "counts down" the latch as last statement within a runnable and now the first thread knows that the runnable finished. The CountDownLatch also acts as a synchronizer: whatever the second thread was writing to memory, can now be read by the first thread.

Instead of using an interrupt, you can also use a volatile boolean. Any thread reading the volatile variable is guaranteed to see the last value set by any other thread.

A CountDownLatch can also be given a timeout which is useful for tests that can hang: if you have to wait to long you can abort the whole test (e.g. shutdown executors, interrupt threads) and throw an AssertionError. In the code below I re-used the timeout to wait for a certain amount of data to collect instead of 'sleeping'.

As an optimization, use an Executor (ThreadPool) instead of creating and starting threads. The latter is relative expensive, using an Executor can really make a difference.

Below the updated code, I made it runnable as an application (main method). (edit 28/02/17: check maxCollect > 0 in while-loop)

import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; public class Recorder { private final ExecutorService executor; private Thread recordingThread; private volatile boolean stopRecording; private CountDownLatch finishedRecording; private Collection<Object> eyeData; private int maxCollect; private final AtomicBoolean started = new AtomicBoolean(); private final AtomicBoolean stopped = new AtomicBoolean(); public Recorder() { this(null); } public Recorder(ExecutorService executor) { this.executor = executor; } public Recorder maxCollect(int max) { maxCollect = max; return this; } private class RecordingRunnable implements Runnable { @Override public void run() { try { int collected = 0; while (!stopRecording) { eyeData.add(EyeTracker.getData()); if (maxCollect > 0 && ++collected >= maxCollect) { stopRecording = true; } } } finally { finishedRecording.countDown(); } } } public Recorder start() { if (!started.compareAndSet(false, true)) { throw new IllegalStateException("already started"); } stopRecording = false; finishedRecording = new CountDownLatch(1); eyeData = new ArrayList<Object>(); // the RecordingRunnable created below will see the values assigned above ('happens before relationship') if (executor == null) { recordingThread = new Thread(new RecordingRunnable()); recordingThread.start(); } else { executor.execute(new RecordingRunnable()); } return this; } public Collection<Object> getData(long timeout, TimeUnit tunit) { if (started.get() == false) { throw new IllegalStateException("start first"); } if (!stopped.compareAndSet(false, true)) { throw new IllegalStateException("data already fetched"); } if (maxCollect <= 0) { stopRecording = true; } boolean recordingStopped = false; try { // this establishes a 'happens before relationship' // all updates to eyeData are now visible in this thread. recordingStopped = finishedRecording.await(timeout, tunit); } catch(InterruptedException e) { throw new RuntimeException("interrupted", e); } finally { stopRecording = true; } // if recording did not stop, do not return the eyeData (could stil be modified by recording-runnable). if (!recordingStopped) { throw new RuntimeException("recording"); } // only when everything is OK this recorder instance can be re-used started.set(false); stopped.set(false); return eyeData; } public static class EyeTracker { public static Object getData() { try { Thread.sleep(1); } catch (Exception ignored) {} return new Object(); } } public static void main(String[] args) { System.out.println("Starting."); ExecutorService exe = Executors.newSingleThreadExecutor(); try { Recorder r = new Recorder(exe).maxCollect(50).start(); int dsize = r.getData(2000, TimeUnit.MILLISECONDS).size(); System.out.println("Collected " + dsize); r.maxCollect(100).start(); dsize = r.getData(2000, TimeUnit.MILLISECONDS).size(); System.out.println("Collected " + dsize); r.maxCollect(0).start(); Thread.sleep(100); dsize = r.getData(2000, TimeUnit.MILLISECONDS).size(); System.out.println("Collected " + dsize); } catch (Exception e) { e.printStackTrace(); } finally { exe.shutdownNow(); System.out.println("Done."); } } } 

Happy coding :)

Sign up to request clarification or add additional context in comments.

2 Comments

is this faster than cyclicbarrier?
@huseyintugrulbuyukisik not faster or slower I think, but I found CountDownLatch easier to understand as CyclicBarrier. CyclicBarrier is a good tool to use as well once you get the hang of it.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.