Hot questions for Messaging Using Spring AMQP

Top 10 Java Open Source / Spring / Messaging Using Spring AMQP

Question:

I'm currently working on a rabbit-amqp implementation project and use spring-rabbit to programmatically setup all my queues, bindings and exchanges. (spring-rabbit-1.3.4 and spring-framework versions 3.2.0)

The declaration in a javaconfiguration class or xml-based configuration are both quite static in my opinion declared. I know how to set a more dynamic value (ex. a name) for a queue, exchange or binding like this:

@Configuration
public class serverConfiguration {
   private String queueName;
   ...
   @Bean
   public Queue buildQueue() {
    Queue queue = new Queue(this.queueName, false, false, true, getQueueArguments());
    buildRabbitAdmin().declareQueue(queue);
    return queue;
   }
   ...
}

But I was wondering if it was possible to create a undefined amount instances of Queue and register them as beans like a factory registering all its instances.

I'm not really familiar with the Spring @Bean annotation and its limitations, but I tried

@Configuration
public class serverConfiguration {
   private String queueName;
   ...
   @Bean
   @Scope("prototype")
   public Queue buildQueue() {
    Queue queue = new Queue(this.queueName, false, false, true, getQueueArguments());
    buildRabbitAdmin().declareQueue(queue);
    return queue;
   }
   ...
}

And to see if the multiple beans instances of Queue are registered I call:

Map<String, Queue> queueBeans = ((ListableBeanFactory) applicationContext).getBeansOfType(Queue.class);

But this will only return 1 mapping:

name of the method := the last created instance.

Is it possible to dynamically add beans during runtime to the SpringApplicationContext?


Answer:

You can add beans dynamically to the context:

context.getBeanFactory().registerSingleton("foo", new Queue("foo"));

but they won't be declared by the admin automatically; you will have to call admin.initialize() to force it to re-declare all the AMQP elements in the context.

You would not do either of these in @Beans, just normal runtime java code.

Question:

Seems since spring-amqp version 1.5, there is a new annotation @queuebinding。But how to use it, i don't know if it can be used on a class or a method? Does it exist any example?


Answer:

Not sure what problem you have, but here is a sample exactly from the Reference Manual:

@Component
public class MyService {

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch"),
        key = "orderRoutingKey")
  )
  public void processOrder(String data) {
    ...
  }

And yes, it can be use as on class level as well as on method level.

Question:

I am new to Spring AMQP. I am having an application which is a producer sending messages to the other application which is a consumer.

Once the consumer receives the message, we will do validation of the data.

If the data is proper we have to ACK and message should be removed from the Queue. If the data is improper we have to NACK(Negative Acknowledge) the data so that it will be re-queued in RabbitMQ.

I came across

**factory.setDefaultRequeueRejected(false);**( It will not requeue the message at all)

**factory.setDefaultRequeueRejected(true);**( It will requeue the message when exception occurs)

But my case i will acknowledge the message based on validation. Then it should remove the message. If NACK then requeue the message.

I have read in RabbitMQ website

The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them

How to achieve the above scenario? Please provide me some examples.

I tried a small Program

       logger.info("Job Queue Handler::::::::::" + new Date());
        try {

        }catch(Exception e){

            logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::");

        }

        factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{
            return cause instanceof XMLException;
        }));

Message is not re queuing for different exception factory.setDefaultRequeueRejected(true)

09:46:38,854 ERROR [stderr] (SimpleAsyncTaskExecutor-1) org.activiti.engine.ActivitiObjectNotFoundException: no processes deployed with key 'WF89012'

09:46:39,102 INFO [com.example.bip.rabbitmq.handler.ErrorQueueHandler] (SimpleAsyncTaskExecutor-1) Received from Error Queue: {ERROR=Could not commit JPA transaction; nested exception is javax.persistence.RollbackException: Transaction marked as rollbackOnly}


Answer:

See the documentation.

By default, (with defaultRequeueRejected=true) the container will ack the message (causing it to be removed) if the listener exits normally or reject (and requeue) it if the listener throws an exception.

If the listener (or error handler) throws an AmqpRejectAndDontRequeueException, the default behavior is overridden and the message is discarded (or routed to a DLX/DLQ if so configured) - the container calls basicReject(false) instead of basicReject(true).

So, if your validation fails, throw an AmqpRejectAndDontRequeueException. Or, configure your listener with a custom error handler to convert your exception to an AmqpRejectAndDontRequeueException.

That is described in this answer.

If you really want to take responsibility for acking yourself, set the acknowledge mode to MANUAL and use a ChannelAwareMessageListener or this technique if you are using a @RabbitListener.

But most people just let the container take care of things (once they understand what's going on). Generally, using manual acks is for special use cases, such as deferring acks, or early acking.

EDIT

There was a mistake in the answer I pointed you to (now fixed); you have to look at the cause of the ListenerExecutionFailedException. I just tested this and it works as expected...

@SpringBootApplication
public class So39530787Application {

    private static final String QUEUE = "So39530787";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39530787Application.class, args);
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        template.convertAndSend(QUEUE, "foo");
        template.convertAndSend(QUEUE, "bar");
        template.convertAndSend(QUEUE, "baz");
        So39530787Application bean = context.getBean(So39530787Application.class);
        bean.latch.await(10, TimeUnit.SECONDS);
        System.out.println("Expect 1 foo:"  + bean.fooCount);
        System.out.println("Expect 3 bar:"  + bean.barCount);
        System.out.println("Expect 1 baz:"  + bean.bazCount);
        context.close();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(new ConditionalRejectingErrorHandler(
                t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof FooException));
        return factory;
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE, false, false, true);
    }
    private int fooCount;

    private int barCount;

    private int bazCount;

    private final CountDownLatch latch = new CountDownLatch(5);

    @RabbitListener(queues = QUEUE)
    public void handle(String in) throws Exception {
        System.out.println(in);
        latch.countDown();
        if ("foo".equals(in) && ++this.fooCount < 3) {
            throw new FooException();
        }
        else if ("bar".equals(in) && ++this.barCount < 3) {
            throw new BarException();
        }
        else if ("baz".equals(in)) {
            this.bazCount++;
        }
    }

    @SuppressWarnings("serial")
    public static class FooException extends Exception { }

    @SuppressWarnings("serial")
    public static class BarException extends Exception { }

}

Result:

Expect 1 foo:1
Expect 3 bar:3
Expect 1 baz:1

Question:

This is how I'm creating an exchange and binding a queue to it

<rabbit:topic-exchange id="dataExchange" name="MQ-EXCHANGE" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="COMM_QUEUE" pattern="queue.*" />
        </rabbit:bindings>
</rabbit:topic-exchange>

I have read a lot of posts on the Internet where it is written that a message is also needed to be marked persistent if it is to be secured in case rabbitmq or the queue crashes. But I couldn't figure out how to mark my messages persistent.

This is how I'm publishing the messages to the queue

    @Autowired
    private RabbitTemplate template;

    @Override
    public void produceMessage(Object message, String routingKey) {
        template.convertAndSend(routingKey, message);  
    }

I looked for different API methods to know this and also tried to look for any specific property that I could configure in the XML but couldn't find a way. Any guidance ?


Answer:

The default delivery mode (in MessageProperties) is PERSISTENT. See here.

To make it non-persistent you need to use a convertAndSend(...) method with a MessagePostProcessor to set the deliveryMode property.

Question:

After Google for a few days, and i believe i am totally lost. I would like to implement a kind of priority queue that has about 3 queues:

  1. high priority queue (daily), that needs to be process first.
  2. mid priority queue (weekly), that will process if no items in queue #1. (it is ok message in this queue it never process at all)
  3. low priority queue (monthly), that will process if no items in queue #1 & #2. (it is ok message in this queue it never process at all)

Initially I have the following flow, to have a consumer to consume messages from all three queues and checks whether there is any items in queue #1, #2 and #3. and then I realize that this is wrong because:

  1. I am totally lost with a question: "How do I know which queue it is coming from?".
  2. I'm already consuming a message regardless from any queue, So if I get an object from lower priority queue, am I gonna put it back to the queue if I discover there is a message at the higher priority queue?

Following is my current configurations, which shows what an idiot I am.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

<rabbit:connection-factory id="connectionFactory" host="localhost" />

<rabbit:template id="amqpTemplatead_daily" connection-factory="connectionFactory"
    exchange="" routing-key="daily_queue"/>

<rabbit:template id="amqpTemplatead_weekly" connection-factory="connectionFactory"
    exchange="" routing-key="weekly_queue"/>

<rabbit:template id="amqpTemplatead_monthly" connection-factory="connectionFactory"
    exchange="" routing-key="monthly_queue"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="daily_queue" />
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="weekly_queue" />
</rabbit:listener-container>    

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="monthly_queue" />
</rabbit:listener-container>    

<bean id="Consumer" class="com.test.Consumer" />

</beans>

Any idea how should I tackle this with priority queue?

ps: I also wonder, if Apache Camel has something I can depend on?

UPDATE 1: I just saw this from Apache Camel: "https://issues.apache.org/jira/browse/CAMEL-2537" the sequencer on JMSPriority seems to be what im looking for, anyone has tried this before?

UPDATE 2: assuming i am to use RabbitMQ's plugin base on @Gary Russell recommendation, I have the following spring-rabbitmq context XML configuration, which seems to make sense (by guest..):

<rabbit:queue name="ad_google_dfa_reporting_queue">
    <rabbit:queue-arguments>
            <entry key="x-max-priority" value="10"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="adGoogleDfaReporting" method="consume" queue-names="ad_google_dfa_reporting_queue" />
</rabbit:listener-container>

<bean id="Consumer" class="com.test.Consumer" />

The above xml configuration has successfully create a Queue, with name: "ad_google_dfa_reporting_queue", and with Parameter arguments: x-max-priority: 10 & durable: true

But not when comes to the code that send the message with priority, I totally lost it. How to define the priority as mention in the Sample URL: https://github.com/rabbitmq/rabbitmq-priority-queue/blob/master/examples/java/src/com/rabbitmq/examples/PriorityQueue.java

AmqpTemplate amqpTemplateGoogleDfaReporting = (AmqpTemplate) applicationContext.getBean("amqpTemplateadGoogleDfaReporting");
amqpTemplateGoogleDfaReporting.convertAndSend("message"); // how to define message priority?

UPDATE 3: Based on the @Gary's answer, i manage to sent message with priority set in the message, as per image below: However, when i sent in 1000 messages with random priority between 1-10, the consumer is consuming message with all kinds of priority. (I was expecting only the high priority message to be consume first). following is the code for Message producer:

    Random random = new Random();
    for (int i=0; i< 1000; i++){
        final int priority = random.nextInt(10 - 1 + 1) + 1;

        DfaReportingModel model = new DfaReportingModel();
        model.setReportType(DfaReportingModel.ReportType.FACT);
        model.setUserProfileId(0l + priority);
        amqpTemplateGoogleDfaReporting.convertAndSend(model, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setPriority(priority);
                return message;
            }
        });
    }

And following is the code for Message consumer:

    public void consume(DfaReportingModel message) {
        System.out.println(message.getUserProfileId());

        Thread.sleep(500);
    }

The result im getting:

9, 10, 7, 9, 6, 4, 10, 10, 3, 10, 6, 1, 5, 6, 6, 3, 4, 7, 6, 8, 3, 1, 4, 5, 5, 3, 10, 9, 5, 1, 8, 9, 6, 9, 3, 10, 7, 4, 8, 7, 3, 4, 8, 2, 6, 9, 6, 4, 7, 7, 2, 8, 4, 4, 1,

UPDATE 4: Problem solved! Knowing the sample code from https://github.com/rabbitmq/rabbitmq-priority-queue is working in my environment, I presume that the problem is around the spring context. Hence, after countless time on try and error with different type of configurations, and I pin point the exact combination that will make this works! and is as per following:

    <rabbit:queue name="ad_google_dfa_reporting_queue">
    <rabbit:queue-arguments>
        <entry key="x-max-priority">
            <value type="java.lang.Integer">10</value> <!-- MUST specifically define java.lang.Integer to get it to work -->
        </entry>
    </rabbit:queue-arguments>
</rabbit:queue>

Without specifically define the value is Integer type, the priority queue does not work. Finally, it is solved. Yay!


Answer:

RabbitMQ now has a priority queue plugin where messages are delivered in priority order. It would be better to use that rather than your scheme of requeueing low priority messages which will be quite expensive at runtime.

EDIT:

When using the rabbitTemplate.convertAndSend(...) methods, and you want to set the priority property on the message, you either need to implement a custom MessagePropertiesConverter in the template (subclass the DefaultMessagePropertiesConverter) or use the convertAnSend variants that take a message post-processor; e.g.:

template.convertAndSend("exchange", "routingKey", "message", new MessagePostProcessor() {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setPriority(5);
        return message;
    }
});

Question:

I want to set message header while sending a message to rabbit. I am using below code, but confused how to set message header in it.

public static <T> void sendMessage(String routingKey,final Object message,Class<T> type){
    DefaultClassMapper typeMapper = new DefaultClassMapper();
    typeMapper.setDefaultType(type);

    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setClassMapper(typeMapper);

    RabbitTemplate template = new RabbitTemplate(getConnectionFactory));
    template.setMessageConverter(converter);

    template.convertAndSend(routingKey, message);
}

In above method i am simply arguementing java POJO object and its type to send. I want to know where should i set message header here.


Answer:

Java 8:

template.convertAndSend(routingKey, message, m -> {
    m.getMessageProperties().getHeaders().put("foo", "bar");
    m.getMessageProperties().setPriority(priority);        
    return m;
});

Java 6,7:

template.convertAndSend(routingKey, message, new MessagePostProcessor() {

    @Override
    public Message postProcessMessage(Message m) throws AmqpException {
        m.getMessageProperties().getHeaders().put("foo", "bar");
        m.getMessageProperties().setPriority(priority);        
        return m;
    }

});