0

With this code:

import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; public class CompositeQueuePriority { public static void main(String[] args) throws Exception { String brokerUrl = "tcp://localhost:61616"; BrokerService broker = new BrokerService(); broker.addConnector(brokerUrl); broker.setPersistent(false); broker.setDestinationPolicy(policyMap()); broker.start(); Destination a = ActiveMQDestination.createDestination("queue", ActiveMQDestination.QUEUE_TYPE); Session session = createSession(); MessageProducer lowProducer = session.createProducer(a); lowProducer.setPriority(1); MessageProducer highProducer = session.createProducer(a); highProducer.setPriority(9); MessageConsumer consumer = session.createConsumer(a); for (int i = 0; i < 10; i++) { lowProducer.send(session.createTextMessage("Low")); highProducer.send(session.createTextMessage("High")); String first = ((TextMessage) consumer.receive()).getText(); String second = ((TextMessage) consumer.receive()).getText(); System.out.println(first + ", " + second); } broker.stop(); } private static Session createSession() throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); Connection connection = connectionFactory.createConnection(); connection.start(); return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } private static PolicyMap policyMap() { PolicyMap policyMap = new PolicyMap(); policyMap.setDefaultEntry(prioPolicyEntry()); return policyMap; } private static PolicyEntry prioPolicyEntry() { 
PolicyEntry policyEntry = new PolicyEntry(); policyEntry.setPrioritizedMessages(true); return policyEntry; } } 

The output is:

Low, High Low, High Low, High Low, High Low, High Low, High Low, High Low, High Low, High Low, High 

According to the documentation, message priority has been supported since 5.4, and I'm using 5.15. Am I doing something wrong?

1 Answer 1

2

I believe your problem is that the consumer has already been created when you send the messages. This means each message is dispatched to the consumer as soon as the broker receives it, so the messages never get a chance to be re-sorted by priority.

Send all the messages first and then create your consumer.

Sign up to request clarification or add additional context in comments.

1 Comment

One way to mitigate some of the prefetching issue is to configure the client with "jms.messagePrioritySupported=true" on the client URI, which enables client-side reordering of the prefetch buffer. It won't fix everything, but it can help, at the cost of some performance.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.