I have a job that runs in a single-threaded environment: there is a main method, and the main thread is responsible for completing the job.

I am using Spring and Hibernate.

At a high level, I'm doing the following steps:

  1. Fetching data from a MySQL database using JDBC (looping over the ResultSet and doing steps 2 and 3 for each row).

  2. Populating a model using the data received in step 1.

  3. Validating, calling the service layer and the DAO layer, and storing the entity in an Oracle DB.

This flow runs in a for loop, so the data is inserted one record at a time.

Now I want to do this using multithreading.

Approach:

  1. One thread will fetch the data, populate the model object, and put it in a queue.

  2. Multiple threads will dequeue objects from the queue and perform step 3.

Can you help me implement this model? How do I code this type of multithreading framework?

  • You can use Storm topologies and set a different degree of parallelism (N executors) for bolts to speed up processing. Storm also supports distributed computing, fault tolerance, and guaranteed message processing. It is easy to implement, and the existing Java code can be reused there. Commented Feb 7, 2015 at 10:25
  • What about Spring thread pool executors for each logical step? Commented Feb 7, 2015 at 10:34

2 Answers

You can do it like this:

  1. Define an ExecutorService:

     ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize); 
  2. Each for iteration should simply submit a new batch of objects to be inserted:

     final List<RecordDTO> records = ...;
     executorService.submit(() -> insertService.save(records));
  3. The insertService will have a @Transactional save method.

  4. The connection pool size should be greater than or equal to the number of workers.

Instead of sending just one entity to the worker thread, it's more efficient to send a batch (a List<RecordDTO> holding several records) so that they are all inserted using a single database call. For that, you'd need to enable Hibernate JDBC batch inserts.
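The steps above can be sketched roughly as follows. This is a minimal sketch, not the answer's actual code: the class name `BatchSubmitSketch`, the helpers `partition` and `saveInBatches`, and the batch size are hypothetical, and the `saver` callback stands in for `insertService.save(records)`. For the batches to reach the database as JDBC batches, Hibernate's `hibernate.jdbc.batch_size` property would also have to be set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class BatchSubmitSketch {

    /** Splits the input into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            int end = Math.min(i + batchSize, items.size());
            // Copy the sublist so each worker owns an independent batch.
            batches.add(new ArrayList<>(items.subList(i, end)));
        }
        return batches;
    }

    /** Submits one task per batch, waits for all of them, and returns the batch count. */
    static <T> int saveInBatches(List<T> items, int batchSize, int threadPoolSize,
                                 Consumer<List<T>> saver) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (List<T> batch : partition(items, batchSize)) {
                // 'batch' is a fresh, effectively final variable on every iteration,
                // so the lambda can capture it without an explicit 'final'.
                futures.add(executorService.submit(() -> saver.accept(batch)));
            }
            for (Future<?> future : futures) {
                future.get(); // rethrows a worker failure as ExecutionException
            }
            return futures.size();
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            records.add(i);
        }
        // Hypothetical stand-in for insertService.save(batch).
        int submitted = saveInBatches(records, 4, 3,
                batch -> System.out.println("saving " + batch));
        System.out.println(submitted + " batches submitted");
    }
}
```

Waiting on each `Future` is a design choice here: it surfaces exceptions thrown in the worker threads, which would otherwise be silently held by the executor.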

5 Comments

Before insertion, I have to validate the model object and populate some fields on it, and I want that to be multithreaded too. So inside run I have to call processAcc(model), which internally calls .save. But then the model I pass to processAcc(model) cannot be final. What should I do?
Do the validation phase in a synchronized block in the processing phase, because of the concurrent environment.
@Vlad - is it okay to submit() insertService#save for execution in another thread while also having @Transactional on that method? I avoid scenarios where a @Transactional-annotated method executes in a thread different from the one where I got the service from Spring (where it was injected). I mean that in some scenarios the @Transactional functionality is a kind of ThreadLocal construct, and this might not work. In this case I would probably submit() a factory method that retrieves the insertService bean in the executor thread. What do you think?
@Michal The @Transactional annotation is on the service method, so the transaction will execute the batch insert in its own transaction. It doesn't matter whether the main thread is transactional or not since the worker thread will define its own transaction and the entities you are passing will either be New or Detached, in which case persist or merge is called on them from the other transaction.
@Vlad - thanks for answering that. I re-read the subject, and my understanding now is that the ThreadLocal data is created when the @Transactional method execution starts (and not at bean creation time, as I assumed before), so the scenario described above should work. In other words, the ThreadLocal data is bound to the execution thread, not to the bean creation thread. Thank you for clarifying that; it is a useful scenario I was not aware of.
You need a single-producer, multiple-consumer thread-safe queue. Take a look at the LMAX Disruptor; it's a good fit for this.

The LMAX Disruptor is a high-performance inter-thread messaging library, and it's the default messaging system for intra-worker communication in Apache Storm. The underlying data structure is a lock-free ring buffer. To make it fast, it uses a number of tricks to reduce false sharing.

Follow the getting started tutorial; it has a very simple example you can refer to.
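For comparison, the same single-producer, multiple-consumer shape can be sketched with the JDK's own `BlockingQueue`. This is not the Disruptor API; it swaps in `java.util.concurrent.ArrayBlockingQueue` with a poison-pill shutdown. The class name `QueuePipelineSketch`, the `run` helper, and the `handler` callback (standing in for the validate-and-store step) are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class QueuePipelineSketch {

    // Sentinel that tells a consumer to stop; compared by identity.
    private static final Object POISON_PILL = new Object();

    /** The calling thread produces; consumerCount workers drain the bounded queue. */
    static <T> int run(Iterable<T> source, int consumerCount, int capacity,
                       Consumer<T> handler) throws InterruptedException {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);

        for (int i = 0; i < consumerCount; i++) {
            consumers.submit(() -> {
                try {
                    while (true) {
                        Object item = queue.take(); // blocks until an item is available
                        if (item == POISON_PILL) {
                            return;
                        }
                        try {
                            @SuppressWarnings("unchecked")
                            T model = (T) item;
                            handler.accept(model);
                            processed.incrementAndGet();
                        } catch (RuntimeException e) {
                            // Keep the worker alive so the producer never blocks forever.
                            System.err.println("failed to process " + item + ": " + e);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Producer: put() blocks while the queue is full, which throttles the reader.
        for (T item : source) {
            queue.put(item);
        }
        // One pill per consumer, queued after all real items.
        for (int i = 0; i < consumerCount; i++) {
            queue.put(POISON_PILL);
        }
        consumers.shutdown();
        consumers.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int handled = run(Arrays.asList("a", "b", "c", "d", "e"), 3, 2,
                model -> System.out.println(
                        Thread.currentThread().getName() + " stored " + model));
        System.out.println(handled + " items processed");
    }
}
```

The bounded capacity is deliberate: if the database writers fall behind, the producer waits instead of loading the whole result set into memory.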
