I'm trying to unit test an advice on the poller that blocks execution of the MongoDB channel adapter until a certain condition is met (i.e. all messages from the current batch have been processed).

The flow looks as follows:

    IntegrationFlows.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory,
                    new Query().with(Sort.by(Sort.Direction.DESC, "modifiedDate")).limit(1))
                    .collectionName("metadata")
                    .entityClass(Metadata.class)
                    .expectSingleResult(true),
            e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(pollingIntervalSeconds))
                    .advice(this.advices.waitUntilCompletedAdvice())))
            .handle((p, h) -> {
                this.advices.waitUntilCompletedAdvice().setWait(true);
                return p;
            })
            .handle(doSomething())
            .channel(Channels.DOCUMENT_HEADER.name())
            .get();

And the following advice bean:

    @Bean
    public WaitUntilCompletedAdvice waitUntilCompletedAdvice() {
        DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(Duration.ofSeconds(1));
        return new WaitUntilCompletedAdvice(trigger);
    }

And the advice itself:

    public class WaitUntilCompletedAdvice extends SimpleActiveIdleMessageSourceAdvice {

        AtomicBoolean wait = new AtomicBoolean(false);

        public WaitUntilCompletedAdvice(DynamicPeriodicTrigger trigger) {
            super(trigger);
        }

        @Override
        public boolean beforeReceive(MessageSource<?> source) {
            if (getWait()) {
                return false;
            }
            return true;
        }

        public boolean getWait() {
            return wait.get();
        }

        public void setWait(boolean newWait) {
            if (getWait() == newWait) {
                return;
            }
            while (true) {
                if (wait.compareAndSet(!newWait, newWait)) {
                    return;
                }
            }
        }
    }

I'm using the following test to exercise the flow:

    @Test
    public void testClaimPoollingAdapterFlow() throws Exception {
        // given
        ArgumentCaptor<Message<?>> captor = messageArgumentCaptor();
        CountDownLatch receiveLatch = new CountDownLatch(1);
        MessageHandler mockMessageHandler = mockMessageHandler(captor).handleNext(m -> receiveLatch.countDown());
        this.mockIntegrationContext.substituteMessageHandlerFor("retrieveDocumentHeader", mockMessageHandler);
        LocalDateTime modifiedDate = LocalDateTime.now();
        ProcessingMetadata data = Metadata.builder()
                .modifiedDate(modifiedDate)
                .build();

        assert !this.advices.waitUntilCompletedAdvice().getWait();

        // when
        itf.getInputChannel().send(new GenericMessage<>(Mono.just(data)));

        // then
        assertThat(receiveLatch.await(10, TimeUnit.SECONDS)).isTrue();
        verify(mockMessageHandler).handleMessage(any());
        assertThat(captor.getValue().getPayload()).isEqualTo(modifiedDate);
        assert this.advices.waitUntilCompletedAdvice().getWait();
    }

This works fine, but when I send another message to the input channel, the message is still processed and the advice is not respected.

Is this the intended behaviour? If so, how can I verify in a unit test that the poller really waits for this advice?

2 Answers

    itf.getInputChannel().send(new GenericMessage<>(Mono.just(data)));

That bypasses the poller and sends the message directly to the channel, so the advice is never consulted.

You can unit test the advice itself by calling beforeReceive() from your test.

Or you can create a dummy test flow with the same advice:

    IntegrationFlows.from(() -> "foo", e -> e.poller(...)) ...

And verify that just one message is sent.
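To make the difference between a polled receive and a direct send concrete, here is a minimal plain-Java sketch. It does not use Spring Integration: Gate, Poller and run() are hypothetical stand-ins for the advice, the poller and the test, and the real framework does considerably more than this. It only assumes what the question shows, namely that the advice vetoes a poll while its flag is set and that a downstream handler sets the flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Plain-Java stand-in for a polled flow; this is not Spring Integration code. */
public class PollerGateSketch {

    /** Hypothetical stand-in for WaitUntilCompletedAdvice: vetoes polls while the flag is set. */
    static final class Gate {
        private final AtomicBoolean wait = new AtomicBoolean(false);

        boolean beforeReceive() {
            return !wait.get();
        }

        void setWait(boolean value) {
            wait.set(value);
        }
    }

    /** Hypothetical stand-in for the poller: it asks the gate before every receive. */
    static final class Poller {
        private final Gate gate;
        private final Supplier<String> source;
        private final Consumer<String> channel;

        Poller(Gate gate, Supplier<String> source, Consumer<String> channel) {
            this.gate = gate;
            this.source = source;
            this.channel = channel;
        }

        void pollOnce() {
            // Only a poll goes through the gate.
            if (gate.beforeReceive()) {
                channel.accept(source.get());
            }
        }
    }

    /** Runs five polls followed by one direct send and returns what the handler saw. */
    public static List<String> run() {
        Gate gate = new Gate();
        List<String> handled = new ArrayList<>();
        // Downstream handler: record the payload, then close the gate,
        // mirroring the setWait(true) call in the question's flow.
        Consumer<String> channel = payload -> {
            handled.add(payload);
            gate.setWait(true);
        };
        Poller poller = new Poller(gate, () -> "foo", channel);
        for (int i = 0; i < 5; i++) {
            poller.pollOnce();
        }
        // A direct send to the channel never consults the gate.
        channel.accept("direct");
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [foo, direct]
    }
}
```

Only the first of the five polls delivers "foo", because the handler closes the gate; the direct send still gets through, which matches the behaviour seen in the original test.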

3 Comments

Thanks Gary! I like the idea of a separate dummy test, but how do I send a message to that flow other than with getInputChannel().send?
It's not clear what you mean - that dummy flow will send a message with payload "foo" on each poll, unless the advice cancels the poll. If the downstream flow calls the advice to stop it, you will only get one message.
Thanks, I think I've got it now. I'll test it and get back to you.

Example implementation:

    @Test
    public void testWaitingActivate() {
        // given
        this.advices.waitUntilCompletedAdvice().setWait(true);

        // when
        Message<ProcessingMetadata> receive = (Message<ProcessingMetadata>) testChannel.receive(3000);

        // then
        assertThat(receive).isNull();
    }

    @Test
    public void testWaitingInactive() {
        // given
        this.advices.waitUntilCompletedAdvice().setWait(false);

        // when
        Message<ProcessingMetadata> receive = (Message<ProcessingMetadata>) testChannel.receive(3000);

        // then
        assertThat(receive).isNotNull();
    }

    @Configuration
    @EnableIntegration
    public static class Config {

        @Autowired
        private Advices advices;

        @Bean
        public PollableChannel testChannel() {
            return new QueueChannel();
        }

        @Bean
        public IntegrationFlow fakeFlow() {
            this.advices.waitUntilCompletedAdvice().setWait(true);
            return IntegrationFlows.from(() -> "foo",
                            e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(1))
                                    .advice(this.advices.waitUntilCompletedAdvice())))
                    .channel("testChannel")
                    .get();
        }
    }
