I'm trying to unit test an advice on the poller that blocks execution of the MongoDB channel adapter until a certain condition is met (i.e. all messages from the current batch have been processed).
The flow looks as follows:

    IntegrationFlows.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory,
                    new Query().with(Sort.by(Sort.Direction.DESC, "modifiedDate")).limit(1))
                .collectionName("metadata")
                .entityClass(Metadata.class)
                .expectSingleResult(true),
            e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(pollingIntervalSeconds))
                .advice(this.advices.waitUntilCompletedAdvice())))
        .handle((p, h) -> {
            this.advices.waitUntilCompletedAdvice().setWait(true);
            return p;
        })
        .handle(doSomething())
        .channel(Channels.DOCUMENT_HEADER.name())
        .get();

And the following advice bean:

    @Bean
    public WaitUntilCompletedAdvice waitUntilCompletedAdvice() {
        DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(Duration.ofSeconds(1));
        return new WaitUntilCompletedAdvice(trigger);
    }

And the advice itself:

    public class WaitUntilCompletedAdvice extends SimpleActiveIdleMessageSourceAdvice {

        AtomicBoolean wait = new AtomicBoolean(false);

        public WaitUntilCompletedAdvice(DynamicPeriodicTrigger trigger) {
            super(trigger);
        }

        @Override
        public boolean beforeReceive(MessageSource<?> source) {
            // Returning false skips this poll while the wait flag is set.
            if (getWait()) {
                return false;
            }
            return true;
        }

        public boolean getWait() {
            return wait.get();
        }

        public void setWait(boolean newWait) {
            if (getWait() == newWait) {
                return;
            }
            // Retry until the flag has been flipped to the requested value.
            while (true) {
                if (wait.compareAndSet(!newWait, newWait)) {
                    return;
                }
            }
        }
    }

I'm using the following test to exercise the flow:

    @Test
    public void testClaimPoollingAdapterFlow() throws Exception {
        // given
        ArgumentCaptor<Message<?>> captor = messageArgumentCaptor();
        CountDownLatch receiveLatch = new CountDownLatch(1);
        MessageHandler mockMessageHandler =
                mockMessageHandler(captor).handleNext(m -> receiveLatch.countDown());
        this.mockIntegrationContext.substituteMessageHandlerFor("retrieveDocumentHeader", mockMessageHandler);
        LocalDateTime modifiedDate = LocalDateTime.now();
        ProcessingMetadata data = Metadata.builder()
                .modifiedDate(modifiedDate)
                .build();
        assert !this.advices.waitUntilCompletedAdvice().getWait();

        // when
        itf.getInputChannel().send(new GenericMessage<>(Mono.just(data)));

        // then
        assertThat(receiveLatch.await(10, TimeUnit.SECONDS)).isTrue();
        verify(mockMessageHandler).handleMessage(any());
        assertThat(captor.getValue().getPayload()).isEqualTo(modifiedDate);
        assert this.advices.waitUntilCompletedAdvice().getWait();
    }

This works fine, but when I send another message to the input channel, the flow still processes it without respecting the advice.
Is this the intended behaviour? If so, how can I verify in a unit test that the poller really waits for this advice?
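
For reference, the flag logic on its own can be checked without Spring. The following is only a minimal sketch: `WaitGate` is a hypothetical stand-in that mirrors the `wait` flag and the `beforeReceive` decision of `WaitUntilCompletedAdvice`. It is not part of Spring Integration and it does not involve the poller, so it says nothing about whether a message sent directly to the input channel passes through the advice.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the flag handling in WaitUntilCompletedAdvice.
// It has no Spring dependency and only models the "skip this poll?" decision.
class WaitGate {

    private final AtomicBoolean wait = new AtomicBoolean(false);

    // Mirrors beforeReceive(): false means the poll should be skipped.
    boolean beforeReceive() {
        return !wait.get();
    }

    boolean getWait() {
        return wait.get();
    }

    // A plain set() is enough to force the flag to the requested value.
    void setWait(boolean newWait) {
        wait.set(newWait);
    }

    public static void main(String[] args) {
        WaitGate gate = new WaitGate();
        System.out.println(gate.beforeReceive()); // gate starts open
        gate.setWait(true);                       // a batch is in progress
        System.out.println(gate.beforeReceive()); // polls are skipped now
        gate.setWait(false);                      // batch completed
        System.out.println(gate.beforeReceive()); // polling resumes
    }
}
```

This only confirms that the flag toggles as expected; what I still cannot verify is that the poller itself consults the advice before each receive.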