Spring Cloud Stream: @StreamListener processing messages twice

spring cloud stream kafka binder configuration example
spring cloud stream multiple binders
spring cloud stream multiple input channels
spring cloud stream dynamic destinations
spring cloud stream rabbitmq example
spring cloud stream jms example
spring cloud stream: bindings
spring cloud stream-test-support

I'm using Spring Cloud Stream (Edgware.SR5) with Spring Boot (1.5.10.RELEASE). My @StreamListener is processing twice every message it receives.

The idea of the example is to publish a message in a queue and the process it.

Service:

@EnableBinding(ExampleBindings.class)
@Service
public class ExampleService {

    @Publisher(channel = ExampleBindings.OUTPUT)
    public String queue(String message){
        return message;
    }

    @StreamListener(ExampleBindings.INPUT)
    public void dequeue(String message){
        System.out.println("New message: " + message);
    }
}

Bindings:

public interface ExampleBindings {

    String INPUT = "input1";
    String OUTPUT = "output1";

    @Input(ExampleBindings.INPUT)
    SubscribableChannel input();

    @Output(ExampleBindings.OUTPUT)
    MessageChannel output();
}

application.properties:

spring.cloud.stream.default.group=group1
spring.cloud.stream.default.binder=binder1

spring.cloud.stream.bindings.input1.destination=dest_1
spring.cloud.stream.bindings.output1.destination=dest_1

spring.cloud.stream.binders.binder1.type=rabbit
spring.cloud.stream.binders.binder1.environment.spring.rabbitmq.host=localhost

Configuration (for injecting proxied service in the test):

@Configuration
public class ExampleConfig {

    @Bean
    public PublisherAnnotationBeanPostProcessor publisherAnnotationBeanPostProcessor(){
         PublisherAnnotationBeanPostProcessor publisherAnnotationBeanPostProcessor =
            new PublisherAnnotationBeanPostProcessor();
        publisherAnnotationBeanPostProcessor.setProxyTargetClass(true);
        return publisherAnnotationBeanPostProcessor;
    }
}

Test:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ExampleServiceTest {

    @Autowired
    private ExampleService exampleService;

    @Test
    public void testQueue() throws InterruptedException {
        exampleService.queue("Hello!");
        Thread.sleep(1000);//Wait for message processing
        System.out.println("Ready!");
    }
}

I'm having the following output:

17:19:10.230 [dest1.group1-2] DEBUG o.s.c.s.b.StreamListenerMessageHandler - org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b received message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
New message: Hello!
17:19:10.231 [dest1.group1-1] DEBUG o.s.c.s.b.StreamListenerMessageHandler - handler 'org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b' produced no reply for request Message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=788e8bbf-4ae4-86cc-0859-d4f153cb5807, amqp_consumerTag=amq.ctag-fV0aaDzYUZfq08JsODq6pA, contentType=text/plain, timestamp=1547583550230}]
17:19:10.231 [dest1.group1-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'input1', message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=788e8bbf-4ae4-86cc-0859-d4f153cb5807, amqp_consumerTag=amq.ctag-fV0aaDzYUZfq08JsODq6pA, contentType=text/plain, timestamp=1547583550230}]
New message: Hello!
17:19:10.232 [dest1.group1-2] DEBUG o.s.c.s.b.StreamListenerMessageHandler - handler 'org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b' produced no reply for request Message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
17:19:10.232 [dest1.group1-2] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'input1', message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
Ready!

I can't figure out what is the problem with my configuration or if it is some bug, any advice?

Thanks!

EDITED:

I uploaded a (non) working example here

You can create a RabbitMQ instance using:

docker run -p 5672:5672 -p 15672:15672 rabbitmq:3-management

I detected that @Publisher was publishing twice because of the configuration in ExampleConfig. This new configuration (borrowed from here) seems to work fine:

@Bean
public static BeanFactoryPostProcessor bfpp() {
    return bf -> bf.getBean(IntegrationContextUtils.PUBLISHER_ANNOTATION_POSTPROCESSOR_NAME,
        PublisherAnnotationBeanPostProcessor.class).setProxyTargetClass(true);
}

Spring Cloud Stream: #StreamListener processing messages twice , I'm using Spring Cloud Stream (Edgware.SR5) with Spring Boot (1.5.10.​RELEASE). My #StreamListener is processing twice every message it receives. The idea  In a Spring Boot app using Spring Cloud Stream connecting to Kafka, I'm trying to set up two separate stream listener methods: One reads from topics "t1" and "t2" as KTables, re-partitioning using a different key in one, then joining to data from the other


I was running my application in debug mode(intellij) due to which somehow the offset wasn't getting updated. Try running on run mode and it solved my problem.

Spring Cloud Stream Reference Guide, `@StreamListener and Message Conversion Spring Cloud Stream provides the interfaces Source , Sink , and Processor ; you can also valueOf(avg(data))); } private static Double avg(List<String> data) { double sum = 0; double count = 0;​  Spring Cloud Stream does this through the spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex properties. For example, if there are three instances of a HDFS sink application, all three instances have spring.cloud.stream.instanceCount set to 3 , and the individual applications have spring.cloud.stream.instanceIndex set to 0


From the config, I think, you are trying to publish the same message again to the same destination dest_1.

spring.cloud.stream.bindings.input1.destination=dest_1
spring.cloud.stream.bindings.output1.destination=dest_1

And from the log it is clear, the 2nd message has a different ID

id=788e8bbf-4ae4-86cc-0859-d4f153cb5807
id=2f22ce16-bb5a-350c-8b3d-e6c898760888

spring-cloud/spring-cloud-stream, if i call the repository 2 times the method receiveAccount is called twice can when the channel input receives a message the receiveAccount method is used to that I run into a timing issue when trying to access the stream-builder-​process bean. @Autowired ConfigurableApplicationContext context; @​StreamListener. The following are top voted examples for showing how to use org.springframework.cloud.stream.annotation.StreamListener. These examples are extracted from open source projects. You can vote up the examples you like and your votes will be used in our system to generate more good examples.


Introduction to Spring Cloud Stream, SR1 for spring-cloud-depencies and spring-boot-starter-parent 2.1.4. @/all Guys , is it not possible for messaging conversion in Spring Cloud Stream with Kafka binders LoggingErrorHandler : Error while processing: ConsumerRecord​(topic kstream @streamlistener, that message would fail to process and underlining  Spring Cloud Stream allows a user to develop and run messaging microservices using Spring Integration and run them locally, or in the cloud, or even on Spring Cloud Data Flow. Just add @EnableBinding and run your app as a Spring Boot app (single application context).


Event-Driven for the Enterprise: Spring Cloud Stream, Spring Cloud , Channel — represents the communication pipe between messaging-middleware and the application; StreamListeners — message-handling  Spring Cloud Bus uses Spring Cloud Stream to broadcast the messages. So, to get messages to flow, you need only include the binder implementation of your choice in the classpath. There are convenient starters for the bus with AMQP (RabbitMQ) and Kafka (spring-cloud-starter-bus-[amqp|kafka]). Generally speaking, Spring Cloud Stream relies on


Setting up RabbitMQ with Spring Cloud Stream, We did not tell Spring Cloud Stream what message broker we are going to use. annotated as @StreamListener by saving the message to a database. Double click the transform processor, and click the little configuration  Have a new issue with spring-boot applications (e.g. spring-cloud-config-server & spring-cloud-eureka-server) where the same JMX bean is getting registered twice causing a failure. Seems that the initial registration is coming from a cal