2

We are using java 8 parallel stream to process a task, and we are submitting the task through ForkJoinPool#submit. We are not using jvm wide ForkJoinPool.commonPool, instead we are creating our own custom pool to specify the parallelism and storing it as static variable.

We have validation framework, where we subject a list of tables to a List of Validators, and we submit this job through the custom ForkJoinPool as follows:

static ForkJoinPool forkJoinPool = new ForkJoinPool(4);

List<Table> tables = tableDAO.findAll(); ModelValidator<Table, ValidationResult> validator = ValidatorFactory .getInstance().getTableValidator(); List<ValidationResult> result = forkJoinPool.submit( () -> tables.stream() .parallel() .map(validator) .filter(result -> result.getValidationMessages().size() > 0) .collect(Collectors.toList())).get(); 

The problem we are having is, in the downstream components, the individual validators which run on separate threads from our static ForkJoinPool rely on tenant_id, which is different for every request and is stored in an InheritableThreadLocal variable. Since we are creating a static ForkJoinPool, the threads pooled by the ForkJoinPool will only inherit the value of the parent thread, when it is created first time. But these pooled threads will not know the new tenant_id for the current request. So for subsequent execution these pooled threads are using old tenant_id.

I tried creating a custom ForkJoinPool and specifying ForkJoinWorkerThreadFactory in the constructor and overriding the onStart method to feed the new tenant_id. But that doesnt work, since the onStart method is called only once at creation time and not during individual execution time.

Seems like we need something like the ThreadPoolExecutor#beforeExecute which is not available in case of ForkJoinPool. So what alternative do we have if we want to pass the current thread local value to the statically pooled threads?

One workaround would be to create the ForkJoinPool for each request, rather than make it static but we wouldn't want to do it, to avoid the expensive nature of thread creation.

What alternatives do we have?

2 Answers 2

3

I found the following solution that works without changing any underlying code. Basically, the map method takes a functional interface which I am representing as a lambda expression. This expression adds a preExecution hook to set the new tenantId in the current ThreadLocal and cleaning it up in postExecution.

forkJoinPool.submit(tables.stream() .parallel() .map((item) -> { preExecution(tenantId); try { return validator.apply(item); } finally { postExecution(); } } ) .filter(validationResult -> validationResult.getValidationMessages() .size() > 0) .collect(Collectors.toList())).get(); 
Sign up to request clarification or add additional context in comments.

Comments

0

The best option in my view would be to get rid of the thread local and pass it as an argument instead. I understand that this could be a massive undertaking though. Another option would be to use a wrapper.

Assuming that your validator has a validate method you could do something like:

public class WrappingModelValidator implements ModelValidator<Table. ValidationResult> { private final ModelValidator<Table. ValidationResult> v; private final String tenantId; public WrappingModelValidator(ModelValidator<Table. ValidationResult> v, String tenantId) { this.v = v; this.tenantId = tenantId; } public ValidationResult validate(Table t) { String oldValue = YourThreadLocal.get(); YourThreadLocal.set(tenantId); try { return v.validate(t); } finally { YourThreadLocal.set(oldValue); } } } 

Then you simply wrap your old validator and it will set the thread local on entry and restore it when done.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.