@RabbitListener for the same queue in multiple classes

I was wondering if it is possible in Spring AMQP to receive messages from the same queue in multiple classes depending on the payload type.

I am aware of using the @RabbitListener annotation in class and then putting @RabbitHandler on methods, but I would like to split complexity of message handling in multiple classes while keeping a single queue.

Version currently in use: Spring AMQP v2.0.3 along with RabbitMQ.

Well, it isn't possible. The way you would like it won't be a queue then. That is really an architecture decision to design a single listener and distribute to its methods according the payload type.

As a workaround I can suggest you to delegate the logic from a single @RabbitListener class to those business services:

    @RabbitListener(queues = "foo")
    public class MyListener {

        private final ServiceA serviceA;

        private final ServiceB serviceB;

        public MyListener(ServiceA serviceA, ServiceB serviceB) {
              this.serviceA = serviceA;
              this.serviceB = serviceB;
        }


        @RabbitHandler
        public void handleA(A a) {
             this.serviceA.handle(a);
        }

        @RabbitHandler
        public void handleB(B b) {
             this.serviceB.handle(b);
        }
    }

Multi-consumers binding different parameters · Issue #879 · spring , RELEASE when a message is routed to Multi-consumers, a consumer needs to deserialize is class,which contain timestamp, query and other fields; @​Service @Slf4j @RabbitListener(queues = "time") public class  But how to use it, i don't know if it can be used on a class or a method? Does it exist any example? @RabbitListener for the same queue in multiple classes.

Yes you can, but it takes a little different approach: You need lo listen to the generic Message Type, do some switching, and your own deserialisation. You can of course completly hide that code somewhere (baseclass, annotations...)

Example below can be extended to listen to whatever types extra. The example above would be to filter on A and B DTO types.

void receive(ADTO dto)
{
    System.out.print(dto.url);
}

void receive(BDTO dto)
{
    System.out.print(dto.url);
}
@RabbitListener(queues = "your.queue.name")
public void listenMesssage(Message message) 
 {
    try 
    {
        String typeId = message.getMessageProperties().getHeaders().get("__TypeId__").toString();
        String contentType = message.getMessageProperties().getContentType();
        if (contentType != "application/json" || typeId == null || !typeId.contains(ADTO.class.toString()))
        {
            //TODO log warning
            System.out.print("type not supported by this service");
            return;
        }
        Object receivedObject = new Jackson2JsonMessageConverter().fromMessage(message);

        if (receivedObject instanceof ADTO)
        {
            receive((ADTO)receivedObject);
            System.out.print("ADTO");
        }
        //else

Alternatively, you can also do the serialisation like this:

....
String typeId = message.getMessageProperties().getHeaders().get("__TypeId__").toString();
byte[] binMsg =  message.getBody();
String strMsg  = new String(binMsg, StandardCharsets.UTF_8);
ObjectMapper mapper = new ObjectMapper();
if (typeId.contains("ADTO"))
{
   receive(mapper.readValue(strMsg, ADTO.class ));
}
else
...            

Spring AMQP, It has the same API so it's just a matter of changing the class name of any The Declarables object (for declaring multiple queues, exchanges,  I just ran a test (2 queues each with 2 existing messages) and they were delivered round-robin - q1m1, q2m1, q1m2, q2m2 when the prefetch was 1. With prefetch set to 4, I see q1m1, q1m2, q2m1, q2m2. Of course, when the queues are empty, messages will generally arrive in the order they arrive at the broker.

Alternatively, I found out, if you have a high enough version of AQMP and Rabbit (mine = spring-rabit 2.1.3, spring-starter-aqmp 2.1.2) you can do this polymorph:

@RabbitListener(id="multi", queues = "somequeuename")
public class SomeService 
{
    @RabbitHandler
    public void handleADTO(@Payload ADTO adto) {
         System.out.print(adto.url);
    }

    @RabbitHandler
    public void handleADTO2(@Payload ADTO2 adto) {
         System.out.print(adto.url);
    }
}

However, out of the box, this does not work. You need to configure another "bag of Beans":

@Bean
public SimpleRabbitListenerContainerFactory myFactory(
        SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory =
            new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
}

@Bean
Jackson2JsonMessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(new Jackson2JsonMessageConverter());
    return template;
}

Just put it in your app, or some other class.

See also

https://docs.spring.io/spring-amqp/docs/current/reference/html/

[#AMQP-444] Multiple POJO message types on same channel , public class AsyncHandler { @RabbitListener(queues = "hello") public void handleMessageType1(MessageType1 msg) { } @RabbitListener(queues = "​hello")  The queues for this listener. The entries can be 'queue name', 'property-placeholder keys' or 'expressions'. Expression must be resolved to the queue name or Queue object. The queue(s) must exist, or be otherwise defined elsewhere as a bean(s) with a RabbitAdmin in the application context.

RabbitMQ tutorial, In case you use a different host, port or credentials, connections settings would We will follow the same pattern as in the first tutorial: 1) create a package tut2 and StopWatch; @RabbitListener(queues = "hello") public class Tut2Receiver  The following are top voted examples for showing how to use org.springframework.amqp.rabbit.annotation.RabbitListener. These examples are extracted from open source projects. You can vote up the examples you like and your votes will be used in our system to generate more good examples.

RabbitMQ tutorial, In case you use a different host, port or credentials, connections settings would require adjusting. Topic exchange is powerful and can behave like other exchanges. StopWatch; public class Tut5Receiver { @RabbitListener(queues  First queue (GENERIC) is receiving the message as a generic Message class, so there is no implicit conversion. Spoiler alert: when we log the message we should see something like a JSON string in the payload since it has not been converted. On the other hand, the queue SPECIFIC is expecting a CustomMessage class (the method argument). That will trigger a logic inside Spring to find a converter from JSON to that specific class.

How can I use @RabbitListener in multiple classes?, So: Is it possible to define queues in multiple classes with @RabbitListener? the same class or in multiple classes, as long as those classes are used in Spring​  On this page we will integrate Spring 4 and RabbitMQ using annotation. We need to install RabbitMQ AMQP server for the integration. RabbitMQ is an open source messaging service.

Comments
  • This is what I was already doing, just needed to verify there is no other way. I think the issue in Spring AMQP is that you cannot have multiple consumers on the same queue.
  • You can, but then they are going to be competing consumers and there won't be a way to control messages distribution for their payload type. You need to consume message, convert its byte[] into a desired type, and only after that your will be able to route the result to the appropriate method to process. But that is already out side of the AMQP protocol and that already will be an issue do not follow the protocol in the Spring AMQP. You can consider to use different queues for different payloads though and route them on the Broker level vial routing key.