Hot questions for Spring Integration: Amazon Web Services
Question:
When I consume a message from the Kinesis stream, I get some junk characters along with the headers, etc.
```java
@StreamListener(Processor.INPUT)
public void receive(String message) {
    System.out.println("Message recieved: " + message);
    throw new RuntimeException("Exception thrown");
}

@StreamListener("errorChannel")
public void transform(ErrorMessage errorMessage) throws UnsupportedEncodingException {
    // original payload
    System.out.println("Error Oiginal Message Payload"
            + new String((byte[]) errorMessage.getOriginalMessage().getPayload(), "UTF-8"));
    System.out.println("Error Original Message Stream channel"
            + errorMessage.getOriginalMessage().getHeaders().get("aws_receivedStream"));
}
```
application.yml:
```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          group: abcd
          destination: stream
          content-type: application/json
          errorChannelEnabled: true
          consumer:
            headerMode: raw
```
I get output at both the listener and the errorChannel with junk characters.
I am trying to extract the original message in the errorChannel. Is this the right way to convert the byte message?
```
Message recieved: ?contentType "application/json"{"aa":"cc"}
```
Answer:
AWS Kinesis doesn't provide any headers entity. So, to leverage such functionality in Spring Cloud Stream, we embed the headers into the body of the Kinesis record. For this purpose the headerMode is embeddedHeaders by default in the Kinesis Binder, and for symmetry between producer and consumer this option must not be changed.
The framework provides an out-of-the-box EmbeddedHeadersChannelInterceptor for the target @StreamListener channels, so the embedded headers are extracted and populated properly into the message to send.
When we handle errors in the errorChannel, we indeed have errorMessage.getOriginalMessage() as the non-transformed original. Therefore the payload of that message is a byte[] of the record body, still containing the embedded headers.
If you would like to parse them properly, you should use the utility:
```java
EmbeddedHeaderUtils.extractHeaders((Message<byte[]>) message, true);
```
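For example, in the error handler from the question this could look like the following sketch (assuming Spring Cloud Stream's EmbeddedHeaderUtils and MessageValues API is on the classpath; the class name is illustrative):

```java
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.EmbeddedHeaderUtils;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;

public class KinesisErrorHandler {

    @StreamListener("errorChannel")
    @SuppressWarnings("unchecked")
    public void handleError(ErrorMessage errorMessage) throws Exception {
        // The original payload still carries the embedded headers, so decode it first
        Message<byte[]> original = (Message<byte[]>) errorMessage.getOriginalMessage();
        MessageValues values = EmbeddedHeaderUtils.extractHeaders(original, true);
        // values.getPayload() is now the bare record body, without embedded headers
        System.out.println("Original payload: "
                + new String((byte[]) values.getPayload(), "UTF-8"));
        System.out.println("Original stream: " + values.get("aws_receivedStream"));
    }
}
```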
Question:
Here is the spring-integration-aws project. It provides an example of an inbound channel adapter:
```java
@SpringBootApplication
public static class MyConfiguration {

    @Autowired
    private AmazonSQSAsync amazonSqs;

    @Bean
    public PollableChannel inputChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageProducer sqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter adapter =
                new SqsMessageDrivenChannelAdapter(this.amazonSqs, "myQueue");
        adapter.setOutputChannel(inputChannel());
        return adapter;
    }
}
```
OK, the Channel and the SqsMessageDrivenChannelAdapter are defined, but what comes next? Let's say I have a Spring bean like this:
```java
import com.amazonaws.services.sqs.model.Message;

@Component
public class MyComponent {

    public void onMessage(Message message) throws Exception {
        // handle sqs message
    }
}
```
- How do I tell Spring to pass all messages from myQueue to this component?
- Is there any additional configuration to process messages one by one? For example, after a message is received, SQS marks it as being processed and it is not visible to other clients, so only one message should be fetched, processed, and then the next one fetched. Is this behavior enabled by default?
Answer:
You should read the Spring Integration Reference Manual.
```java
@Component
public class MyComponent {

    @ServiceActivator(inputChannel = "inputChannel")
    public void onMessage(Message message) throws Exception {
        // handle sqs message
    }
}
```
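Note that inputChannel in the question is a PollableChannel (a QueueChannel), so the @ServiceActivator method needs a poller to pull messages from it. A minimal sketch, assuming Spring Integration's @Poller annotation (the fixedDelay value is an arbitrary example); maxMessagesPerPoll = "1" also addresses the one-by-one processing question:

```java
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

import com.amazonaws.services.sqs.model.Message;

@Component
public class MyComponent {

    // Poll the QueueChannel every second, one message at a time
    @ServiceActivator(inputChannel = "inputChannel",
            poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public void onMessage(Message message) throws Exception {
        // handle one SQS message at a time
    }
}
```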
Question:
I'm looking for a working example of a Spring app that receives and sends messages to a queue using Spring Integration + Amazon SQS service.
Answer:
This is an example of how to configure an outbound channel adapter with XML:
```xml
<int-aws:sqs-outbound-channel-adapter sqs="sqs"
        auto-startup="false"
        channel="errorChannel"
        phase="100"
        id="sqsOutboundChannelAdapter"
        queue="foo"
        delay-expression="'200'"
        message-deduplication-id="foo"
        message-group-id-expression="'bar'"
        send-timeout="202"
        sync="false"
        error-message-strategy="errorMessageStrategy"
        failure-channel="failureChannel"
        success-channel="successChannel"
        message-converter="messageConverter"
        async-handler="asyncHandler"
        resource-id-resolver="resourceIdResolver"/>
```
The inbound channel adapter looks like this:
```xml
<int-aws:sqs-message-driven-channel-adapter sqs="sqs"
        auto-startup="false"
        channel="errorChannel"
        error-channel="nullChannel"
        task-executor="taskExecutor"
        phase="100"
        id="sqsMessageDrivenChannelAdapter"
        queues="foo, bar"
        message-deletion-policy="NEVER"
        max-number-of-messages="5"
        visibility-timeout="200"
        wait-time-out="40"
        send-timeout="2000"
        queue-stop-timeout="11000"
        destination-resolver="destinationResolver"
        resource-id-resolver="resourceIdResolver"/>
```
And here is the Java variant of both:
```java
@Bean
@ServiceActivator(inputChannel = "sqsSendChannelWithAutoCreate")
public MessageHandler sqsMessageHandlerWithAutoQueueCreate() {
    DynamicQueueUrlDestinationResolver destinationResolver =
            new DynamicQueueUrlDestinationResolver(amazonSqs(), null);
    destinationResolver.setAutoCreate(true);
    return new SqsMessageHandler(amazonSqs(), destinationResolver);
}

@Bean
public MessageProducer sqsMessageDrivenChannelAdapter() {
    SqsMessageDrivenChannelAdapter adapter =
            new SqsMessageDrivenChannelAdapter(amazonSqs(), "testQueue");
    adapter.setOutputChannel(inputChannel());
    return adapter;
}
```
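To actually send messages from application code through the outbound adapter, you can front the send channel with a messaging gateway. A minimal sketch, assuming the sqsSendChannelWithAutoCreate channel from the snippet above (the gateway interface name is illustrative):

```java
import org.springframework.integration.annotation.MessagingGateway;

// Payloads sent through this gateway flow to the SqsMessageHandler above
@MessagingGateway(defaultRequestChannel = "sqsSendChannelWithAutoCreate")
public interface SqsSendGateway {

    void send(String payload);
}
```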
You can find more samples in the tests of the project: https://github.com/spring-projects/spring-integration-aws/tree/master/src/test/java/org/springframework/integration/aws
Question:
I have a producer with the following configuration for content-type:
```yaml
spring:
  cloud:
    stream:
      bindings:
        eventOut:
          destination: lab_csi
          content-type: application/json
```
On the consumer side I use Spring Integration (KinesisMessageDrivenChannelAdapter) to route events to different channels. When I receive a Message in a listener class like below:
```java
@ServiceActivator(inputChannel = "channelA")
void handleMessage(Message<?> msg) {
    objectMapper.readValue(msg.getPayload(), MyEvent.class);
}
```
the mapping to MyEvent fails. In the stack trace I can see that the content-type is part of the payload, and the payload is still not deserialized from JSON to a POJO.
I am wondering how I can deserialize the message before doing any other transformation. I can't find any method to set a MessageConverter on the adapter.
I appreciate your help. Thank you.
Answer:
Sounds like your producer is Spring Cloud Stream, but the consumer is just a plain KinesisMessageDrivenChannelAdapter. It is not clear why one would not use a Spring Cloud Stream consumer as well, but anyway...
Your problem is that the SCSt producer serializes message headers together with the payload into the Kinesis record body, simply because AWS Kinesis doesn't support headers per se.
If you really are not interested in the headers on the consumer side, you can disable embedding headers on the producer side:
```yaml
spring:
  cloud:
    stream:
      bindings:
        eventOut:
          destination: lab_csi
          producer:
            headerMode: none
```
Otherwise you have no choice on the plain KinesisMessageDrivenChannelAdapter side unless you use EmbeddedHeaderUtils manually.
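For the manual route, one option is a transformer in front of your service activator that strips the embedded headers. A minimal sketch, assuming Spring Cloud Stream's EmbeddedHeaderUtils is on the consumer classpath; the channel names are illustrative:

```java
import org.springframework.cloud.stream.binder.EmbeddedHeaderUtils;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

public class EmbeddedHeadersExtractor {

    // Decode the SCSt-embedded headers, then pass the bare JSON body downstream
    @Transformer(inputChannel = "rawKinesisChannel", outputChannel = "channelA")
    public Message<?> extract(Message<byte[]> message) throws Exception {
        MessageValues values = EmbeddedHeaderUtils.extractHeaders(message, true);
        return values.toMessage();
    }
}
```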
Question:
I have written a sample application with the Spring Cloud AWS Kinesis binder:

```groovy
compile('org.springframework.cloud:spring-cloud-starter-stream-kinesis:1.0.0.BUILD-SNAPSHOT')
```
Code
```java
@StreamListener(Processor.INPUT)
public void receive(Message<String> message) {
    System.out.println("Message recieved: " + message);
    System.out.println("Message Payload: " + message.getPayload());
}
```
application.yml
```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          group: group
          destination: stream
          content-type: application/json
        output:
          group: group
          destination: stream
          content-type: application/json
```
I have started the application on multiple ports: 8081, 8082, 8083, 8084.
When I publish a message to the stream, more often than not more than one instance consumes it. For example, I sent the message {"22":"11"}, and it was consumed by both 8083 and 8084.
message on application:8084
```
2018-03-16 12:29:19.715 INFO 10084 --- [ main] a.i.k.KinesisMessageDrivenChannelAdapter : started KinesisMessageDrivenChannelAdapter{shardOffsets=[KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}], consumerGroup='group'}
2018-03-16 12:29:19.809 INFO 10084 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8084 (http) with context path ''
2018-03-16 12:29:19.809 INFO 10084 --- [ main] com.example.aws.AwsApplication : Started AwsApplication in 21.034 seconds (JVM running for 22.975)
2018-03-16 12:29:19.840 INFO 10084 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
2018-03-16 12:30:23.929 INFO 10084 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The records '[{SequenceNumber: 49582549849562056887358041088912873574803531055853731842,ApproximateArrivalTimestamp: Fri Mar 16 12:30:21 IST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=47 cap=47],PartitionKey: partitionKey-0,}]' are skipped from processing because their sequence numbers are less than already checkpointed: 49582549849562056887358041088912873574803531055853731842
Message recieved: GenericMessage [payload={"22":"11"}, headers={aws_shard=shardId-000000000000, id=f6cb4b6d-e149-059f-7e4d-aa9dfeeef10e, contentType=application/json, aws_receivedStream=stream, aws_receivedPartitionKey=partitionKey-0, aws_receivedSequenceNumber=49582549849562056887358041088914082500623155992949948418, timestamp=1521183774995}]
Message Payload: {"22":"11"}
```
message on application:8083
```
2018-03-16 12:29:05.733 INFO 8188 --- [ main] a.i.k.KinesisMessageDrivenChannelAdapter : started KinesisMessageDrivenChannelAdapter{shardOffsets=[KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}], consumerGroup='group'}
2018-03-16 12:29:05.733 INFO 8188 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
2018-03-16 12:29:05.796 INFO 8188 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8083 (http) with context path ''
2018-03-16 12:29:05.796 INFO 8188 --- [ main] com.example.aws.AwsApplication : Started AwsApplication in 19.463 seconds (JVM running for 20.956)
Message recieved: GenericMessage [payload={"22":"11"}, headers={aws_shard=shardId-000000000000, id=cf8647fe-8ce5-70b5-eeb9-74a08efc870a, contentType=application/json, aws_receivedStream=stream, aws_receivedPartitionKey=partitionKey-0, aws_receivedSequenceNumber=49582549849562056887358041088914082500623155992949948418, timestamp=1521183775155}]
Message Payload: {"22":"11"}
```
Ideally only one consumer in the group should handle the message. Am I missing something here?
Answer:
Thank you for validating the solution!
I think I found where the problem is: it is in ShardCheckpointer.checkpoint(String sequenceNumber).
The current code is like this:
```java
if (existingSequence == null ||
        new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {
    this.checkpointStore.put(this.key, sequenceNumber);
    return true;
}
```
There is a race condition here: both (all?) nodes check the state and read a smaller value from the store, so they all pass the condition and proceed to the checkpointStore.put() part. There, each of them stores the same new value and returns true, letting the channel adapter process the same record.
My fix for this is:
```java
if (existingSequence == null ||
        new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {
    if (existingSequence != null) {
        return this.checkpointStore.replace(this.key, existingSequence, sequenceNumber);
    }
    else {
        this.checkpointStore.put(this.key, sequenceNumber);
        return true;
    }
}
```
The same condition, but I also use checkpointStore.replace() with this contract:

```java
/**
 * Atomically replace the value for the key in the store if the old
 * value matches the oldValue argument.
 */
```
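To illustrate why the atomic replace closes the race, here is a hedged sketch of those semantics against a plain ConcurrentHashMap (illustrative only, not the actual checkpoint store implementation):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// With replace(), exactly one of two racing consumers wins: the loser's
// compare-and-set fails because the stored value has already advanced.
public class InMemoryCheckpointStore {

    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    public String get(String key) {
        return this.store.get(key);
    }

    public void put(String key, String value) {
        this.store.put(key, value);
    }

    public boolean replace(String key, String oldValue, String newValue) {
        // Atomic: returns false if another node already changed the value
        return this.store.replace(key, oldValue, newValue);
    }
}
```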
Right now I am trying to come up with a test case to validate this, and will let you know when a BUILD-SNAPSHOT is ready for you to consume and validate on your side.
Question:
I am working on integrating Spring Integration with an AWS SQS queue.
I have an issue when my method annotated with @ServiceActivator throws an exception: it seems that in such cases the message is removed from the queue anyway. I've configured the MessageDeletionPolicy to ON_SUCCESS in the SqsMessageDrivenChannelAdapter.
Here is my channel/adapter configuration https://github.com/sdusza1/spring-integration-sqs/blob/master/src/main/java/com/example/demo/ChannelConfig.java
I've tried doing the same using the @SqsListener annotation, and messages are not deleted, as expected.
I've created a mini Spring Boot app here to demonstrate this issue: https://github.com/sdusza1/spring-integration-sqs
Please help :)
Answer:
Your configuration is like this:
```java
@Bean
public MessageProducerSupport sqsMessageDrivenChannelAdapter() {
    SqsMessageDrivenChannelAdapter adapter =
            new SqsMessageDrivenChannelAdapter(amazonSqs, SQS_QUEUE_NAME);
    adapter.setOutputChannel(inboundChannel());
    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
    adapter.setVisibilityTimeout(RETRY_NOTIFICATION_AFTER);
    return adapter;
}
```
Where the inboundChannel is like this:
```java
@Bean
public QueueChannel inboundChannel() {
    return new QueueChannel();
}
```
So, this is a queue, and therefore asynchronous: messages from that channel are processed on a separate thread by the TaskScheduler, which polls this kind of channel according to your PollerMetadata configuration. In this case any errors in the consumer are thrown on that thread as well and never reach the SqsMessageDrivenChannelAdapter for the expected error handling.
This is technically quite different from your @SqsListener experience, where the listener really is called directly on the container thread, and therefore its error handling is applied.
So either revise how you would like to handle errors on that separate thread, or just don't use a QueueChannel right after the SqsMessageDrivenChannelAdapter: let the handler throw, and let errors be handled by the underlying SQS listener container, as in the @SqsListener case.
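A minimal sketch of the second option, assuming the rest of the configuration from the question stays the same: replacing the QueueChannel with a DirectChannel keeps the handler on the container thread, so an exception propagates back to the adapter and ON_SUCCESS leaves the message on the queue:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

// DirectChannel invokes the subscriber on the caller's (container) thread,
// so a failed @ServiceActivator prevents the ON_SUCCESS deletion.
@Bean
public MessageChannel inboundChannel() {
    return new DirectChannel();
}
```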