1

How to deal with back-pressure in Java using thread pool?

How to reject new tasks so there are no more than N submitted tasks. N - is the maximum number of allowed tasks in submission queue, which include new, running, paused (not finished) tasks.

Use case

Users submit calculation tasks that run for some time. Sometimes, there are so many users submitting tasks at the same time. How to reject new tasks if there are already N tasks submitted.

In other words, the total number of submitted (not finished, started or not started) tasks cannot be greater than N.

Example code

Here is full version and bellow are short snippets.

A long running task. CalculationTask.

public class CalculationTask { public CalculationTask(final String name) { this.name = name; } public CalculationResult calculate() { final long waitTimeMs = MIN_WAIT_TIME_MS + RANDOM.nextInt(MAX_WAIT_TIME_MS); sleep(waitTimeMs); final int result = Math.abs(RANDOM.nextInt()); final String text = "This is result: " + result; final CalculationResult calculationResult = new CalculationResult(name, text, result); System.out.println("Calculation finished: " + calculationResult); return calculationResult; } } 

Its result. CalculationResult.

public class CalculationResult { private final String taskName; private final String text; private final Integer number; // Getters, setters, constructor, toString. } 

This is how I submit jobs. CalculationBroker.

public class CalculationBroker { private static final int MAX_WORKERS_NUMBER = 5; private final ExecutorService executorService = Executors.newFixedThreadPool(MAX_WORKERS_NUMBER); private final Map<String, CalculationResult> calculationCache = new ConcurrentHashMap<>(); public CompletableFuture<CalculationResult> submit(final CalculationTask calculationTask) { final CalculationResult calculationResultCached = calculationCache.get(calculationTask.getName()); if (calculationResultCached != null) { return CompletableFuture.completedFuture(calculationResultCached); } System.out.println("Calculation submitted: " + calculationTask.getName()); final CompletableFuture<CalculationResult> calculated = CompletableFuture .supplyAsync(calculationTask::calculate, executorService); calculated.thenAccept(this::updateCache); return calculated; } private void updateCache(final CalculationResult calculationResult) { calculationCache.put(calculationResult.getTaskName(), calculationResult); } } 

And this is how I run them together. Main.

public class Main { public static void main(String[] args) { final int N_TASKS = 100; final CalculationBroker calculationBroker = new CalculationBroker(); final List<CompletableFuture<CalculationResult>> completableFutures = new ArrayList<>(); for (int i = 0; i < N_TASKS; i++) { final CalculationTask calculationTask = createCalculationTask(i); final CompletableFuture<CalculationResult> calculationResultCompletableFuture = calculationBroker.submit(calculationTask); completableFutures.add(calculationResultCompletableFuture); } calculationBroker.close(); } private static CalculationTask createCalculationTask(final int counter) { return new CalculationTask("CalculationTask_" + counter); } } 

This is output.

2020-05-23 14:14:53 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_97. 2020-05-23 14:14:53 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_98. 2020-05-23 14:14:53 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_99. 2020-05-23 14:14:54 [pool-1-thread-3] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_2', text='This is result: 1081871544', number=1081871544, durationMs=1066} 2020-05-23 14:14:55 [pool-1-thread-1] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_0', text='This is result: 1942553785', number=1942553785, durationMs=1885} 2020-05-23 14:14:56 [pool-1-thread-5] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_4', text='This is result: 104326011', number=104326011, durationMs=2120} 20 

My findings.

Bellow details

Code above is equivalent to Executors.newFixedThreadPool(n), however instead of default unlimited LinkedBlockingQueue we use ArrayBlockingQueue with fixed capacity of 100. This means that if 100 tasks are already queued (and n being executed), new task will be rejected with RejectedExecutionException.

ThreadPoolExecutor uses a LinkedBlockingQueue, which is unlimited by default.

As the post above sugessts:

final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100); executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue); 
4
  • 1
    Just to make things clear .. you don't want to be able to submit or you want your threads to avoid start processing the new submitted calculations until you have free thread ?? N.B: Both are possible to achieve just to be able to provide the right solution. Commented May 23, 2020 at 11:42
  • 1
    Good point, thanks, I updated my question. I want to reject new tasks, so total number of submitted tasks (in the queue under the hood) is less or equal than N. Commented May 23, 2020 at 12:04
  • 1
    to employ backpressure, use reactive streams protocol. Commented May 23, 2020 at 22:24
  • It would be nice to see an example, @AlexeiKaigorodov Commented May 24, 2020 at 8:00

2 Answers 2

2

You answered your own question ... you could use Queue size to do that..

int poolSize = ...; int queueSize = ...; CustomRejectedExecutionHandler handler = new CustomRejectedExecutionHandler(); ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(queueSize), handler); 

You could use CustomRejectedExecutionHandler to handle rejected threads.

import java.util.concurrent.ThreadPoolExecutor; import org.apache.log4j.Logger; public class CustomRejectedExecutionHandler implements RejectedExecutionHandler { public static final Logger LOGGER = Logger.getLogger(CustomRejectedExecutionHandler.class); @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) { LOGGER.error(runnable.toString() + " execution rejected."); } } 
Sign up to request clarification or add additional context in comments.

3 Comments

Makes sense. I did not see RejectedExecutionHandler .
There is one drawback though. In my use-case, if a task gets rejected (it happens with a small number of queueSize, there is not exception thrown, so there is no warning or error in the logs). How does a client know that their task got rejected?
Updated my answer so you could handle rejected threads.. Please note that when you have heave calculation the threadPoolSize must be max = available processors or the cpu will randomly start processing part of thread1 .. stop it ... start with thread 5 ... stop it then back to thread 1 and continue execution ... (this scenario if 4 processors) same applied for less or more available processors in cpu. in the other hand, if your thread calling DB or external API's there is no issue with creating 100 in parallel al let executionService handle how they run
1

Thanks Hussein and also this answer and documentation. It got it this way.

You can check full sources.

 private final ExecutorService executorService = initializeThreadPoolWithRejection(); private ExecutorService initializeThreadPoolWithRejection() { final RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); return new ThreadPoolExecutor(WORKERS_NUMBER, MAX_WORKERS_NUMBER, 0L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(10 /*queueSize*/), handler); } 

Note, I use ThreadPoolExecutor.AbortPolicy(); because it fails by default with ExecutionException exception.

CalculationBroker

 public CompletableFuture<CalculationResult> submit(final CalculationTask calculationTask) { final CalculationResult calculationResultCached = calculationCache.get(calculationTask.getName()); if (calculationResultCached != null) { return CompletableFuture.completedFuture(calculationResultCached); } LOGGER.info("Calculation submitted: {}.", calculationTask.getName()); try { final CompletableFuture<CalculationResult> calculated = CompletableFuture .supplyAsync(calculationTask::calculate, executorService); calculated.thenAccept(this::updateCache); return calculated; } catch (Exception e) { System.out.println("Failed to submit a task."); return CompletableFuture.failedFuture(e); } } 

Usage example in Main:

 private static void completeFuture(final CompletableFuture<CalculationResult> future) { final CalculationResult calculationResult; try { calculationResult = future.get(); System.out.println("Task is finished: " + calculationResult); } catch (InterruptedException e) { System.out.println("Task was interrupted. " + e.getMessage()); } catch (ExecutionException e) { System.out.println("Task failed."); } } 

It produces output:

2020-05-23 16:44:09 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_15. 2020-05-23 16:44:09 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_16. 2020-05-23 16:44:09 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_17. 2020-05-23 16:44:09 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_20. Failed to submit a task. 2020-05-23 16:44:09 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_21. Failed to submit a task. 2020-05-23 16:44:09 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_22. Failed to submit a task. 2020-05-23 16:44:11 [pool-1-thread-8] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_17', text='This is result: 1096770940', number=1096770940, durationMs=1246} 2020-05-23 16:44:11 [pool-1-thread-4] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_3', text='This is result: 2103177010', number=2103177010, durationMs=1814} 2020-05-23 16:44:12 [pool-1-thread-6] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_15', text='This is result: 961885863', number=961885863, durationMs=2632} 2 Task is finished: CalculationResult{taskName='CalculationTask_0', text='This is result: 79356259', number=79356259, durationMs=3875} Task is finished: CalculationResult{taskName='CalculationTask_1', text='This is result: 532289460', number=532289460, durationMs=3725} Task is finished: CalculationResult{taskName='CalculationTask_2', text='This is result: 1579151336', number=1579151336, durationMs=3684} Task failed. Task failed. Task failed. 

Note, it works only in Java 9+.

CompletableFuture.failedFuture(e); does not work in Java 8.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.