Hot questions for Spring Integration: Amazon Web Services


Question:

When I consume a message from the Kinesis stream, I get some junk characters along with the headers, etc.

    @StreamListener(Processor.INPUT)
    public void receive(String message) {       
        System.out.println("Message recieved: "+message);
        throw new RuntimeException("Exception thrown");
    }

    @StreamListener("errorChannel")
    public void transform(ErrorMessage errorMessage) throws UnsupportedEncodingException {      

        //original payload
        System.out.println("Error Original Message Payload: " + new String((byte[]) errorMessage.getOriginalMessage().getPayload(), "UTF-8"));
        System.out.println("Error Original Message Stream channel: " + errorMessage.getOriginalMessage().getHeaders().get("aws_receivedStream"));
    }

application.yml

spring:
  cloud:
    stream:
      bindings:
        input: 
          group: abcd
          destination: stream
          content-type: application/json
          errorChannelEnabled: true
          consumer:
            headerMode: raw

I get output at both the listener and the errorChannel, with junk characters.

I am trying to extract the original message in the errorChannel. Is this the right way to convert the bytes of the message?

Message recieved: ?contentType "application/json"{"aa":"cc"}

Answer:

AWS Kinesis doesn't provide any headers entity in its records. So, to support such functionality in Spring Cloud Stream, we embed the headers into the body of the Kinesis record. For this purpose the headerMode is embeddedHeaders by default in the Kinesis Binder, and for symmetry between producer and consumer this option must not be changed.

The framework provides an out-of-the-box EmbeddedHeadersChannelInterceptor for the target @StreamListener channels: the embedded headers are extracted from the record body and populated properly into the message to be sent.

When we handle errors in the errorChannel, errorMessage.getOriginalMessage() is indeed the non-transformed original. Therefore the payload of that message is the raw byte[] of the record body, still containing the embedded headers.
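To see why the raw record body prints as "junk", here is a small self-contained simulation. Note that this is NOT the real Spring Cloud Stream wire format, just an illustration of a header block and a marker byte prepended to the JSON payload; the class and method names are hypothetical:

```java
import java.nio.charset.StandardCharsets;

public class EmbeddedHeadersDemo {

    // Build a fake record body: a non-printable marker byte, a header block,
    // then the JSON payload. (Illustration only -- not the actual
    // Spring Cloud Stream embedded-headers layout.)
    public static byte[] fakeRecord(String headerBlock, String jsonPayload) {
        byte[] headers = headerBlock.getBytes(StandardCharsets.UTF_8);
        byte[] payload = jsonPayload.getBytes(StandardCharsets.UTF_8);
        byte[] body = new byte[1 + headers.length + payload.length];
        body[0] = (byte) 0xFF; // decodes as the '?' junk seen in the question's output
        System.arraycopy(headers, 0, body, 1, headers.length);
        System.arraycopy(payload, 0, body, 1 + headers.length, payload.length);
        return body;
    }

    // Naive recovery: strip everything before the first '{'. The proper way
    // is EmbeddedHeaderUtils, which actually parses the header block.
    public static String extractPayload(byte[] body) {
        String s = new String(body, StandardCharsets.UTF_8);
        return s.substring(s.indexOf('{'));
    }

    public static void main(String[] args) {
        byte[] body = fakeRecord("contentType\"application/json\"", "{\"aa\":\"cc\"}");
        System.out.println("Raw body: " + new String(body, StandardCharsets.UTF_8));
        System.out.println("Payload:  " + extractPayload(body));
    }
}
```

Printing the whole body as a String produces exactly the kind of `?contentType "application/json"{"aa":"cc"}` output shown above; the payload itself is intact after the header block.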

If you would like to parse them properly, you should use this utility:

EmbeddedHeaderUtils.extractHeaders((Message<byte[]>) message, true);

Question:

Here is the spring-integration-aws project. It provides this example of an inbound channel adapter:

@SpringBootApplication
public class MyConfiguration {

    @Autowired
    private AmazonSQSAsync amazonSqs;

    @Bean
    public PollableChannel inputChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageProducer sqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSqs, "myQueue");
        adapter.setOutputChannel(inputChannel());
        return adapter;
    }
}

OK, the channel and the SqsMessageDrivenChannelAdapter are defined, but what comes next? Let's say that I have a Spring bean like this:

import com.amazonaws.services.sqs.model.Message;

@Component
public class MyComponent {
    public void onMessage(Message message) throws Exception {
        //handle sqs message
    }
} 
  1. How do I tell Spring to pass all messages from myQueue to this component?
  2. Is there any additional configuration to process messages one by one? For example, after a message is received, SQS marks it as in flight and it is not visible to other clients, so I need to fetch only one message, process it, and then fetch the next one. Is this behavior enabled by default?

Answer:

You should read the Spring Integration Reference Manual.

@Component
public class MyComponent {

    // Note: here Message is org.springframework.messaging.Message,
    // not com.amazonaws.services.sqs.model.Message
    @ServiceActivator(inputChannel = "inputChannel")
    public void onMessage(Message<?> message) throws Exception {
        //handle sqs message
    }

}

Question:

I'm looking for a working example of a Spring app that receives and sends messages to a queue using Spring Integration + Amazon SQS service.


Answer:

This is an example of how to configure an outbound channel adapter with XML:

 <int-aws:sqs-outbound-channel-adapter sqs="sqs"
                                      auto-startup="false"
                                      channel="errorChannel"
                                      phase="100"
                                      id="sqsOutboundChannelAdapter"
                                      queue="foo"
                                      delay-expression="'200'"
                                      message-deduplication-id="foo"
                                      message-group-id-expression="'bar'"
                                      send-timeout="202"
                                      sync="false"
                                      error-message-strategy="errorMessageStrategy"
                                      failure-channel="failureChannel"
                                      success-channel="successChannel"
                                      message-converter="messageConverter"
                                      async-handler="asyncHandler"
                                      resource-id-resolver="resourceIdResolver"/>

The inbound channel adapter looks like this:

<int-aws:sqs-message-driven-channel-adapter sqs="sqs"
                                      auto-startup="false"
                                      channel="errorChannel"
                                      error-channel="nullChannel"
                                      task-executor="taskExecutor"
                                      phase="100"
                                      id="sqsMessageDrivenChannelAdapter"
                                      queues="foo, bar"
                                      message-deletion-policy="NEVER"
                                      max-number-of-messages="5"
                                      visibility-timeout="200"
                                      wait-time-out="40"
                                      send-timeout="2000"
                                      queue-stop-timeout="11000"
                                      destination-resolver="destinationResolver"
                                      resource-id-resolver="resourceIdResolver"/>

And here is the Java variant of both:

    @Bean
    @ServiceActivator(inputChannel = "sqsSendChannelWithAutoCreate")
    public MessageHandler sqsMessageHandlerWithAutoQueueCreate() {
        DynamicQueueUrlDestinationResolver destinationResolver = new DynamicQueueUrlDestinationResolver(amazonSqs(), null);
        destinationResolver.setAutoCreate(true);
        return new SqsMessageHandler(amazonSqs(), destinationResolver);
    }


    @Bean
    public MessageProducer sqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(amazonSqs(), "testQueue");
        adapter.setOutputChannel(inputChannel());
        return adapter;
    }

You can find more samples in the tests of the project: https://github.com/spring-projects/spring-integration-aws/tree/master/src/test/java/org/springframework/integration/aws

Question:

I have a producer with following configuration for content-type

spring:
  cloud:
    stream:
      bindings:
        eventOut:
          destination: lab_csi
          content-type: application/json

On the consumer side I use Spring Integration (KinesisMessageDrivenChannelAdapter) to route events to different channels. When I receive a Message in a listener class like the one below:

@ServiceActivator(inputChannel = "channelA")
public void handleMessage(Message<?> msg) throws IOException {
    objectMapper.readValue((byte[]) msg.getPayload(), MyEvent.class);
}

the unmarshalling to MyEvent fails. In the stack trace I can see that the content-type is part of the payload, and the payload is still not deserialized from JSON into a POJO.

I am wondering how I can deserialize the message before doing any other transformation. I can't find any method to set a MessageConverter on the adapter.

I appreciate your help.

Thank you


Answer:

It sounds like your producer is Spring Cloud Stream, but your consumer is just a plain KinesisMessageDrivenChannelAdapter. It is not clear why one would not use a Spring Cloud Stream consumer as well, but anyway...

Your problem is that the SCSt producer serializes message headers together with the payload into the Kinesis record body, simply because AWS Kinesis doesn't support headers per se.

If you really are not interested in the headers on the consumer side, you can disable embedding headers on the producer side:

spring:
  cloud:
    stream:
      bindings:
        eventOut:
          destination: lab_csi
          producer:
            headerMode: none

Otherwise you have no choice on the plain KinesisMessageDrivenChannelAdapter side but to use EmbeddedHeaderUtils manually.

Question:

I have written a sample application with the Spring Cloud Stream Kinesis binder

  compile('org.springframework.cloud:spring-cloud-starter-stream-kinesis:1.0.0.BUILD-SNAPSHOT')

Code

@StreamListener(Processor.INPUT)
public void receive(Message<String> message) {      
    System.out.println("Message recieved: "+message);
    System.out.println("Message Payload: "+message.getPayload());    

}

application.yml

spring:
  cloud:
    stream:
      bindings:
        input: 
          group: group
          destination: stream
          content-type: application/json          
        output:  
          group: group
          destination: stream
          content-type: application/json

I have started the application on multiple ports: 8081, 8082, 8083 and 8084.

When I publish a message to the stream, more often than not more than one instance consumes it.

For example, I sent the message {"22":"11"}, and it was consumed by both 8083 and 8084.

message on application:8084

2018-03-16 12:29:19.715  INFO 10084 --- [           main] a.i.k.KinesisMessageDrivenChannelAdapter : started KinesisMessageDrivenChannelAdapter{shardOffsets=[KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}], consumerGroup='group'}
2018-03-16 12:29:19.809  INFO 10084 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8084 (http) with context path ''
2018-03-16 12:29:19.809  INFO 10084 --- [           main] com.example.aws.AwsApplication           : Started AwsApplication in 21.034 seconds (JVM running for 22.975)
2018-03-16 12:29:19.840  INFO 10084 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
2018-03-16 12:30:23.929  INFO 10084 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The records '[{SequenceNumber: 49582549849562056887358041088912873574803531055853731842,ApproximateArrivalTimestamp: Fri Mar 16 12:30:21 IST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=47 cap=47],PartitionKey: partitionKey-0,}]' are skipped from processing because their sequence numbers are less than already checkpointed: 49582549849562056887358041088912873574803531055853731842
Message recieved: GenericMessage [payload={"22":"11"}, headers={aws_shard=shardId-000000000000, id=f6cb4b6d-e149-059f-7e4d-aa9dfeeef10e, contentType=application/json, aws_receivedStream=stream, aws_receivedPartitionKey=partitionKey-0, aws_receivedSequenceNumber=49582549849562056887358041088914082500623155992949948418, timestamp=1521183774995}]
Message Payload: {"22":"11"}

message on application:8083

2018-03-16 12:29:05.733  INFO 8188 --- [           main] a.i.k.KinesisMessageDrivenChannelAdapter : started KinesisMessageDrivenChannelAdapter{shardOffsets=[KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}], consumerGroup='group'}
2018-03-16 12:29:05.733  INFO 8188 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
2018-03-16 12:29:05.796  INFO 8188 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8083 (http) with context path ''
2018-03-16 12:29:05.796  INFO 8188 --- [           main] com.example.aws.AwsApplication           : Started AwsApplication in 19.463 seconds (JVM running for 20.956)
Message recieved: GenericMessage [payload={"22":"11"}, headers={aws_shard=shardId-000000000000, id=cf8647fe-8ce5-70b5-eeb9-74a08efc870a, contentType=application/json, aws_receivedStream=stream, aws_receivedPartitionKey=partitionKey-0, aws_receivedSequenceNumber=49582549849562056887358041088914082500623155992949948418, timestamp=1521183775155}]
Message Payload: {"22":"11"}

Ideally only one consumer in the group should handle a given message. Am I missing something here?


Answer:

Thank you for validating the solution!

I think I found where the problem is. It is in ShardCheckpointer.checkpoint(String sequenceNumber).

The current code is like this:

if (existingSequence == null ||
        new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {
    this.checkpointStore.put(this.key, sequenceNumber);
    return true;
}

There is a race condition here: both (all?) nodes check the state and get a smaller value from the store. So they all pass the condition, and then they all reach the checkpointStore.put() part. There each of them stores the same new value and returns true, letting the channel adapter process the same record.

My fix for this is like:

if (existingSequence == null ||
        new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {
    if (existingSequence != null) {
        return this.checkpointStore.replace(this.key, existingSequence, sequenceNumber);
    }
    else {
        this.checkpointStore.put(this.key, sequenceNumber);
        return true;
    }
}

The condition is the same, but I also use checkpointStore.replace() with this contract:

/**
 * Atomically replace the value for the key in the store if the old
 * value matches the oldValue argument.
 */
Right now I am trying to come up with a test case to validate this, and I will let you know when a BUILD-SNAPSHOT is ready for you to consume and validate on your side.
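The effect of the fix can be modeled with a plain ConcurrentHashMap standing in for the checkpoint store. This is a simplified sketch, not the actual ShardCheckpointer code, and the key name is hypothetical; the point is that replace() is atomic, so only one competing consumer can win, and the loser returns false and skips the record:

```java
import java.math.BigInteger;
import java.util.concurrent.ConcurrentHashMap;

public class CheckpointDemo {

    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();
    private final String key = "group:stream:shard"; // hypothetical checkpoint key

    // Mirrors the fixed logic: advance the checkpoint only if the new sequence
    // is greater, and do it atomically so concurrent callers can't both win.
    public boolean checkpoint(String sequenceNumber) {
        String existing = this.store.get(this.key);
        if (existing == null) {
            // putIfAbsent: only one of several "first" callers succeeds
            return this.store.putIfAbsent(this.key, sequenceNumber) == null;
        }
        if (new BigInteger(existing).compareTo(new BigInteger(sequenceNumber)) < 0) {
            // replace() succeeds only if nobody else checkpointed in between
            return this.store.replace(this.key, existing, sequenceNumber);
        }
        return false; // already checkpointed at or beyond this sequence
    }
}
```

With the old check-then-put logic, two nodes could both return true for the same sequence number; here a second attempt with the same number fails either the comparison or the atomic replace, so the duplicate record is skipped.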

Question:

I am working on integrating Spring Integration with an AWS SQS queue.

I have an issue when my method annotated with @ServiceActivator throws an exception: it seems that in such cases the message is removed from the queue anyway, although I've configured the MessageDeletionPolicy to ON_SUCCESS on the SqsMessageDrivenChannelAdapter.

Here is my channel/adapter configuration https://github.com/sdusza1/spring-integration-sqs/blob/master/src/main/java/com/example/demo/ChannelConfig.java

I've tried doing the same using the @SqsListener annotation, and there the messages are not deleted, as expected.

I've created a mini Spring Boot app here to demonstrate this issue: https://github.com/sdusza1/spring-integration-sqs

Please help :)


Answer:

Your configuration is like this:

@Bean
public MessageProducerSupport sqsMessageDrivenChannelAdapter() {
    SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(amazonSqs, SQS_QUEUE_NAME);
    adapter.setOutputChannel(inboundChannel());
    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
    adapter.setVisibilityTimeout(RETRY_NOTIFICATION_AFTER);
    return adapter;
}

Where the inboundChannel is like this:

@Bean
public QueueChannel inboundChannel() {
    return new QueueChannel();
}

So, this is a queue, and therefore asynchronous: the message from that queue is processed on a separate thread by the TaskScheduler, which polls this kind of channel according to your PollerMetadata configuration. In this case any errors in the consumer are thrown on that thread as well and never reach the SqsMessageDrivenChannelAdapter for the expected error handling.

Technically this is completely different from your @SqsListener experience, where the listener is really called directly on the container thread, and therefore its error handling is applied.

So either revise your logic for how you would like to handle errors on that separate thread, or simply don't use a QueueChannel right after the SqsMessageDrivenChannelAdapter: let it throw, and handle errors in the underlying SQS listener container, as is the case with @SqsListener.
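The difference between the two dispatch modes can be sketched without any Spring classes (the names are hypothetical; a BlockingQueue stands in for the QueueChannel). With a direct, synchronous call the "adapter" sees the handler's exception and could therefore keep the message; with a queued handoff the send succeeds immediately and the exception surfaces later on another thread:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncErrorDemo {

    // Direct dispatch: the caller (the "adapter") observes the failure and
    // could therefore skip deleting the message (ON_SUCCESS semantics).
    public static boolean syncDeliverySeesError() {
        try {
            handler();
            return false;
        }
        catch (RuntimeException e) {
            return true;
        }
    }

    // Queued dispatch: offer() succeeds immediately; the failure happens later
    // on the consumer thread, and the caller never learns about it.
    public static boolean queuedDeliverySeesError() throws InterruptedException {
        BlockingQueue<Runnable> channel = new LinkedBlockingQueue<>();
        channel.offer(AsyncErrorDemo::handler); // the "send" has already succeeded
        Thread consumer = new Thread(() -> {
            try {
                channel.take().run();
            }
            catch (Exception swallowedOnConsumerThread) {
                // the exception dies here, far away from the adapter
            }
        });
        consumer.start();
        consumer.join();
        return false; // the producer thread never saw the handler's exception
    }

    private static void handler() {
        throw new RuntimeException("processing failed");
    }
}
```

This is exactly why ON_SUCCESS deletion appears broken behind a QueueChannel: by the time the handler fails, the adapter has long since finished its "successful" send into the queue.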