
I have a requirement to perform a clean insert (delete + insert) of a huge number of records (close to 100K) per request. For testing purposes, I'm running my code with 10K. Even with 10K, the operation runs for 30 seconds, which is not acceptable. I'm doing some level of batch inserts as provided by Spring Data JPA. However, the results are not satisfactory.

My code looks like the following:

@Transactional
public void saveAll(HttpServletRequest httpRequest) throws IOException {
    List<Person> persons = new ArrayList<>();
    try (ServletInputStream sis = httpRequest.getInputStream()) {
        deletePersons(); // deletes all persons based on some criteria
        Person p;
        while ((p = nextPerson(sis)) != null) {
            persons.add(p);
            if (persons.size() % 2000 == 0) {
                savePersons(persons); // uses the Spring repository to perform saveAll() and flush()
                persons.clear();
            }
        }
        savePersons(persons); // save the remaining records
        persons.clear();
    }
}

@Transactional
public void savePersons(List<Person> persons) {
    System.out.println(new Date() + " Before save");
    repository.saveAll(persons);
    repository.flush();
    System.out.println(new Date() + " After save");
}

I have also set the below properties:

spring.jpa.properties.hibernate.jdbc.batch_size=40
spring.jpa.properties.hibernate.order_inserts=true
spring.jpa.properties.hibernate.order_updates=true
spring.jpa.properties.hibernate.jdbc.batch_versioned_data=true
spring.jpa.properties.hibernate.id.new_generator_mappings=false

Looking at the logs, I noticed that the insert operation takes around 3-4 seconds to save 2000 records, but not much time on iteration. So I believe the time taken to read through the stream is not the bottleneck; the inserts are. I also checked the logs and confirmed that Spring is issuing batches of 40 inserts, as per the property set.
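For reference, the way I confirmed the batching was by turning on Hibernate's statistics and batch logging. A minimal snippet, assuming Hibernate 5 (the logger class name may differ in other versions):

# logs per-session statistics, including the number of JDBC statements and batches
spring.jpa.properties.hibernate.generate_statistics=true
# logs each JDBC batch execution and its size (Hibernate 5 class name; an assumption for other versions)
logging.level.org.hibernate.engine.jdbc.batch.internal.BatchingBatch=DEBUG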

I'm trying to see if there is a way I can improve the performance by using multiple threads (say, 2 threads) that would read from a blocking queue and, once say 2000 records have accumulated, call save. I hope that, in theory, this may provide better results. But the problem is, as I have read, Spring manages transactions at the thread level, and a transaction cannot propagate across threads. Yet I need the whole operation (delete + insert) to be atomic. I looked into a few posts about Spring transaction management but could not get pointed in the right direction.
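For illustration, here is roughly what I had in mind per worker thread. This is only a sketch: PersonWorker, PersonRepository, and the queue wiring are placeholder names of mine, and each worker runs its own transaction via Spring's TransactionTemplate, which is exactly why the overall delete + insert stops being atomic:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.springframework.transaction.support.TransactionTemplate;

// Sketch only. Assumed: interface PersonRepository extends JpaRepository<Person, Long>.
// Each worker commits its batches in its OWN transaction, so a failure in one
// thread does not roll back work already committed by another.
class PersonWorker implements Runnable {

    private final BlockingQueue<Person> queue;
    private final TransactionTemplate txTemplate;
    private final PersonRepository repository;

    PersonWorker(BlockingQueue<Person> queue, TransactionTemplate txTemplate,
                 PersonRepository repository) {
        this.queue = queue;
        this.txTemplate = txTemplate;
        this.repository = repository;
    }

    @Override
    public void run() {
        List<Person> batch = new ArrayList<>(2000);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Person first = queue.poll(1, TimeUnit.SECONDS); // wait briefly for work
                if (first == null) {
                    continue;
                }
                batch.add(first);
                queue.drainTo(batch, 1999); // top the batch up to ~2000 records
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            // each execute() call below is a separate, per-thread transaction
            txTemplate.execute(status -> {
                repository.saveAll(batch);
                repository.flush();
                return null;
            });
            batch.clear();
        }
    }
}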

Is there a way I can achieve this kind of parallelism using Spring transactions? If Spring transactions are not the answer, are there any other techniques that can be used?

Thanks

1 Answer


I'm unsure if this will be helpful to you - it is working well in a test app. I also don't know whether it will be in the "good graces" of senior Spring personnel, but my hope is to learn, so I am posting this suggestion.

In a Spring Boot test app, the following injects a JPA repository into the ApplicationRunner, which then injects the same into Runnables managed by an ExecutorService. Each Runnable gets a BlockingQueue that is continually filled by a separate KafkaConsumer (which acts as a producer for the queue). The Runnables use queue.take() to pop from the queue, followed by a repo.save(). (Batch inserts could readily be added to the thread, but haven't been, since the application has not yet required them - a sketch is included after the run() method below.)

The test app currently implements JPA for a Postgres (or Timescale) DB and is running 10 threads with 10 queues being fed by 10 consumers.

The JPA repository is provided by

public interface DataRepository extends JpaRepository<DataRecord, Long> {
}

The Spring Boot main program is

@SpringBootApplication
@EntityScan(basePackages = "com.xyz.model")
public class DataApplication {

    private final String[] topics = { "x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9" };
    ExecutorService executor = Executors.newFixedThreadPool(topics.length);

    public static void main(String[] args) {
        SpringApplication.run(DataApplication.class, args);
    }

    @Bean
    ApplicationRunner init(DataRepository dataRepository) {
        return args -> {
            for (String topic : topics) {
                // one queue per topic, fed by its own Kafka consumer
                BlockingQueue<DataRecord> queue = new ArrayBlockingQueue<>(1024);
                JKafkaConsumer consumer = new JKafkaConsumer(topic, queue);
                consumer.start();

                // one Runnable per queue, draining it into the repository
                JMessageConsumer messageConsumer = new JMessageConsumer(dataRepository, queue);
                executor.submit(messageConsumer);
            }
            executor.shutdown();
        };
    }
}

And the Consumer Runnable has a constructor and run() method as follows:

public JMessageConsumer(DataRepository dataRepository, BlockingQueue<DataRecord> queue) {
    this.queue = queue;
    this.dataRepository = dataRepository;
}

@Override
public void run() {
    running.set(true);
    while (running.get()) {
        // remove record from FIFO blocking queue
        DataRecord dataRecord;
        try {
            dataRecord = queue.take();
        } catch (InterruptedException e) {
            logger.error("queue exception: " + e.getMessage());
            continue;
        }
        // write to database
        dataRepository.save(dataRecord);
    }
}
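Regarding the batch insert mentioned above: if the app ever needs it, a minimal sketch (not used in this app yet; BATCH_SIZE is just an illustrative value) would drain the queue into a list and call the repository's saveAll() once per batch, inside the same class as the run() method above. Note that the hibernate.jdbc.batch_size settings from the question would still be needed for actual JDBC batching:

// Sketch of a batched variant of run(): block for one record, then drain
// whatever else is already queued, up to BATCH_SIZE, and save in one call.
private static final int BATCH_SIZE = 500; // illustrative value

@Override
public void run() {
    running.set(true);
    List<DataRecord> batch = new ArrayList<>(BATCH_SIZE);
    while (running.get()) {
        try {
            batch.add(queue.take());              // wait for at least one record
            queue.drainTo(batch, BATCH_SIZE - 1); // then grab whatever else is ready
        } catch (InterruptedException e) {
            logger.error("queue exception: " + e.getMessage());
            continue;
        }
        // one repository call per batch instead of one save() per record
        dataRepository.saveAll(batch);
        batch.clear();
    }
}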

I'm into learning, so any thoughts/concerns/feedback are appreciated...


2 Comments

Thanks. I will test your recommendation for my use case and see if it helps. I will post my results soon.
Also, note - since my first post, I have written another test sequence for something in development. Here I have 10 threads writing to a single ArrayBlockingQueue - i.e., 10 producer threads writing to one queue. The BlockingQueue can be shared between threads without any explicit synchronization, since it is part of Java Concurrency. I then have one consumer thread that pops from the queue and writes to the database. Works well for my app.
