I'm trying to write a test class to verify that a message-driven channel adapter listening on a JMS queue forwards the message to the right channel (ref. Advanced Spring Integration Testing). Here is the test context XML:
    <!-- MockRunner configuration -->
    <bean id="destinationManager" class="com.mockrunner.jms.DestinationManager"/>

    <bean id="outgoingDestination" factory-bean="destinationManager" factory-method="createQueue">
        <constructor-arg index="0" value="demoMockRunnerQueue"/>
    </bean>

    <bean id="configurationManager" class="com.mockrunner.jms.ConfigurationManager"/>

    <bean id="connectionFactory" class="com.mockrunner.mock.jms.MockQueueConnectionFactory">
        <constructor-arg index="0" ref="destinationManager"/>
        <constructor-arg index="1" ref="configurationManager"/>
    </bean>

    <!-- Spring JMS Template -->
    <bean id="jmsTemplate" class="org.mockito.Mockito" factory-method="mock">
        <constructor-arg value="org.springframework.jms.core.JmsTemplate" />
    </bean>

Here is the Spring Integration configuration with the message-driven channel adapter:
    <int:channel id="inbound"/>

    <int-jms:message-driven-channel-adapter id="jmsIn"
        channel="inbound"
        destination="outgoingDestination"
        connection-factory="connectionFactory"
        acknowledge="transacted"/>

    <int:service-activator input-channel="inbound"
        ref="messageQueueConsumer"
        method="consumeMessage"/>

    <bean id="messageQueueConsumer" class="uk.co.example.consumer.SimpleMessageConsumer">
    </bean>

And here is the Java class containing the test:
    @Resource
    JmsTemplate jmsTemplate;

    /**
     * "inbound" is the channel used to trigger the service activator (i.e. the message consumer)
     */
    @Resource
    @Qualifier("inbound")
    SubscribableChannel inbound;

    private static final Logger LOGGER = Logger.getLogger(InboundChannelFlowUnitTest.class);

    /**
     * This test verifies that a message received on a polling JMS inbound channel adapter is
     * routed to the designated channel and that the message payload is as expected
     *
     * @throws JMSException
     * @throws InterruptedException
     * @throws IOException
     */
    @Test
    public void testReceiveMessage() throws JMSException, InterruptedException, IOException {
        String msg = "hello";

        boolean sent = verifyJmsMessageReceivedOnChannel(msg, inbound, new CountDownHandler() {
            @Override
            protected void verifyMessage(Message<?> message) {
                assertEquals("hello", message.getPayload());
            }
        });
        assertTrue("message not sent to expected output channel", sent);
    }

    /**
     * Provide a message via a mock JMS template and wait for the default timeout to receive the message on the expected channel
     * @param obj The message provided to the poller (currently must be a String)
     * @param expectedOutputChannel The expected output channel
     * @param handler An instance of CountDownHandler to handle (verify) the output message
     * @return true if the message was received on the expected channel
     * @throws JMSException
     * @throws InterruptedException
     */
    protected boolean verifyJmsMessageReceivedOnChannel(Object obj, SubscribableChannel expectedOutputChannel, CountDownHandler handler) throws JMSException, InterruptedException {
        return verifyJmsMessageOnOutputChannel(obj, expectedOutputChannel, handler, 2000);
    }

    /**
     * Provide a message via a mock JMS template and wait for the specified timeout to receive the message on the expected channel
     * @param obj The message provided to the poller (currently must be a String)
     * @param expectedOutputChannel The expected output channel
     * @param handler An instance of CountDownHandler to handle (verify) the output message
     * @param timeoutMillisec The timeout period. Note that this must allow at least enough time to process the entire flow. Only set if the default is
     *        not long enough
     * @return true if the message was received on the expected channel
     * @throws JMSException
     * @throws InterruptedException
     */
    protected boolean verifyJmsMessageOnOutputChannel(Object obj, SubscribableChannel expectedOutputChannel, CountDownHandler handler, int timeoutMillisec) throws JMSException, InterruptedException {

        if (!(obj instanceof String)) {
            throw new IllegalArgumentException("Only TextMessage is currently supported");
        }

        /*
         * Use mocks to create a message returned to the JMS inbound adapter. It is assumed that the JmsTemplate
         * is also a mock.
         */
        TextMessage message = mock(TextMessage.class);
        doReturn(new SimpleMessageConverter()).when(jmsTemplate).getMessageConverter();
        doReturn(message).when(jmsTemplate).receiveSelected(anyString());

        String text = (String) obj;

        CountDownLatch latch = new CountDownLatch(1);
        handler.setLatch(latch);

        doReturn(text).when(message).getText();

        expectedOutputChannel.subscribe(handler);

        boolean latchCountedToZero = latch.await(timeoutMillisec, TimeUnit.MILLISECONDS);

        if (!latchCountedToZero) {
            LOGGER.warn(String.format("The specified waiting time of the latch (%s ms) elapsed.", timeoutMillisec));
        }

        return latchCountedToZero;
    }

    /*
     * A MessageHandler that uses a CountDownLatch to synchronize with the calling thread
     */
    private abstract class CountDownHandler implements MessageHandler {

        CountDownLatch latch;

        public final void setLatch(CountDownLatch latch) {
            this.latch = latch;
        }

        protected abstract void verifyMessage(Message<?> message);

        /*
         * (non-Javadoc)
         *
         * @see
         * org.springframework.integration.core.MessageHandler#handleMessage
         * (org.springframework.integration.Message)
         */
        public void handleMessage(Message<?> message) throws MessagingException {
            verifyMessage(message);
            latch.countDown();
        }
    }

But I get the following exception:
    WARN [main] [InboundChannelFlowUnitTest] The specified waiting time of the latch (2000 ms) elapsed.
    java.lang.AssertionError: message not sent to expected output channel

Any hint on that?
EDIT:
I added the following test:
    @SuppressWarnings("unchecked")
    @Test
    public void testMessageDriven() throws Exception {
        TextMessage message = mock(TextMessage.class);
        when(message.getText()).thenReturn("foo");

        Session session = mock(Session.class);
        ((SessionAwareMessageListener<TextMessage>) this.messageListenerContainer.getMessageListener()).onMessage(message, session);

        CountDownHandler myCountDownHandler = new CountDownHandler() {
            @Override
            protected void verifyMessage(Message<?> message) {
                assertNotNull(message);
                assertEquals("hello", message.getPayload());
            }
        };

        CountDownLatch myLatch = new CountDownLatch(2);
        myCountDownHandler.setLatch(myLatch);

        this.inbound.subscribe(myCountDownHandler);

        boolean receivedBeforeZero = myLatch.await(3, TimeUnit.SECONDS);

        assertTrue(receivedBeforeZero);
    }

And changed the message-driven adapter to:
    <int-jms:message-driven-channel-adapter id="jmsIn"
        channel="inbound"
        container="messageListenerContainer"
        acknowledge="transacted"/>

But I still get the following error:
    WARN [main] [InboundChannelFlowUnitTest] The specified waiting time of the latch (3 sec) elapsed.
    java.lang.AssertionError
        at org.junit.Assert.fail(Assert.java:92)
        at org.junit.Assert.assertTrue(Assert.java:43)
        at org.junit.Assert.assertTrue(Assert.java:54)
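To narrow this down, the latch handshake that both tests rely on can be reduced to plain Java with no Spring, JMS, or Mockito involved. The following is only a minimal sketch under stated assumptions: `LatchSketch`, `TinyChannel`, and `run` are hypothetical names, and `TinyChannel` is a simplified stand-in that hands each message to whoever is subscribed at send time. It does not model Spring Integration's actual dispatching rules. It just shows two ways an `await` can time out: the handler subscribes after the message was already delivered, or the latch count is higher than the number of messages sent.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class LatchSketch {

    /** Hypothetical stand-in for a channel: delivers only to handlers subscribed at send time. */
    static final class TinyChannel {
        private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(payload);
            }
        }
    }

    /** Sends exactly one message and reports whether a latch with the given count reached zero. */
    static boolean run(boolean subscribeFirst, int latchCount, long timeoutMillis)
            throws InterruptedException {
        TinyChannel channel = new TinyChannel();
        CountDownLatch latch = new CountDownLatch(latchCount);
        Consumer<String> handler = payload -> latch.countDown();

        if (subscribeFirst) {
            channel.subscribe(handler);
            channel.send("hello");
        } else {
            channel.send("hello");      // no handler yet, so nothing counts down
            channel.subscribe(handler); // too late for the message above
        }
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(true, 1, 200));  // subscribed before send, count 1
        System.out.println(run(false, 1, 200)); // subscribed after send
        System.out.println(run(true, 2, 200));  // one message, latch count 2
    }
}
```

In this sketch only the first call returns `true`. Whether the same ordering or count problem applies to the tests above depends on how the real `inbound` channel dispatches to its subscribers, which this stand-in deliberately ignores.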