2

I have around 400k employee IDs. I can pass only one employee ID at a time.

How can I call the API in parallel using threads so that I can improve performance? I need some pointers.

The saveInDatabase() method saves the object in a database table. I have marked this method as synchronized.

    private void callApi(List<Long> employeeList, HttpEntity<String> requestEntity) {
        Long employeeId;
        for (Long i : employeeList) {
            employeeId = i;
            // url
            String url = "http://dummy.restapiexample.com/api/v1/employee/" + employeeId;
            ResponseEntity<String> responseEntity =
                    restTemplate.exchange(url, HttpMethod.GET, requestEntity, String.class);
            saveInDatabase(responseEntity);
        }
    }

2 Answers

3

Using the Thread API directly is error-prone because it is so low-level.
parallelStream() is tempting, but it can be a problem because the stream may occupy every CPU core available to your application,
which means that other HTTP client requests to your application could be served very late.
Note also that the number of threads used by parallelStream() is a JVM implementation detail and is not part of the public API.
The ExecutorService API, which lets you specify the number of threads available in the pool, looks like a better and more robust alternative.
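As a rough illustration of the plain ExecutorService approach, here is a minimal sketch without Spring. The class name BoundedFetch, the method fetchAll, and the fetch function are hypothetical; fetch stands in for the real restTemplate.exchange(...) call, and the pool size is only a placeholder to tune.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class BoundedFetch {

    /**
     * Runs fetch for every id on a pool of poolSize threads and returns
     * the results in the same order as the ids.
     */
    static List<String> fetchAll(List<Long> ids, int poolSize, Function<Long, String> fetch) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            // Submit all tasks first so that they can run concurrently.
            List<Future<String>> futures = new ArrayList<>();
            for (Long id : ids) {
                Callable<String> task = () -> fetch.apply(id);
                futures.add(pool.submit(task));
            }
            // Then wait for each result; get() blocks until that task is done.
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A fetch task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-in for the REST call (assumption: the real code would use restTemplate).
        Function<Long, String> fakeFetch = id -> "employee-" + id;
        System.out.println(fetchAll(Arrays.asList(1L, 2L, 3L), 2, fakeFetch));
    }
}
```

The pool size caps how many requests are in flight at once, which is what keeps this task from taking over the whole application.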

Spring Boot provides a built-in feature that wraps it.
You could extract the individual task into a method such as:

    @Async
    public Future<ResponseEntity<String>> getEmployee(long employeeId, HttpEntity<String> requestEntity) {
        String url = "http://dummy.restapiexample.com/api/v1/employee/" + employeeId;
        ResponseEntity<String> responseEntity =
                restTemplate.exchange(url, HttpMethod.GET, requestEntity, String.class);
        return new AsyncResult<ResponseEntity<String>>(responseEntity);
    }

Now call it:

    private void callApi(List<Long> employeeList, HttpEntity<String> requestEntity) {
        // Do async calls and store futures in a List
        List<Future<ResponseEntity<String>>> futures = new ArrayList<>();
        for (Long id : employeeList) {
            futures.add(getEmployee(id, requestEntity));
        }
        // Then process the list of futures
        for (Future<ResponseEntity<String>> future : futures) {
            try {
                saveInDatabase(future.get());
            } catch (Exception e) {
                // handle the exception
            }
        }
    }

As a side note, calling saveInDatabase() inside a loop is not the right approach.
Instead, you want to batch the database insertions because you have so many of them. Something like:

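    private void callApi(List<Long> employeeList, HttpEntity<String> requestEntity) {
        // Submit every request first: a single lazy stream pipeline would
        // submit one request and wait for it before submitting the next.
        List<Future<ResponseEntity<String>>> futures = employeeList.stream()
                .map(id -> getEmployee(id, requestEntity))
                .collect(Collectors.toList());

        List<ResponseEntity<String>> responseEntities = futures.stream()
                .map(future -> {
                    try {
                        return future.get();
                    } catch (Exception e) {
                        // handle the exception; rethrown here so that the lambda compiles
                        throw new IllegalStateException(e);
                    }
                })
                .collect(Collectors.toList());

        saveInDatabase(responseEntities);
    }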
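With around 400k IDs, holding every response in memory before a single insert may not be practical, so one option is to work through the IDs in fixed-size batches and do one bulk insert per batch. The sketch below shows only the splitting step; BatchSplitter, partition, and the batch size are hypothetical names and values, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;

class BatchSplitter {

    /** Splits items into consecutive sublists of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so that each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            ids.add(i);
        }
        // Each batch would be fetched and then handed to one bulk insert.
        for (List<Integer> batch : partition(ids, 3)) {
            System.out.println(batch);
        }
    }
}
```

Each batch could then be passed to callApi(...), so that at most one batch of responses is held in memory at a time.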

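To make the @Async feature work, you have to add @EnableAsync on a @Configuration class of your application. Because @Async is applied through a Spring proxy, the annotated method must be public and must be invoked from another bean; a call from within the same class bypasses the proxy and runs synchronously.
Optionally, you can define an Executor bean with the pool/queue configuration that suits your needs. Beware: if you don't define an Executor bean, Spring will create a SimpleAsyncTaskExecutor and use that (it creates a thread per task and does not reuse them).

For example: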

    @SpringBootApplication
    @EnableAsync
    public class Application {

        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }

        @Bean
        public Executor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(2);
            executor.setQueueCapacity(500);
            executor.setThreadNamePrefix("GithubLookup-");
            executor.initialize();
            return executor;
        }
    }

15 Comments

Thanks for the suggestion, but where are you implementing Callable?
Also, your getEmployee() throws a compile error due to a return type mismatch.
You don't need to. The code that you declare in the method annotated with @Async is your Callable. The Spring framework adds the glue and passes it to the ExecutorService.
@Raj R I didn't write the code in an IDE but on the fly, so some typos may be present. Don't hesitate to fix them and update my post. I updated the return.
future.get() is blocking.
1

You can use parallelStream() to make the API calls concurrently:

    List<ResponseEntity<String>> result = employeeList.parallelStream()
            .map(id -> restTemplate.exchange(
                    "http://dummy.restapiexample.com/api/v1/employee/" + id,
                    HttpMethod.GET, requestEntity, String.class))
            .collect(Collectors.toList());

    result.forEach(entity -> saveInDatabase(entity));

But beware: parallelStream() may also starve the CPU cores available to your application. If this task is not the application's only job and it is also designed to serve other requests, that could be an issue.

Also, as suggested by @davidxxx, use saveAll for a batch insert:

    saveAll(result);
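One way to keep the concurrency while addressing the starvation concern is to run the calls on a dedicated, bounded pool instead of the common pool that parallelStream() uses. The following is only a sketch under assumptions: DedicatedPoolFetch, fetchAll, and the fetch function are hypothetical, and fetch stands in for the restTemplate.exchange(...) call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class DedicatedPoolFetch {

    /** Fetches every id on its own bounded pool and returns results in input order. */
    static List<String> fetchAll(List<Long> ids, int poolSize, Function<Long, String> fetch) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            // Start every call on the dedicated pool, not the common ForkJoinPool.
            List<CompletableFuture<String>> futures = ids.stream()
                    .map(id -> CompletableFuture.supplyAsync(() -> fetch.apply(id), pool))
                    .collect(Collectors.toList());
            // join() waits for each result; a failed task surfaces as CompletionException.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-in for the REST call (assumption: the real code would use restTemplate).
        Function<Long, String> fakeFetch = id -> "employee-" + id;
        System.out.println(fetchAll(Arrays.asList(1L, 2L, 3L), 2, fakeFetch));
    }
}
```

The returned list could then be passed to saveAll, as above.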

9 Comments

Can I use threads, and do I have to mark saveInDatabase(entity) as synchronized when using a parallel stream too?
With parallelStream, each task is executed asynchronously (depending on the CPU), so I believe there is no need for any synchronization when saving data to the database @RajR
Will this reduce the time? Currently my batch takes 3 hours to do this.
It will definitely reduce the time; take a look at this @RajR. How many cores do you have?
A parallel stream is not necessarily bad, but a warning is needed. I added it.