Hot questions for Messaging with RabbitMQ

Top 10 Java Open Source / Spring / Messaging with RabbitMQ

Question:

I'm following this guide to learn how to use spring-rabbit with RabbitMQ. However in this guide, the RabbitMQ configuration is as default(localhost server and with credential as guest/guest). What should I do if I want to connect to an remote RabbitMQ with ip address and credential? I don't know where to set these information in my application.


Answer:

The application for that guide is a Spring Boot Application.

Add a file application.properties to src/main/resources.

You can then configure rabbitmq properties according to the Spring Boot Documentation - scroll down to the rabbitmq properties...

...
spring.rabbitmq.host=localhost # RabbitMQ host.
...
spring.rabbitmq.password= # Login to authenticate against the broker.
spring.rabbitmq.port=5672 # RabbitMQ port.
...
spring.rabbitmq.username= # Login user to authenticate to the broker.
...

To connect to a cluster, use

spring.rabbitmq.addresses= # Comma-separated list of addresses to which the client should connect.

e.g. server1:5672,server2:5672.

If you don't want to use boot auto configuration, declare a CachingConnectionFactory @Bean yourself and configure it as desired.

Question:

How to acknowledge the messages manually without using auto acknowledgement. Is there a way to use this along with the @RabbitListener and @EnableRabbit style of configuration. Most of the documentation tells us to use SimpleMessageListenerContainer along with ChannelAwareMessageListener. However using that we lose the flexibility that is provided with the annotations. I have configured my service as below :

@Service
public class EventReceiver {

@Autowired
private MessageSender messageSender;

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order) throws Exception {

  // code for processing order
}

My RabbitConfiguration is as below
@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {

public static void main(String[] args) {
    SpringApplication.run(RabbitApplication.class, args);
}

@Bean


public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new  MappingJackson2MessageConverter();
        return converter;
    @Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(rabbitConnectionFactory());
      factory.setMaxConcurrentConsumers(5);
      factory.setMessageConverter((MessageConverter) jackson2Converter());
      factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
      return factory;
    }

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost("localhost");
    return connectionFactory;
}

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
    registrar.setContainerFactory(myRabbitListenerContainerFactory());
}

@Autowired
private EventReceiver receiver;
}
}

Any help will be appreciated on how to adapt manual channel acknowledgement along with the above style of configuration. If we implement the ChannelAwareMessageListener then the onMessage signature will change. Can we implement ChannelAwareMessageListener on a service ?


Answer:

Add the Channel to the @RabbitListener method...

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    ...
}

and use the tag in the basicAck, basicReject.

EDIT

@SpringBootApplication
@EnableRabbit
public class So38728668Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38728668Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
        context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
        context.close();
    }

    @Bean
    public Queue so38728668() {
        return new Queue("so38728668");
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    public static class Listener {

        private final CountDownLatch latch = new CountDownLatch(1);

        @RabbitListener(queues = "so38728668")
        public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            System.out.println(payload);
            channel.basicAck(tag, false);
            latch.countDown();
        }

    }

}

application.properties:

spring.rabbitmq.listener.acknowledge-mode=manual

Question:

How to mock spring rabbitmq/amqp so it will not fail during a Spring Boot Test while trying to auto create exchanges/queues?

Given I have a simple RabbitListener that will cause the queue and exchange to be auto created like this:

@Component
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(value = "myqueue", autoDelete = "true"), 
                exchange = @Exchange(value = "myexchange", autoDelete = "true", type = "direct"), 
                key = "mykey")}
)
@RabbitListenerCondition
public class EventHandler {
    @RabbitHandler
    public void onEvent(Event event) {
      ...
    }   
}

During a simple Spring Boot Test, like this:

@ActiveProfiles("test")
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, classes = { Application.class })

    @Autowired
    private ApplicationContext applicationContext;

    @Test
    public void test() {
        assertNotNull(applicationContext);
    }

}

it will fail with:

16:22:16.527 [SimpleAsyncTaskExecutor-1] ERROR o.s.a.r.l.SimpleMessageListenerContainer - Failed to check/redeclare auto-delete queue(s).
org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:62)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:309)

In in this test I don't care about Rabbit/AMQP, so how can I mock the whole Rabbit/AMQP away?


Answer:

I know this is an old topic, but I'd like to introduce a mocking library I'm developping : rabbitmq-mock.

The purpose of this mock is to mimic RabbitMQ behavior without IO (without starting a server, listening to some port, etc.) and with minor (~ none) startup time.

It is available in Maven Central:

<dependency>
    <groupId>com.github.fridujo</groupId>
    <artifactId>rabbitmq-mock</artifactId>
    <version>1.0.10</version>
    <scope>test</scope>
</dependency>

Basic use will be to override Spring configuration with a test one :

@Configuration
@Import(AmqpApplication.class)
class AmqpApplicationTestConfiguration {

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(MockConnectionFactoryFactory.build());
    }
}

For automatic mocking of Spring beans for tests, give a look at another project I'm working on: spring-automocker

Hope this can help !

Question:

From spring boot tutorial: https://spring.io/guides/gs/messaging-rabbitmq/

They give an example of creating 1 queue and 1 queue only, but, what if I want to be able to create more then 1 queue? how would it be possible?

Obviously, I can't just create the same bean twice:

@Bean
Queue queue() {
    return new Queue(queueNameAAA, false);
}

@Bean
Queue queue() {
    return new Queue(queueNameBBB, false);
}

You can't create the same bean twice, it will make ambiguous.


Answer:

Give the bean definition factory methods different names. Usually, by convention, you would name them the same as the queue, but that's not required...

@Bean
Queue queue1() {
    return new Queue(queueNameAAA, false);
}

@Bean
Queue queue2() {
    return new Queue(queueNameBBB, false); 
}

The method name is the bean name.

EDIT

When using the queues in the binding beans, there are two options:

@Bean
Binding binding1(@Qualifier("queue1") Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(queueNameAAA);
}

@Bean
Binding binding2(@Qualifier("queue2") Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(queueNameBBB);
}

or

@Bean
Binding binding1(TopicExchange exchange) {
    return BindingBuilder.bind(queue1()).to(exchange).with(queueNameAAA);
}

@Bean
Binding binding2(TopicExchange exchange) {
    return BindingBuilder.bind(queue2()).to(exchange).with(queueNameBBB);
}

or even better...

@Bean
Binding binding1(TopicExchange exchange) {
    return BindingBuilder.bind(queue1()).to(exchange).with(queue1().getName());
}

@Bean
Binding binding2(TopicExchange exchange) {
    return BindingBuilder.bind(queue2()).to(exchange).with(queue2().getName());
}

Question:

I think I am missing something here..I am trying to create simple rabbit listner which can accept custom object as message type. Now as per doc it says

In versions prior to 1.6, the type information to convert the JSON had to be provided in message headers, or a custom ClassMapper was required. Starting with version 1.6, if there are no type information headers, the type can be inferred from the target method arguments.

I am putting message manually in to queue using rabbit mq adm in dashboard,getting error like

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.example.Customer] for GenericMessage [payload=byte[21], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=customer, amqp_deliveryTag=1, amqp_consumerQueue=customer, amqp_redelivered=false, id=81e8a562-71aa-b430-df03-f60e6a37c5dc, amqp_consumerTag=amq.ctag-LQARUDrR6sUcn7FqAKKVDA, timestamp=1485635555742}]

My configuration:

@Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new    CachingConnectionFactory("localhost");
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test1234");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        return rabbitAdmin;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

Also question is with this exception message is not put back in the queue.

I am using spring boot 1.4 which brings amqp 1.6.1.

Edit1 : I added jackson converter as above (prob not required with spring boot) and given contenty type on rmq admin but still got below, as you can see above I am not configuring any listener container yet.

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.example.Customer] for GenericMessage [payload=byte[21], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=customer, content_type=application/json, amqp_deliveryTag=3, amqp_consumerQueue=customer, amqp_redelivered=false, id=7f84d49d-037a-9ea3-e936-ed5552d9f535, amqp_consumerTag=amq.ctag-YSemzbIW6Q8JGYUS70WWtA, timestamp=1485643437271}]

Answer:

If you are using boot, you can simply add a Jackson2JsonMessageConverter @Bean to the configuration and it will be automatically wired into the listener (as long as it's the only converter). You need to set the content_type property to application/json if you are using the administration console to send the message.

Conversion errors are considered fatal by default because there is generally no reason to retry; otherwise they'd loop for ever.

EDIT

Here's a working boot app...

@SpringBootApplication
public class So41914665Application {

    public static void main(String[] args) {
        SpringApplication.run(So41914665Application.class, args);
    }

    @Bean
    public Queue queue() {
        return new Queue("foo", false, false, true);
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @RabbitListener(queues = "foo")
    public void listen(Foo foo) {
        System.out.println(foo);
    }


    public static class Foo {

        public String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

I sent this message

With this result:

2017-01-28 21:49:45.509  INFO 11453 --- [           main] com.example.So41914665Application        : Started So41914665Application in 4.404 seconds (JVM running for 5.298)
Foo [bar=baz]

Boot will define an admin and template for you.

Question:

I am facing an issue in receiving a message from RabbitMQ. I am sending a message like below

        HashMap<Object, Object> senderMap=new HashMap<>();
        senderMap.put("STATUS", "SUCCESS");
        senderMap.put("EXECUTION_START_TIME", new Date());

        rabbitTemplate.convertAndSend(Constants.ADAPTOR_OP_QUEUE,senderMap);

If we see in RabbitMQ, we will get a fully qualified type.

In the current scenario, we have n number of producer for the same consumer. If i use any mapper, it leads to an exception. How will i send a message so that it doesn't contain any type_id and i can receive the message as Message object and later i can bind it to my custom object in the receiver.

I am receiving message like below. Could you please let me know how to use Jackson2MessageConverter so that message will get directly binds to my Object/HashMap from Receiver end. Also i have removed the Type_ID now from the sender.

How Message looks in RabbitMQ

priority: 0 delivery_mode: 2 headers: ContentTypeId: java.lang.Object KeyTypeId: java.lang.Object content_encoding: UTF-8 content_type: application/json {"Execution_start_time":1473747183636,"status":"SUCCESS"}

@Component
public class AdapterOutputHandler {

    private static Logger logger = Logger.getLogger(AdapterOutputHandler.class);

    @RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE)
    public void handleAdapterQueueMessage(HashMap<String,Object> message){

        System.out.println("Receiver:::::::::::"+message.toString());

    }

}

Connection

@Bean(name="adapterOPListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();
        messageConverter.setClassMapper(classMapper);
        factory.setMessageConverter(messageConverter);

    }

Exception

Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert Message content. Could not resolve __TypeId__ in header and no defaultType provided
    at org.springframework.amqp.support.converter.DefaultClassMapper.toClass(DefaultClassMapper.java:139)

I don't want to use __TYPE__ID from sender because they are multiple senders for the same queue and only one consumer.


Answer:

it leads to an exception

What exception?

TypeId: com.diff.approach.JobListenerDTO

That means you are sending a DTO, not a hash map as you describe in the question.

If you want to remove the typeId header, you can use a message post processor...

rabbitTemplate.convertAndSend(Constants.INPUT_QUEUE, dto, m -> {
    m.getMessageProperties.getHeaders().remove("__TypeId__");
    return m;
});

(or , new MessagePostProcessor() {...} if you're not using Java 8).

EDIT

What version of Spring AMQP are you using? With 1.6 you don't even have to remove the __TypeId__ header - the framework looks at the listener parameter type and tells the Jackson converter the type so it automatically converts to that (if it can). As you can see here; it works fine without removing the type id...

package com.example;

import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So39443850Application {

    private static final String QUEUE = "so39443850";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39443850Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend(QUEUE, new DTO("baz", "qux"));
        context.getBean(So39443850Application.class).latch.await(10, TimeUnit.SECONDS);
        context.getBean(RabbitAdmin.class).deleteQueue(QUEUE);
        context.close();
    }

    private final CountDownLatch latch = new CountDownLatch(1);

    @RabbitListener(queues = QUEUE, containerFactory = "adapterOPListenerContainerFactory")
    public void listen(HashMap<String, Object> message) {
        System.out.println(message.getClass() + ":" + message);
        latch.countDown();
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    public static class DTO {

        private String foo;

        private String baz;

        public DTO(String foo, String baz) {
            this.foo = foo;
            this.baz = baz;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        public String getBaz() {
            return this.baz;
        }

        public void setBaz(String baz) {
            this.baz = baz;
        }

    }

}

Result:

class java.util.HashMap:{foo=baz, baz=qux}

This is described in the documentation...

In versions prior to 1.6, the type information to convert the JSON had to be provided in message headers, or a custom ClassMapper was required. Starting with version 1.6, if there are no type information headers, the type can be inferred from the target method arguments.

You can also configure a custom ClassMapper to always return HashMap.

Question:

I have put together a java test. It puts a message on a queue and returns it as a string. What Im trying to achieve is for it to it convert into the java object SignUpDto. I have stripped down the code as much as possible for the question.

The question:

How do I modify the test below to convert into a object?


SignUpClass

public class SignUpDto {
    private String customerName;
    private String isoCountryCode;
    ... etc
}

Application - Config class

@Configuration
public class Application  {

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {

        // updated with @GaryRussels feedback
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }
}

The Test

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {Application.class})
public class TestQueue {

    @Test
    public void convertMessageIntoObject(){

        ApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        AmqpTemplate template = context.getBean(AmqpTemplate.class);

        String jsonString = "{ \"customerName\": \"TestName\", \"isoCountryCode\": \"UK\" }";

        template.convertAndSend("myqueue", jsonString);

        String foo = (String) template.receiveAndConvert("myqueue");

        // this works ok    
        System.out.println(foo);

        // How do I make this convert
        //SignUpDto objFoo = (SignUpDto) template.receiveAndConvert("myqueue");
        // objFoo.toString()  

    }
}

Answer:

Configure the RabbitTemplate with a Jackson2JsonMessageConverter.

Then use

template.convertAndSend("myqueue", myDto);

...

SignUpDto out = (SignUpDto) template.receiveAndConvert("myQueue");

Note that the outbound conversion sets up the content type (application/json) and headers with type information that tells the receiving converter what object type to create.

If you really do want to send a simple String of JSON, you need to set the content type to application/json. To help the inbound conversion, you can either set the type headers (look at the converter source for information), or you can configure the converter with a ClassMapper to determine the type.

EDIT

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
         message-converter="json" />

<bean id="json"
 class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

Or, since you are using Java Config; simply inject one into your template definition.

EDIT2

If you want to send a plain JSON string; you need to help the inbound converter via headers.

To set the headers...

template.convertAndSend("", "myQueue", jsonString, new MessagePostProcessor() {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setContentType("application/json");
        message.getMessageProperties().getHeaders()
            .put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, "foo.SignUpDto");
        return message;
    }
});

Bear in mind, though, that this sending template must NOT have a JSON message converter (let it default to the SimpleMessageConverter). Otherwise, the JSON will be double-encoded.

Question:

I currently have my rabbit listener annotation set as:

@RabbitListener(queues = "my-queue")

Is it not possible to pull in the queue name from my yaml file. The reason I want to do this, is so that I can change my queue to a test queue for my integration test, simply by changing the queue name in the yaml file. It appears the annotation must accept a constant string, is there a way round this? Thanks,


Answer:

Yes, it is called properties place holder and can be done like this:

@RabbitListener(queues = "${myQueue.property}")

Where that myQueue.property is exactly declared in your yaml.

https://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#async-annotation-driven

The argument name, value, and type can be property placeholders (${...}) or SpEL expressions (#{...}). The name must resolve to a String; the expression for type must resolve to a Class or the fully-qualified name of a class. The value must resolve to something that can be converted by the DefaultConversionService to the type (such as the x-message-ttl in the above example).

Question:

Previously I was reading all the messages present in the queue, but now I have to return specific amount of message based of users choice(count).

I try to change the for loop accordingly but its reading all the message because of auto acknowledge. So i tried changing it to manual in config file.

In my program how to ack message manually after reading msg(currently i am using AmqpTemplate to receive and i don't have reference of channel)?

    Properties properties = admin.getQueueProperties("queue_name");
    if(null != properties)
    {
        Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());          
        while(messageCount > 0)
        {
            Message msg = amqpTemplate.receive(queue_name);
            String value = new String(msg.getBody());

            valueList.add(value);
            messageCount--;
        }
}

Any help is highly appreciable, Thanks in advance.


Answer:

You cannot manually ack with the receive() method - use a SimpleMessageListenerContainer for an event-driven consumer with MANUAL acks and a ChannelAwareMessageListener. Or, use the template's execute() method which gives you access to the Channel - but then you will be using the lower-level RabbitMQ API, not the Message abstraction.

EDIT:

You need to learn the underlying RabbitMQ Java API to use execute, but something like this will work...

    final int messageCount = 3;
    boolean result = template.execute(new ChannelCallback<Boolean>() {

        @Override
        public Boolean doInRabbit(final Channel channel) throws Exception {
            int n = messageCount;
            channel.basicQos(messageCount); // prefetch
            long deliveryTag = 0;
            while (n > 0) {
                GetResponse result = channel.basicGet("si.test.queue", false);
                if (result != null) {
                    System.out.println(new String(result.getBody()));
                    deliveryTag = result.getEnvelope().getDeliveryTag();
                    n--;
                }
                else {
                    Thread.sleep(1000);
                }
            }
            if (deliveryTag > 0) {
                channel.basicAck(deliveryTag, true);
            }
            return true;
        }
    });