Hot questions for Spring JMS

Question:

When I use Spring to listen to JMS messages, I receievd the above error.

I am wondering how to add an Errorhandler into the JMS listener?


Answer:

There is a property on AbstractMessageListenerContainer:

<bean id="listener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="errorHandler" ref="someHandler"/>
    <property name="destinationName" value="someQueue"/>
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

Where someHandler is a bean implementing ErrorHandler:

@Service
public class SomeHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable t) {
        log.error("Error in listener", t);
    }
}

However note that according to the documentation:

The default behavior of this message listener [...] will log any such exception at the error level. [...] However, if error handling is necessary, then any implementation of the ErrorHandler strategy may be provided to the setErrorHandler(ErrorHandler) method.

Check out your logs, maybe the exception is already logged?

Question:

In my spring boot application i configure two different instances of MQQueueConnectionFactory (different id) as it is a need of the application. For that i have added ibm client jars.

I have also added spring-jms dependency in my code as i wanted JmsTemplate etc classes. After adding this dependency, JmsAutoConfiguration finds JmsTemplate in classpath and tries to configure beans. In this process, it tries to inject bean of type ConnectionFactory and this is where the code fails and i start getting the error. Below is the code from JmsAutoConfiguration

@Configuration
@ConditionalOnClass(JmsTemplate.class)
@ConditionalOnBean(ConnectionFactory.class)
@EnableConfigurationProperties(JmsProperties.class)
@Import(JmsAnnotationDrivenConfiguration.class)
public class JmsAutoConfiguration {

    @Autowired
    private JmsProperties properties;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired(required = false)
    private DestinationResolver destinationResolver;

Do i have a facility to switch off JmsAutoconfiguration feature of spring boot by any chance? If not then what is the alternative solution for this?


Answer:

You can add the auto configurations, which you want to disable, to the SpringBootApplication annotation:

@SpringBootApplication(exclude = JmsAutoConfiguration.class)

Question:

I am trying to create example for publish-subscribe based on @JmsListener annotation: https://github.com/lkrnac/book-eiws-code-samples/tree/master/05-jms/0515-publish-subscribe

Relevant code snippets:

@Slf4j
@SpringBootApplication
@EnableScheduling
public class JmsPublishSubscribeApplication {

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(JmsPublishSubscribeApplication.class, args);
    }

    @Bean
    public ActiveMQTopic simpleTopic() {
        return new ActiveMQTopic("simpleTopic");
    }

}

@Component
public class SimpleMessageListener1 {

    @JmsListener(destination = "simpleTopic")
    public void readMessage(String message) {
      //....
    }

}

@Component
public class SimpleMessageListener2 {

    @JmsListener(destination = "simpleTopic")
    public void readMessage(String message) {
      //....
    }

}

The problem is that is get this behaviour:

2015-05-17 20:07:04.985  INFO 22983 --- [pool-1-thread-1] n.l.b.e.chapter05.SimpleMessageSender    : Sending message: simple message
2015-05-17 20:07:05.070  INFO 22983 --- [enerContainer-1] n.l.b.e.c.JmsPublishSubscribeApplication : Message Received: simple message via listener 2
2015-05-17 20:07:05.975  INFO 22983 --- [pool-1-thread-1] n.l.b.e.chapter05.SimpleMessageSender    : Sending message: simple message
2015-05-17 20:07:05.986  INFO 22983 --- [enerContainer-1] n.l.b.e.c.JmsPublishSubscribeApplication : Message Received: simple message via listener 1
2015-05-17 20:07:06.975  INFO 22983 --- [pool-1-thread-1] n.l.b.e.chapter05.SimpleMessageSender    : Sending message: simple message
2015-05-17 20:07:06.987  INFO 22983 --- [enerContainer-1] n.l.b.e.c.JmsPublishSubscribeApplication : Message Received: simple message via listener 2
2015-05-17 20:07:07.975  INFO 22983 --- [pool-1-thread-1] n.l.b.e.chapter05.SimpleMessageSender    : Sending message: simple message
2015-05-17 20:07:07.994  INFO 22983 --- [enerContainer-1] n.l.b.e.c.JmsPublishSubscribeApplication : Message Received: simple message via listener 1

But each message should be consumed by both listeners by definition of topics. What am I missing?


Answer:

When using a @JmsListener it uses a DefaultMessageListenerContainer which extends JmsDestinationAccessor which by default has the pubSubDomain set to false. When this property is false it is operating on a queue. If you want to use topics you have to set this properties value to true.

As you are using Spring Boot you can quite easily set this property to true by adding the spring.jms.pub-sub-domain property to the application.properties and set it to true.

spring.jms.pub-sub-domain=true

When using a @JmsListener it is looking for a jmsListenerContainerFactory named bean, if that isn't available a default one is expected. You can also include your own bean and programmatically set this property yo true.

@Bean
public DefaultMessageListenerContainer jmsListenerContainerFactory() {
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
    dmlc.setPubSubDomain(true);
    // Other configuration here
    return dmlc;
}

This would of course also work but would be more work, more information on this can be found in the documentation of the @EnableJms annotation.

Question:

How do I set the redeliveryPolicy in ActiveMQ on a Queue?

1) In the doc, see: activeMQ Redelivery, the explain that you should set it on the ConnectionFactory or Connection. But I want to use different value's for different Queue's.

2) Apart from that, I don't seem to get it work. Setting it on the connection factory in Spring (I am using activemq 5.4.2. with Spring 3.0) like this don't seem to have any effect:

<amq:connectionFactory id="amqConnectionFactory" brokerURL="${jms.factory.url}" >
    <amq:properties>
        <amq:redeliveryPolicy maximumRedeliveries="6" initialRedeliveryDelay="15000" useExponentialBackOff="true" backOffMultiplier="5"/>
    </amq:properties>
</amq:connectionFactory>

I also tried to set it as property on the defined Queue, but that also seem to be ignored as the redelivery occurs sooner that the defined values:

<amq:queue id="jmsQueueDeclarationSnd"  physicalName="${jms.queue.declaration.snd}" >
    <amq:properties>
        <amq:redeliveryPolicy maximumRedeliveries="6" initialRedeliveryDelay="15000" useExponentialBackOff="true" backOffMultiplier="5"/>
    </amq:properties>
</amq:queue>

Thanks


Answer:

I too was using the method shown by Ivan above for amq:connectionFactory

Whilst upgrading to ActiveMQ 5.7.0 I noticed this no longer works (since the implementation of https://issues.apache.org/jira/browse/AMQ-3224). Anyway after reading a better post on the ActiveMQ forums I currently use :-

<amq:queue id="emailQueue" physicalName="emailQueue" />
<amq:queue id="smsQueue" physicalName="smsQueue" />

<!-- Wait 15 seconds first re-delivery, then 45, 135, 405, 1215, 3645 seconds -->
<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="backOffMultiplier" value="3" />
    <property name="initialRedeliveryDelay" value="15000" />
    <property name="maximumRedeliveries" value="6" />
    <property name="queue" value="*" />
    <property name="redeliveryDelay" value="15000" />
    <property name="useExponentialBackOff" value="true" />
</bean>

<amq:connectionFactory id="jmsFactory" brokerURL="yourProtocol/BrokerURL">
    <property name="redeliveryPolicy" ref="redeliveryPolicy" />
</amq:connectionFactory>

Note that for any messages that fail to be redelivered after 6 retries, ActiveMQ will create a DLQ.emailQueue' or DLQ.smsQueue and enqueue the message on that queue (dequeuing it from the original queue).

Question:

I would like to use the new annotations and features provided in Spring 4.1 for an application that needs a JMS listener.

I've carefully read the notes in the Spring 4.1 JMS improvements post but I continue to miss the relationship between @JmsListener and maybe the DestinationResolver and how I would setup the application to indicate the proper Destination or Endpoint.

Here is the suggested use of @JmsListener

@Component
public class MyService {

    @JmsListener(containerFactory = "myContainerFactory", destination = "myQueue")
    public void processOrder(String data) { ... }
}

Now, I can't use this in my actual code because the "myQueue" needs to be read from a configuration file using Environment.getProperty().

I can setup an appropriate myContainerFactory with a DestinationResolver but mostly, it seems you would just use DynamicDestinationResolver if you don't need JNDI to lookup a queue in an app server and didn't need to do some custom reply logic. I'm simply trying to understand how Spring wants me to indicate the name of the queue in a parameterized fashion using the @JmsListener annotation.

Further down the blog post, I find a reference to this Configurer:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
    registrar.setDefaultContainerFactory(defaultContainerFactory());

    SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
    endpoint.setDestination("anotherQueue");
    endpoint.setMessageListener(message -> {
        // processing
    });
    registrar.registerEndpoint(endpoint);
}

Now, this makes some amount of sense and I could see where this would allow me to set a Destination at runtime from some external string, but this seems to be in conflict with using @JmsListener as it appears to be overriding the annotation in favor of endpoint.setMessageListener in the code above.

Any tips on how to specify the appropriate queue name using @JmsListener?


Answer:

Also note that depending on use case you can already parameterize using properties file per environment and PropertySourcesPlaceholderConfigurer

@JmsListener(destinations = "${some.key}")

As per https://jira.spring.io/browse/SPR-12289

Question:

I have requirement to move messages from queues on one ActiveMQ instance to another ActiveMQ instance. Is there a way to connect to two different ActiveMQ instances using spring boot configuration?

Do I need to create multiple connectionFactories? If so then how does JmsTemplate know which ActiveMQ instance to connect to?

  @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(JMS_BROKER_URL);
    }

Any help and code examples would be useful.

Thanks in advance. GM


Answer:

Additionally to the response of @Chris You have to create different BrokerService instances using differents ports and create different ConnectionFactory to connect to each broker and create different JmsTemplate using these different factories to send messages to differents brokers.

For example :

import javax.jms.ConnectionFactory;
import javax.jms.QueueConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class ActiveMQConfigurationForJmsCamelRouteConsumeAndForward {
    public static final String LOCAL_Q = "localQ";
    public static final String REMOTE_Q = "remoteQ";

    @Bean
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();
        broker.addConnector("tcp://localhost:5671");
        broker.setBrokerName("broker");
        broker.setUseJmx(false);
        return broker;
    }

    @Bean
    public BrokerService broker2() throws Exception {
        final BrokerService broker = new BrokerService();
        broker.addConnector("tcp://localhost:5672");
        broker.setBrokerName("broker2");
        broker.setUseJmx(false);
        return broker;
    }

    @Bean
    @Primary
    public ConnectionFactory jmsConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:5671");
        return connectionFactory;
    }

    @Bean
    public QueueConnectionFactory jmsConnectionFactory2() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:5672");
        return connectionFactory;
    }

    @Bean
    @Primary
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(jmsConnectionFactory());
        jmsTemplate.setDefaultDestinationName(LOCAL_Q);
        return jmsTemplate;
    }

    @Bean
    public JmsTemplate jmsTemplate2() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(jmsConnectionFactory2());
        jmsTemplate.setDefaultDestinationName(REMOTE_Q);
        return jmsTemplate;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory2(
            @Qualifier("jmsConnectionFactory2") ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}

To move messages from one AMQ instance to another instance you can use JmsBridgeConnectors :

Note that by the example below you cannot have multiple consumers on the queue from which you want to forward the messages because Camel or JmsBridgeConnectors consume the message and forward it. If you want a only copy of the message to be forwarded you have some solutions : 1- Convert your queue to a topic, manage the messages for offline consumers by a durable subscriptions or retroactive consumers. 2- convert your queue to a composite queue and use DestinationsInterceptors to copy messages to another queue. 3- use NetworkConnector for Networkof brokers

@Bean
public BrokerService broker() throws Exception {
    final BrokerService broker = new BrokerService();
    broker.addConnector("tcp://localhost:5671");
    SimpleJmsQueueConnector simpleJmsQueueConnector = new SimpleJmsQueueConnector();
    OutboundQueueBridge bridge = new OutboundQueueBridge();
    bridge.setLocalQueueName(LOCAL_Q);
    bridge.setOutboundQueueName(REMOTE_Q);
    OutboundQueueBridge[] outboundQueueBridges = new OutboundQueueBridge[] { bridge };
    simpleJmsQueueConnector.getReconnectionPolicy().setMaxSendRetries(ReconnectionPolicy.INFINITE);
    simpleJmsQueueConnector.setOutboundQueueBridges(outboundQueueBridges);
    simpleJmsQueueConnector.setLocalQueueConnectionFactory((QueueConnectionFactory) jmsConnectionFactory());
    simpleJmsQueueConnector.setOutboundQueueConnectionFactory(jmsConnectionFactory2());
    JmsConnector[] jmsConnectors = new JmsConnector[] { simpleJmsQueueConnector };
    broker.setJmsBridgeConnectors(jmsConnectors);
    broker.setBrokerName("broker");
    broker.setUseJmx(false);
    return broker;
}

Or by using Camel like this below :

@Bean
public CamelContext camelContext() throws Exception {
    CamelContext context = new DefaultCamelContext();
    context.addComponent("inboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5671"));
    context.addComponent("outboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5672"));
    context.addRoutes(new RouteBuilder() {
        public void configure() {
            from("inboundQueue:queue:" + LOCAL_Q).to("outboundQueue:queue:" + REMOTE_Q);
        }
    });
    context.start();
    return context;
}

your Producer must be like this to use differents JmsTemplates :

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class Producer implements CommandLineRunner {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    @Qualifier("jmsTemplate2")
    private JmsTemplate jmsTemplate2;

    @Override
    public void run(String... args) throws Exception {
        send("Sample message");
    }

    public void send(String msg) {
        this.jmsTemplate.convertAndSend(ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.LOCAL_Q, msg);
        this.jmsTemplate2.convertAndSend(ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, msg);
    }
}

and Consumer :

import javax.jms.Session;

import org.apache.activemq.ActiveMQSession;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @JmsListener(destination = ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, containerFactory = "jmsListenerContainerFactory2")
    public void receiveQueue(Session session, String text) {
        System.out.println(((ActiveMQSession) session).getConnection().getBrokerInfo());
        System.out.println(text);
    }
}

Question:

This is quite a common problem and I have found plenty solutions that do not work for me on the web. I am declaring:

<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-jms</artifactId>
  <version>3.0.7.RELEASE</version>
</dependency>

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.5.0</version>
</dependency>

And jetty plug-in

<plugin>
  <groupId>org.mortbay.jetty</groupId>
  <artifactId>jetty-maven-plugin</artifactId>
</plugin>

But I keep on getting the following exception when issuing "mvn jetty:run":

org.springframework.beans.FatalBeanException: NamespaceHandler class [org.apache.xbean.spring.context.v2.XBeanNamespaceHandler] for namespace [http://activemq.apache.org/schema/core] not found; nested exception is java.lang.ClassNotFoundException: org.apache.xbean.spring.context.v2.XBeanNamespaceHandler

What would be the correct maven dependencies to run Jetty, ActiveMQ and Spring JMS?


Answer:

You need dependency on xbean-spring

<dependency>
  <groupId>org.apache.xbean</groupId>
  <artifactId>xbean-spring</artifactId>
  <version>3.7</version>
</dependency>

Question:

I am using the Spring API's JmsTemplate and MappingJackson2MessageConverter (version: spring-jms-4.3.4.RELEASE.jar) to publish messages to an ActiveMQ topic as shown in the below code.

TopicPublisher class:

@Component
public class TopicPublisher {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private MessageConverter messageConverter;

    public void send() {
        Product product = new Product();
        product.setName("abcd");
        product.setPrice(10);

        jmsTemplate.setMessageConverter(messageConverter);
        jmsTemplate.convertAndSend("product.topic", product);
    }
}

MappingJackson2MessageConverter class:

@Configuration
public class JMSTextMessageConverter {

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter mappingJackson2MessageConverter 
             = new MappingJackson2MessageConverter();
        mappingJackson2MessageConverter.setTargetType(MessageType.TEXT);
        mappingJackson2MessageConverter.setTypeIdPropertyName("_type");
        return mappingJackson2MessageConverter;
    }   
}

Now, I want to set few custom headers to the JMS message being published to the topic. I googled and could not find any example which does this. Can you help ?


Answer:

You can add custom properties by using convertAndSendmethod from JmsTemplate by sending MessagePostProcessor as shown below:

jmsTemplate.convertAndSend("product.topic", product, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws JMSException {
            message.setStringProperty("my_property", "my_value");
            return message;
        }
    });

Question:

I am using a @JmsListener annotated method listen to JMS messages as shown below.

@JmsListener(destination="exampleQueue")  
public void fetch(@Payload String message){  
    process(message);  
}

When this method execution result in an exception, I got a warn log

Execution of JMS message listener failed, and no ErrorHandler has been set.

How do I set an ErrorHandler to handle the case. I am using spring boot 1.3.3.RELEASE


Answer:

While using annotations like @EnableJms, @JmsListener etc to work with Spring JMS, the ErrorHandler can be set like this

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory, ExampleErrorHandler errorHandler) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setErrorHandler(errorHandler);
    return factory;
}

@Service
public class ExampleErrorHandler implements ErrorHandler{   
    @Override
    public void handleError(Throwable t) {
        //handle exception here
    }
}

More details are available here : Annotation-driven listener endpoints