Cache synchronization using jOOQ and PostgreSQL functions
Introduction
In this article, we are going to see how we can achieve cache synchronization with the help of jOOQ and PostgreSQL functions.
By using Change Data Capture, we can track how table records change over time and synchronize the application-level cache entries that were built from the table records in question.
Domain Model
Let’s assume we are building a question-and-answer website similar to Stack Overflow. The largest and most important tables in our database are the question and answer tables, which look as follows:

Because our application has a lot of users, we want to store the most viewed questions and answers in an application-level cache like Redis.
The cache entry key is the question identifier, and the value is going to be a Question record that contains a List of Answer records, as illustrated by the following diagram:
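One possible shape for these cache entries is sketched below. It assumes Java records whose components mirror the columns returned by the get_updated_questions_and_answers function, in the same order as the constructor calls used later by the getUpdatedQuestionsAndAnswers method. The QuestionCache class is a hypothetical in-memory stand-in for Redis, keyed by the question identifier, and is not part of the original code.

```java
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical cache value types; the components follow the function's TABLE columns.
record Answer(
    Long id,
    String body,
    Integer score,
    Boolean accepted,
    LocalDateTime createdOn,
    LocalDateTime updatedOn) {
}

record Question(
    Long id,
    String title,
    String body,
    Integer score,
    LocalDateTime createdOn,
    LocalDateTime updatedOn,
    List<Answer> answers) {
}

// Hypothetical in-memory stand-in for Redis: one entry per question identifier,
// whose value is the whole Question aggregate, answers included.
class QuestionCache {

    private final Map<Long, Question> entries = new ConcurrentHashMap<>();

    // Overwrites any previously cached snapshot of the same question.
    void put(Question question) {
        entries.put(question.id(), question);
    }

    Question get(Long questionId) {
        return entries.get(questionId);
    }

    int size() {
        return entries.size();
    }
}
```

Because the whole aggregate is stored under a single key, synchronizing a changed answer means re-putting its parent Question, which is why the function below returns complete question and answer hierarchies.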

Tracking record changes using a PostgreSQL function
To extract the Question and Answer records that need to be synchronized with the cache, we are going to use the following get_updated_questions_and_answers PostgreSQL function:
CREATE OR REPLACE FUNCTION get_updated_questions_and_answers()
RETURNS TABLE(
    question_id bigint,
    question_title varchar(250),
    question_body text,
    question_score integer,
    question_created_on timestamp,
    question_updated_on timestamp,
    answer_id bigint,
    answer_body text,
    answer_accepted boolean,
    answer_score integer,
    answer_created_on timestamp,
    answer_updated_on timestamp
)
LANGUAGE plpgsql
AS $$
DECLARE
    previous_snapshot_timestamp timestamp;
    max_snapshot_timestamp timestamp;
    result_set_record record;
BEGIN
    -- Read the last synchronized timestamp and lock the QA snapshot row
    previous_snapshot_timestamp = (
        SELECT updated_on
        FROM cache_snapshot
        WHERE region = 'QA'
        FOR NO KEY UPDATE
    );

    -- On the first run, create the snapshot row starting from the epoch
    IF previous_snapshot_timestamp IS NULL THEN
        INSERT INTO cache_snapshot(region, updated_on)
        VALUES ('QA', to_timestamp(0));

        previous_snapshot_timestamp = to_timestamp(0);
    END IF;

    -- Start from the previous snapshot, so that a run without changes
    -- does not reset the snapshot back to the epoch
    max_snapshot_timestamp = previous_snapshot_timestamp;

    -- Fetch the whole hierarchy of every question that changed,
    -- or has a changed answer, since the previous snapshot
    FOR result_set_record IN (
        SELECT
            q1.id AS question_id,
            q1.title AS question_title,
            q1.body AS question_body,
            q1.score AS question_score,
            q1.created_on AS question_created_on,
            q1.updated_on AS question_updated_on,
            a1.id AS answer_id,
            a1.body AS answer_body,
            a1.accepted AS answer_accepted,
            a1.score AS answer_score,
            a1.created_on AS answer_created_on,
            a1.updated_on AS answer_updated_on
        FROM question q1
        LEFT JOIN answer a1 ON q1.id = a1.question_id
        WHERE
            q1.id IN (
                SELECT q2.id
                FROM question q2
                WHERE q2.updated_on > previous_snapshot_timestamp
            ) OR
            q1.id IN (
                SELECT a2.question_id
                FROM answer a2
                WHERE a2.updated_on > previous_snapshot_timestamp
            )
        ORDER BY question_created_on, answer_created_on
    ) LOOP
        -- Track the most recent modification timestamp
        IF result_set_record.question_updated_on > max_snapshot_timestamp THEN
            max_snapshot_timestamp = result_set_record.question_updated_on;
        END IF;

        IF result_set_record.answer_updated_on > max_snapshot_timestamp THEN
            max_snapshot_timestamp = result_set_record.answer_updated_on;
        END IF;

        -- Copy the current row into the OUT columns and emit it
        question_id = result_set_record.question_id;
        question_title = result_set_record.question_title;
        question_body = result_set_record.question_body;
        question_score = result_set_record.question_score;
        question_created_on = result_set_record.question_created_on;
        question_updated_on = result_set_record.question_updated_on;
        answer_id = result_set_record.answer_id;
        answer_body = result_set_record.answer_body;
        answer_accepted = result_set_record.answer_accepted;
        answer_score = result_set_record.answer_score;
        answer_created_on = result_set_record.answer_created_on;
        answer_updated_on = result_set_record.answer_updated_on;

        RETURN NEXT;
    END LOOP;

    -- Store the new snapshot; it is discarded if the transaction rolls back
    UPDATE cache_snapshot
    SET updated_on = max_snapshot_timestamp
    WHERE region = 'QA';
END
$$
The get_updated_questions_and_answers function works as follows:
- First, it reads the previous_snapshot_timestamp from the cache_snapshot table, locking that row with FOR NO KEY UPDATE. This timestamp marks the most recent question or answer change that we previously synchronized with the cache. On the first run, when no snapshot row exists yet, the row is created with the epoch timestamp.
- Second, it fetches each question along with all of its answer records if any modification happened inside that question and answer hierarchy since the previous snapshot.
- Afterward, it iterates over the question and answer records and calculates the max_snapshot_timestamp, which becomes the previous_snapshot_timestamp the next time we call the get_updated_questions_and_answers function.
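To make the snapshot bookkeeping easier to follow, the steps above can be modeled in plain Java. This is a minimal in-memory sketch, not the article's implementation: the QaRow, SyncResult, and SnapshotSync names are hypothetical, the input list stands in for the question LEFT JOIN answer result (assumed to be already ordered), and the running maximum starts at the previous snapshot so that a call that finds no changes leaves the snapshot where it was.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical flat row, mirroring one row of the question LEFT JOIN answer query.
// answerId and answerUpdatedOn are null for a question that has no answers.
record QaRow(
    long questionId,
    LocalDateTime questionUpdatedOn,
    Long answerId,
    LocalDateTime answerUpdatedOn) {
}

// The rows to push into the cache, plus the value to store in cache_snapshot.
record SyncResult(List<QaRow> rows, LocalDateTime nextSnapshot) {
}

final class SnapshotSync {

    private SnapshotSync() {
    }

    // Null-safe "strictly after" check, since LEFT JOIN rows may lack an answer.
    private static boolean isAfter(LocalDateTime value, LocalDateTime reference) {
        return value != null && value.isAfter(reference);
    }

    static SyncResult changedSince(List<QaRow> joinedRows, LocalDateTime previousSnapshot) {
        // Step 1: a question qualifies if its own row or any of its answers changed.
        Set<Long> changedQuestionIds = new HashSet<>();
        for (QaRow row : joinedRows) {
            if (isAfter(row.questionUpdatedOn(), previousSnapshot)
                    || isAfter(row.answerUpdatedOn(), previousSnapshot)) {
                changedQuestionIds.add(row.questionId());
            }
        }

        // Step 2: keep the whole hierarchy of every qualifying question and
        // track the newest timestamp seen, starting from the previous snapshot.
        List<QaRow> changedRows = new ArrayList<>();
        LocalDateTime maxSnapshot = previousSnapshot;
        for (QaRow row : joinedRows) {
            if (!changedQuestionIds.contains(row.questionId())) {
                continue;
            }
            changedRows.add(row);
            if (isAfter(row.questionUpdatedOn(), maxSnapshot)) {
                maxSnapshot = row.questionUpdatedOn();
            }
            if (isAfter(row.answerUpdatedOn(), maxSnapshot)) {
                maxSnapshot = row.answerUpdatedOn();
            }
        }
        return new SyncResult(changedRows, maxSnapshot);
    }
}
```

Passing the returned nextSnapshot back into changedSince without any new modifications yields no rows and the same snapshot, which is the property that keeps each synchronization incremental.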
If the get_updated_questions_and_answers function is called from a @Transactional context that executes the cache update, then, in case of a cache update failure, the transaction will be rolled back, and the cache_snapshot table is reverted to its previous consistent state. As a result, the next call returns the same changes again, so no modification is lost.
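The rollback behavior can be illustrated without a database. The following minimal sketch uses a hypothetical TransactionalCacheSync class and a Consumer standing in for the Redis client; it models only the ordering that matters: the new snapshot timestamp is kept only if the cache write completes, so a failed write leaves the previous snapshot in place.

```java
import java.time.LocalDateTime;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model of the @Transactional flow: pendingSnapshot plays the
// role of the cache_snapshot UPDATE, which only becomes durable if the cache write succeeds.
final class TransactionalCacheSync {

    private LocalDateTime committedSnapshot;

    TransactionalCacheSync(LocalDateTime initialSnapshot) {
        this.committedSnapshot = initialSnapshot;
    }

    LocalDateTime committedSnapshot() {
        return committedSnapshot;
    }

    <T> void synchronize(List<T> changes, LocalDateTime pendingSnapshot,
                         Consumer<List<T>> cacheWriter) {
        // If the cache write throws (e.g. the cache is unreachable), the exception
        // propagates and the assignment below never runs: this models the rollback.
        cacheWriter.accept(changes);
        // Reaching this line models the commit: the new snapshot becomes visible.
        committedSnapshot = pendingSnapshot;
    }
}
```

In the real setup, this ordering comes from the database transaction rather than from a field assignment: the UPDATE cache_snapshot statement runs inside get_updated_questions_and_answers, but its effect is discarded if the surrounding transaction rolls back.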
Calling the TABLE-valued function using jOOQ
As I explained in this article, jOOQ provides the best way to call database stored procedures and functions from Java.
By using the code generator, jOOQ creates a GetUpdatedQuestionsAndAnswers utility that allows us to call the get_updated_questions_and_answers PostgreSQL function.
First, we will import the static variables declared by the GetUpdatedQuestionsAndAnswers utility:
import static com.vladmihalcea.book.hpjp.jooq.pgsql.schema.crud.tables.GetUpdatedQuestionsAndAnswers.GET_UPDATED_QUESTIONS_AND_ANSWERS;
Afterward, we can call the get_updated_questions_and_answers PostgreSQL function like this:
Result<GetUpdatedQuestionsAndAnswersRecord> records = sql
    .selectFrom(GET_UPDATED_QUESTIONS_AND_ANSWERS.call())
    .fetch();
The GetUpdatedQuestionsAndAnswersRecord provides type-safe access to each row of the TABLE result set returned by the get_updated_questions_and_answers PostgreSQL function.
From the GetUpdatedQuestionsAndAnswersRecord, we can create the Question and Answer hierarchy to be stored in the cache.
This can be encapsulated in the getUpdatedQuestionsAndAnswers method using a custom Java Collector:
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public List<Question> getUpdatedQuestionsAndAnswers() {
    return doInJOOQ(sql -> {
        return sql
            .selectFrom(GET_UPDATED_QUESTIONS_AND_ANSWERS.call())
            .collect(
                Collectors.collectingAndThen(
                    Collectors.toMap(
                        // One map entry per question
                        GetUpdatedQuestionsAndAnswersRecord::getQuestionId,
                        // Each row maps to a Question holding at most one Answer;
                        // the LEFT JOIN yields a null answer_id for questions without answers
                        record -> {
                            Question question = new Question(
                                record.getQuestionId(),
                                record.getQuestionTitle(),
                                record.getQuestionBody(),
                                record.getQuestionScore(),
                                record.getQuestionCreatedOn(),
                                record.getQuestionUpdatedOn(),
                                new ArrayList<>()
                            );

                            Long answerId = record.getAnswerId();
                            if (answerId != null) {
                                question.answers().add(
                                    new Answer(
                                        answerId,
                                        record.getAnswerBody(),
                                        record.getAnswerScore(),
                                        record.getAnswerAccepted(),
                                        record.getAnswerCreatedOn(),
                                        record.getAnswerUpdatedOn()
                                    )
                                );
                            }

                            return question;
                        },
                        // Rows of the same question are merged by appending their answers
                        (Question existing, Question replacement) -> {
                            existing.answers().addAll(
                                replacement.answers()
                            );
                            return existing;
                        },
                        // LinkedHashMap preserves the ORDER BY order of the function
                        LinkedHashMap::new
                    ),
                    (Function<Map<Long, Question>, List<Question>>) map -> new ArrayList<>(map.values())
                )
            );
    });
}

Testing time
When inserting a parent question row with two associated answer child records:
// Both timestamps are set explicitly, one second in the past
LocalDateTime timestamp = LocalDateTime.now().minusSeconds(1);

sql
    .insertInto(QUESTION)
    .columns(
        QUESTION.ID,
        QUESTION.TITLE,
        QUESTION.BODY,
        QUESTION.SCORE,
        QUESTION.CREATED_ON,
        QUESTION.UPDATED_ON
    )
    .values(
        1L,
        "How to call jOOQ stored procedures?",
        "I have a PostgreSQL stored procedure and I'd like to call it from jOOQ.",
        1,
        timestamp,
        timestamp
    )
    .execute();

sql
    .insertInto(ANSWER)
    .columns(
        ANSWER.ID,
        ANSWER.QUESTION_ID,
        ANSWER.BODY,
        ANSWER.SCORE,
        ANSWER.ACCEPTED,
        ANSWER.CREATED_ON,
        ANSWER.UPDATED_ON
    )
    .values(
        1L,
        1L,
        "Checkout the [jOOQ docs]" +
        "(https://www.jooq.org/doc/latest/manual/sql-execution/stored-procedures/).",
        10,
        true,
        timestamp,
        timestamp
    )
    .values(
        2L,
        1L,
        "Checkout [this article]" +
        "(https://vladmihalcea.com/jooq-facts-sql-functions-made-easy/).",
        5,
        false,
        timestamp,
        timestamp
    )
    .execute();
We can see that the getUpdatedQuestionsAndAnswers method returns one Question with two Answer entries, exactly matching the Question hierarchy we have just created:
List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());
Question question = questions.get(0);
assertEquals(1, question.id().intValue());

List<Answer> answers = question.answers();
assertEquals(2, answers.size());
assertEquals(1, answers.get(0).id().intValue());
assertEquals(2, answers.get(1).id().intValue());
When inserting a new Answer into our hierarchy:
sql
    .insertInto(ANSWER)
    .columns(
        ANSWER.ID,
        ANSWER.QUESTION_ID,
        ANSWER.BODY
    )
    .values(
        3L,
        1L,
        "Checkout this [video from Toon Koppelaars]" +
        "(https://www.youtube.com/watch?v=8jiJDflpw4Y)."
    )
    .execute();
Now, the Question record returned by the getUpdatedQuestionsAndAnswers method contains three Answer child elements:
List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());
Question question = questions.get(0);
assertEquals(1, question.id().intValue());

List<Answer> answers = question.answers();
assertEquals(3, answers.size());
assertEquals(1, answers.get(0).id().intValue());
assertEquals(2, answers.get(1).id().intValue());
assertEquals(3, answers.get(2).id().intValue());
When updating the answer table row that we have just created:
sql
    .update(ANSWER)
    .set(
        ANSWER.BODY,
        "Checkout this [YouTube video from Toon Koppelaars]" +
        "(https://www.youtube.com/watch?v=8jiJDflpw4Y)."
    )
    .where(ANSWER.ID.eq(3L))
    .execute();
The getUpdatedQuestionsAndAnswers method will return the updated snapshot of our Question and Answer hierarchy:
List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());
Question question = questions.get(0);
assertEquals(1, question.id().intValue());

List<Answer> answers = question.answers();
assertEquals(3, answers.size());
assertEquals(1, answers.get(0).id().intValue());
assertEquals(2, answers.get(1).id().intValue());

Answer latestAnswer = answers.get(2);
assertEquals(3, latestAnswer.id().intValue());
assertEquals(
    "Checkout this [YouTube video from Toon Koppelaars]" +
    "(https://www.youtube.com/watch?v=8jiJDflpw4Y).",
    latestAnswer.body()
);
If we decide to insert a new Question:
sql
    .insertInto(QUESTION)
    .columns(
        QUESTION.ID,
        QUESTION.TITLE,
        QUESTION.BODY
    )
    .values(
        2L,
        "How to use the jOOQ MULTISET operator?",
        "I want to know how I can use the jOOQ MULTISET operator."
    )
    .execute();
The getUpdatedQuestionsAndAnswers method will capture this change and return the newly created Question that we can store in the cache:
List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());
Question question = questions.get(0);
assertEquals(2, question.id().intValue());
assertTrue(question.answers().isEmpty());
Cool, right?
Conclusion
While caching data is easy, synchronizing the cache with the database is the difficult part.
By using jOOQ to call PostgreSQL TABLE-valued functions that fetch the cacheable aggregates, we can simplify this task, as the result captures the entries that have changed since the last cache synchronization.
This research was funded by Data Geekery GmbH and conducted in accordance with the blog ethics policy.
While the article was written independently and reflects entirely my opinions and conclusions, the amount of work involved in making this article happen was compensated by Data Geekery.



Very interesting! Single question that is implementation specific – why did you subtract one second from the ‘timestamp’?
I’m glad you liked it.
The reason why the INSERT statement uses a TIMESTAMP that’s one second early is that we want to emulate the case when something has been created before we actually call the database function.