Hot questions for Spring components for AMQP

Question:

My container XML config:

<rabbit:listener-container
        connection-factory="myConnectionFactory"
        acknowledge="none"
        concurrency="10"
        requeue-rejected="false">
    <rabbit:listener ref="myListener" queues="myQueue"/>
</rabbit:listener-container>

and myListener is just a class:

@Component("myListener")
public class MyListener implements MessageListener {
    @Autowired
    SomeDependency dependency;
    ....
}

I've specified concurrency="10" in my XML. What does this mean exactly?


I've found some docs, but they are not that helpful. They only state:

Specify the number of concurrent consumers to create. Default is 1.


What I am interested in is whether MyListener has to be thread safe i.e.

  • are there many instances created or single instance used by many threads?
  • can I access instance fields w/o synchronization?
  • is SomeDependency dependency instantiated once or for each thread/instance?
  • does dependency need to be thread safe?

Answer:

Yes, to use concurrency, your listener has to be thread-safe. There is one listener instance per container, and all of that container's consumer threads invoke the same instance. Note, however, that the <rabbit:listener-container/> namespace element is actually just a convenience for declaring "shared" attributes; each <rabbit:listener/> element gets its own container.

It's generally best to use stateless objects (no fields that are written to), but that's not always possible.

If your listener is not thread-safe, you can use...

<rabbit:listener-container
    connection-factory="myConnectionFactory"
    acknowledge="none"
    requeue-rejected="false">
    <rabbit:listener ref="myListener" queues="myQueue"/>
    <rabbit:listener ref="myListener" queues="myQueue"/>
    <rabbit:listener ref="myListener" queues="myQueue"/>
    <rabbit:listener ref="myListener" queues="myQueue"/>
    ...
</rabbit:listener-container>

...and add @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) to the listener class. You will then get a container for each listener element, and a different instance of the listener will be injected into each.

You will also need prototype scope for any non-thread-safe dependencies injected into the listener.
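
To make the thread-safety point concrete, here is a minimal sketch that does not use Spring AMQP at all. It only simulates what concurrency="10" implies: several consumer threads calling one shared listener object. CountingListener and SharedListenerDemo are hypothetical names invented for this illustration, and the thread pool is an assumption standing in for the container's consumer threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a listener bean; not a Spring AMQP type.
class CountingListener {
    // Shared mutable state must be thread-safe: every consumer thread touches it.
    private final AtomicInteger handled = new AtomicInteger();

    void onMessage(String body) {
        handled.incrementAndGet();
    }

    int handledCount() {
        return handled.get();
    }
}

class SharedListenerDemo {
    // Simulates `consumers` threads delivering messages to ONE listener instance
    // and returns how many messages that instance counted.
    static int deliver(int consumers, int messagesPerConsumer) {
        CountingListener listener = new CountingListener();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {
                for (int i = 0; i < messagesPerConsumer; i++) {
                    listener.onMessage("msg");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return listener.handledCount();
    }
}
```

With the AtomicInteger, SharedListenerDemo.deliver(10, 1000) counts every message. If the field were a plain int incremented with handled++, updates from different threads could be lost, which is the kind of bug an unsynchronized instance field in a shared listener invites.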

Question:

I'm trying to create a simple Spring Boot app that "produces" messages to a RabbitMQ exchange/queue, and another sample Spring Boot app that "consumes" these messages. So I have two apps (or microservices, if you wish): 1) a "producer" microservice and 2) a "consumer" microservice.

The "producer" has two domain objects, Foo and Bar, which should be converted to JSON and sent to RabbitMQ. The "consumer" should receive each JSON message and convert it back into a Foo or a Bar, respectively. For some reason I cannot get this simple task to work, and there are not many examples of it. For the message converter I want to use org.springframework.messaging.converter.MappingJackson2MessageConverter.

Here is what I have so far:

PRODUCER MICROSERVICE

package demo.producer;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.stereotype.Service;

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Bean
    Queue queue() {
        return new Queue("queue", false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("queue");
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... args) throws Exception {
        sender.sendToRabbitmq(new Foo(), new Bar());
    }
}

@Service
class Sender {

    @Autowired
    private RabbitMessagingTemplate rabbitMessagingTemplate;
    @Autowired
    private MappingJackson2MessageConverter mappingJackson2MessageConverter;

    public void sendToRabbitmq(final Foo foo, final Bar bar) {

        this.rabbitMessagingTemplate.setMessageConverter(this.mappingJackson2MessageConverter);

        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", foo);
        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", bar);

    }
}

class Bar {
    public int age = 33;
}

class Foo {
    public String name = "gustavo";
}

CONSUMER MICROSERVICE

package demo.consumer;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;

@SpringBootApplication
@EnableRabbit
public class ConsumerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Autowired
    private Receiver receiver;

    @Override
    public void run(String... args) throws Exception {

    }

}

@Service
class Receiver {
    @RabbitListener(queues = "queue")
    public void receiveMessage(Foo foo) {
        System.out.println("Received <" + foo.name + ">");
    }

    @RabbitListener(queues = "queue")
    public void receiveMessage(Bar bar) {
        System.out.println("Received <" + bar.age + ">");
    }
}

class Foo {
    public String name;
}

class Bar {
    public int age;
}

And here is the exception I'm getting:

    org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void demo.consumer.Receiver.receiveMessage(demo.consumer.Bar)]
Bean [demo.consumer.Receiver@1672fe87]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:116)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:93)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:170)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1257)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1021)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1119)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
    ... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found to convert to class demo.consumer.Bar, message=GenericMessage [payload=byte[10], headers={amqp_receivedRoutingKey=queue, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=queue, amqp_redelivered=false, id=87cf7e06-a78a-ddc1-71f5-c55066b46b11, amqp_consumerTag=amq.ctag-msWSwB4bYGWVO2diWSAHlw, contentType=application/json;charset=UTF-8, timestamp=1433989934574}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:115)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:77)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:127)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:100)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:113)
    ... 12 common frames omitted

The exception says there is no converter, and that is true. My problem is that I have no idea how to set the MappingJackson2MessageConverter on the consumer side (please note that I want to use org.springframework.messaging.converter.MappingJackson2MessageConverter and not org.springframework.amqp.support.converter.JsonMessageConverter).

Any thoughts?

Just in case, you can fork this sample project at: https://github.com/gustavoorsi/rabbitmq-consumer-receiver


Answer:

Ok, I finally got this working.

Spring uses a PayloadArgumentResolver to extract and convert the message payload and pass it to the parameter of the method annotated with @RabbitListener. Somehow we need to set the MappingJackson2MessageConverter on this object.

So, in the CONSUMER app, we need to implement RabbitListenerConfigurer. By overriding configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) we can set a custom DefaultMessageHandlerMethodFactory. On this factory we set the message converter, and the factory will create our PayloadArgumentResolver with the correct converter.

Here is a snippet of the code; I've also updated the git project.

ConsumerApplication.java

package demo.consumer;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;

@SpringBootApplication
@EnableRabbit
public class ConsumerApplication implements RabbitListenerConfigurer {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(jackson2Converter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

    @Autowired
    private Receiver receiver;

}

@Service
class Receiver {
    @RabbitListener(queues = "queue")
    public void receiveMessage(Foo foo) {
        System.out.println("Received <" + foo.name + ">");
    }

    @RabbitListener(queues = "queue")
    public void receiveMessage(Bar bar) {
        System.out.println("Received <" + bar.age + ">");
    }
}

class Foo {
    public String name;
}

class Bar {
    public int age;
}

So, if you run the producer microservice, it will add two messages to the queue: one that represents a Foo object and another that represents a Bar object. By running the consumer microservice you will see that both are consumed by the respective method in the Receiver class.


Updated issue:

I think there was a conceptual problem about queuing on my side. What I wanted to achieve is not possible by declaring two methods annotated with @RabbitListener that point to the same queue, and the solution above was not working properly. If you send, let's say, 6 Foo messages and 3 Bar messages to RabbitMQ, the listener with the Foo parameter will not receive exactly the 6 Foo messages. It seems that the listeners are invoked in parallel, as competing consumers on the same queue, so there is no way to select which listener to invoke based on the method argument type.

My solution (and I'm not sure if this is the best way; I'm open to suggestions here) is to create a queue for each entity. So now I have queue.bar and queue.foo, and I updated the listeners accordingly, for example @RabbitListener(queues = "queue.foo"). Once again, I've updated the code and you can check it out in my git repository.
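
As one possible alternative to a queue per entity, a single consumer on the queue can dispatch each message on a type label that travels with it. The sketch below is plain Java, not Spring AMQP code. TypeDispatcher and TypeDispatchDemo are hypothetical names, and the "foo"/"bar" labels assume the producer attaches such a label (for example as a message header). Spring AMQP versions that support @RabbitHandler methods under a class-level @RabbitListener perform a comparable dispatch on the converted payload type, so check what your version offers before hand-rolling this.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical dispatcher for illustration; NOT a Spring AMQP class.
class TypeDispatcher {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    void register(String type, Consumer<String> handler) {
        handlers.put(type, handler);
    }

    // Routes one message by its type label; returns false when no handler matches.
    boolean dispatch(String type, String jsonBody) {
        Consumer<String> handler = handlers.get(type);
        if (handler == null) {
            return false;
        }
        handler.accept(jsonBody);
        return true;
    }
}

class TypeDispatchDemo {
    // Sends fooCount "foo" and barCount "bar" messages through one dispatcher
    // and returns {handledByFooHandler, handledByBarHandler}.
    static int[] run(int fooCount, int barCount) {
        int[] handled = new int[2];
        TypeDispatcher dispatcher = new TypeDispatcher();
        dispatcher.register("foo", body -> handled[0]++);
        dispatcher.register("bar", body -> handled[1]++);
        for (int i = 0; i < fooCount; i++) {
            dispatcher.dispatch("foo", "{\"name\":\"gustavo\"}");
        }
        for (int i = 0; i < barCount; i++) {
            dispatcher.dispatch("bar", "{\"age\":33}");
        }
        return handled;
    }
}
```

Because there is only one consumer, every message passes through the same dispatch step, so 6 Foo and 3 Bar messages reach the Foo and Bar handlers 6 and 3 times respectively. The trade-off compared with separate queues is that both types share one queue's ordering and backlog.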

Question:

I am using Spring AMQP version 1.1 as my Java client. I have a queue which holds around 2000 messages. I want to have a service which checks the size of this queue and, if it is empty, sends out a message saying "All items processed".

I don't know how to get the current queue size. Please help.

I googled and found a class "RabbitBrokerAdmin" that was present in earlier version 1.0. I think it is not present in 1.1 now.

Any pointers in getting current queue size?


Answer:

I know this is a little late and a solution has already been found, but here is another way to look at the message counts in your queues.

This solution assumes that you are using the Spring RabbitMQ framework and have declared your queues and an admin in your application config using the following tags:

<rabbit:queue>
<rabbit:admin>

The java class:

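import java.util.List;
import java.util.Properties;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;

public class QueueStatsProcessor {
    @Autowired
    private RabbitAdmin admin;
    // All Queue beans declared in the application context
    @Autowired
    private List<Queue> rabbitQueues;

    public void getCounts() {
        for (Queue queue : rabbitQueues) {
            Properties props = admin.getQueueProperties(queue.getName());
            if (props == null) {
                // No properties were returned, e.g. the queue does not exist on the broker
                continue;
            }
            int messageCount = Integer.parseInt(props.get("QUEUE_MESSAGE_COUNT").toString());
            System.out.println(queue.getName() + " has " + messageCount + " messages");
        }
    }
}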

You can also use this solution to read the number of consumers currently attached to the queue (the QUEUE_CONSUMER_COUNT property). See the getQueueProperties Javadoc: http://docs.spring.io/spring-amqp/docs/1.2.1.RELEASE/api/org/springframework/amqp/rabbit/core/RabbitAdmin.html#getQueueProperties(java.lang.String)
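
To connect this back to the original goal of announcing "All items processed", the check on the returned properties could look like the sketch below. It is plain Java operating on a java.util.Properties object, so the Properties would in practice come from admin.getQueueProperties(...). QueueEmptyCheck is a hypothetical helper name, and treating a missing queue as "not empty" is an assumption about what you want in that case.

```java
import java.util.Properties;

// Hypothetical helper for illustration; only the property key comes from the answer above.
class QueueEmptyCheck {
    static final String MESSAGE_COUNT_KEY = "QUEUE_MESSAGE_COUNT";

    // Returns true only when properties are present and report zero messages.
    static boolean isEmpty(Properties props) {
        if (props == null) {
            // Unknown queue: do not claim that everything was processed.
            return false;
        }
        Object count = props.get(MESSAGE_COUNT_KEY);
        if (count == null) {
            return false;
        }
        return Integer.parseInt(count.toString()) == 0;
    }
}
```

A caller would send the notification only when QueueEmptyCheck.isEmpty(props) returns true. Keep in mind that the broker's message count typically reflects messages waiting in the queue, so messages already delivered to a consumer but still being processed may not be included in it.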

Question:

I know it is possible to create a SimpleMessageListenerContainer bean and set the prefetch count and message listener on it, like this:

@Bean
public SimpleMessageListenerContainer messageListenerContainer(
        ConnectionFactory rabbitConnectionFactory,
        Receiver receiver) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory);
    container.setQueueNames("hello");
    container.setMessageListener(new MessageListenerAdapter(receiver, "receive"));
    container.setPrefetchCount(1000);
    return container;
}

But how do I set the prefetch count for the channel if I want to use the declarative approach with @RabbitListener?

@Component
public class Receiver {

    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    @RabbitListener(queues = "hello") // how to set prefetch count here?
    public void receive(String message) {
        log.info(" [x] Received '{}'.", message);
    }

}

Is it not possible?


Answer:

Solution, based on @artem-bilan's answer:

Declare a RabbitListenerContainerFactory bean with prefetch count 10 in some @Configuration class:

@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchTenRabbitListenerContainerFactory(ConnectionFactory rabbitConnectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory);
    factory.setPrefetchCount(10);
    return factory;
}

The Receiver bean then references this factory bean by name:

@Component
public class Receiver {

    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    @RabbitListener(queues = "hello", containerFactory = "prefetchTenRabbitListenerContainerFactory")
    public void receive(String message) {
        log.info(" [x] Received '{}'.", message);
    }

    @RabbitListener(queues = "hello")
    public void receiveWithoutPrefetch(String message) {
        log.info(" [x] Received without prefetch '{}'.", message);
    }

}

The two listeners here are just for demonstration. With this configuration Spring creates two AMQP channels, one for each @RabbitListener: the first with prefetch count 10, using our new prefetchTenRabbitListenerContainerFactory bean, and the second with prefetch count 1, using the default rabbitListenerContainerFactory bean. (A prefetch of 1 was the default in Spring AMQP 1.x; versions 2.0 and later default to 250.)

Question:

I store thread-local rabbit message data in an MDC. I would like to clear the old and add new context data for an incoming rabbit message, like reading certain values from the headers or reading the rabbit message payload as a byte[]. Unfortunately I often see exceptions happening prior to the message hitting my @RabbitHandler annotated methods. Is there an earlier entry-point that I can hook into to establish this context? I don't know what happens before deserialization occurs, but ideally I'd like access to the message before attempting to deserialize it. Perhaps there's an onMessageReceived(byte[] message, Map headers) method hook somewhere. The earlier in the call stack the better.


Answer:

The @RabbitHandler method is invoked through a container created by the AbstractRabbitListenerContainerFactory, which can be supplied with a custom MessageConverter: https://docs.spring.io/spring-amqp/docs/2.0.1.RELEASE/reference/html/_reference.html#message-converters. Its fromMessage() is called from MessagingMessageListenerAdapter.toMessagingMessage(), which in turn is called from MessagingMessageListenerAdapter.onMessage(). That is a very early place to hook in: at that point you still have the raw org.springframework.amqp.core.Message object, without any conversion applied and with all headers and properties available.

You can also inject after-receive post processors:

/**
 * @param afterReceivePostProcessors the post processors.
 * @see AbstractMessageListenerContainer#setAfterReceivePostProcessors(MessagePostProcessor...)
 */
public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) {

They serve much the same purpose you are asking about.
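
The pattern behind an after-receive hook can be sketched without any Spring or logging dependency. In the sketch below, LogContext is a hypothetical stand-in for a logging MDC, and ReceivePipeline is a hypothetical stand-in for the container's receive path. In a real application the afterReceive step would presumably live in a MessagePostProcessor passed to setAfterReceivePostProcessors and would call your logging framework's MDC instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a logging MDC: a per-thread key/value map.
class LogContext {
    private static final ThreadLocal<Map<String, String>> CTX =
            ThreadLocal.withInitial(HashMap::new);

    static void clear() { CTX.get().clear(); }
    static void put(String key, String value) { CTX.get().put(key, value); }
    static String get(String key) { return CTX.get().get(key); }
}

// Hypothetical stand-in for the receive path: hook first, conversion second.
class ReceivePipeline {
    // Runs on the raw message, before any conversion (the post processor's role).
    static void afterReceive(String messageId, byte[] body) {
        LogContext.clear(); // drop context left over from the previous message
        LogContext.put("messageId", messageId);
        LogContext.put("bodyLength", String.valueOf(body.length));
    }

    // Stand-in for payload conversion; throws NumberFormatException on bad input.
    static int convert(byte[] body) {
        return Integer.parseInt(new String(body, StandardCharsets.UTF_8));
    }

    static String receive(String messageId, byte[] body) {
        afterReceive(messageId, body);
        try {
            return "handled " + convert(body);
        } catch (NumberFormatException e) {
            // The context is already populated even though conversion failed.
            return "conversion failed for message " + LogContext.get("messageId");
        }
    }
}
```

The point of the ordering is that the context is cleared and repopulated from the raw message before deserialization is attempted, so a failure during conversion can still be logged with the right message identifiers.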