Hot questions for Spring XD

Question:

Is there a way to automatically launch a predefined set of streams and taps when a new instance of Spring XD starts?

The scenario is this: we've done some prototyping on our dev systems, and now we'd like to build and release this to a test team. We'd like to automate most of the background work so that the user only has to worry about the output from the streams, not about the streams themselves, their definitions, or their deployment.

Are there any runtime configurations we can use to facilitate this?

UPDATE: Our current approach involves writing a shell script that makes REST calls to the Spring XD API to create and deploy the required streams.


Answer:

The shell maintains a log of user commands:

spring-shell.log

You can edit it after creating your stream(s) (or create a file with shell commands) and then use the script command:

xd:>script foo.xd

or

$ bin/xd-shell < foo.xd

(The .xd suffix is not required; the file name can be anything.)

EDIT (comment below)...

One caveat: bin/xd-shell < foo.xd runs every command regardless of success or failure, whereas xd-shell --cmdfile foo.xd terminates immediately with an exit code if a command fails.
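For the automation scenario in the question, the script file can also be generated instead of edited by hand. The following is only a minimal sketch and not part of Spring XD: the class name XdScriptGenerator and the ticktock stream are made up for illustration, and the --deploy flag on stream create is an assumption about your XD shell version.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Spring XD): renders XD shell commands
// that can be saved to a file and run with the script command or --cmdfile.
final class XdScriptGenerator {

    // One "stream create" command per entry, in insertion order.
    // Assumption: the shell accepts --deploy on "stream create".
    static List<String> scriptLines(Map<String, String> streams) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, String> e : streams.entrySet()) {
            lines.add("stream create --name " + e.getKey()
                    + " --definition \"" + e.getValue() + "\" --deploy");
        }
        return lines;
    }

    public static void main(String[] args) {
        Map<String, String> streams = new LinkedHashMap<>();
        streams.put("ticktock", "time | log"); // illustrative stream only
        streams.put("filtertest", "http | myfilter | log");
        // Redirect this output to foo.xd, then run the file with xd-shell.
        for (String line : scriptLines(streams)) {
            System.out.println(line);
        }
    }
}
```

The sketch does not escape double quotes inside a definition, so definitions that contain quoted option values would need extra handling before being written to the file.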

Question:

I would like to create a module which will filter messages from the input channel and transform them into something else on the output. I know I can separate this into two modules (I prefer Java code over scripts) like:

@Filter(inputChannel = "input", outputChannel = "output")
public boolean accept(final Message<?> message) {
    final MyObject payload = (MyObject) message.getPayload();
    return payload.getName().equals("test");
}


@Transformer(inputChannel = "input", outputChannel = "output")
public OtherObject transform(final MyObject data) {
    return convert(data);
}

but I would like to do this in a single module. If I move the filtering logic to the transformer module and return null for non-acceptable payloads, I start getting Spring XD runtime exceptions. What would be the correct approach to this?

--EDIT--

Configuration:

@Configuration
@EnableIntegration
public class ModuleConfiguration {

    @Bean
    public MessageChannel input() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel output() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel myChannel() {
        return new DirectChannel();
    }

    @Bean
    public MyFilter filter() {
        return new MyFilter();
    }

    @Bean
    public MyTransformer transformer() {
        return new MyTransformer();
    }
}

Filter:

@Filter(inputChannel = "input", outputChannel = "myChannel")
public boolean accept(final Message<?> message) 

Transformer:

@Transformer(inputChannel = "myChannel", outputChannel = "output")
public OtherObject transform(final MyObject payload)

Exception:

2016-05-13T11:17:59+0200 1.3.1.RELEASE WARN xdbus.tt.0-1 listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:183) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1358) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:661) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1102) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1086) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'JavaConfiguredModule [name=myFilter, type=processor, group=tt, index=1 @7d48b140]:default,admin,singlenode,hsqldbServer:9393.input'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$400(AmqpInboundChannelAdapter.java:45) ~[spring-integration-amqp-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:93) ~[spring-integration-amqp-4.2.5.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:757) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    ... 10 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
    ... 33 common frames omitted

Answer:

As you have found, transformers must return something.

Please refer to the Spring Integration documentation (each XD processor module is a small Spring Integration application with an input and output channel - sources just have an output, sinks just have an input).

The way you have it now, there are two consumers on input, so messages will be distributed between them round-robin.
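To see why two consumers on the same channel cause trouble, here is a plain-Java sketch of round-robin dispatch. It only models the behavior described here; RoundRobinChannel is a hypothetical class, not a Spring Integration type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a channel with several subscribers: each message
// goes to exactly one subscriber, chosen in rotation.
final class RoundRobinChannel<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private int next = 0;

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    void send(T message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        subscribers.get(next).accept(message);
        next = (next + 1) % subscribers.size();
    }

    public static void main(String[] args) {
        RoundRobinChannel<String> input = new RoundRobinChannel<>();
        List<String> seenByFilter = new ArrayList<>();
        List<String> seenByTransformer = new ArrayList<>();
        input.subscribe(seenByFilter::add);      // stands in for the filter
        input.subscribe(seenByTransformer::add); // stands in for the transformer
        for (String m : new String[] {"a", "b", "c", "d"}) {
            input.send(m);
        }
        System.out.println("filter saw " + seenByFilter);
        System.out.println("transformer saw " + seenByTransformer);
    }
}
```

Each consumer sees only every other message, so roughly half of the messages would reach the transformer without ever passing through the filter.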

You need to wire the two components into a message flow (via a third message channel)...

@Filter(inputChannel = "input", outputChannel = "transformerChannel")
public boolean accept(final Message<?> message) {
    final MyObject payload = (MyObject) message.getPayload();
    return payload.getName().equals("test");
}


@Transformer(inputChannel = "transformerChannel", outputChannel = "output")
public OtherObject transform(final MyObject data) {
    return convert(data);
}

Notice the channel configuration: the filter's output channel is the transformer's input channel.
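The effect of that wiring can be modeled in plain Java. This is only a sketch of the semantics, not Spring Integration code; the class FilterThenTransform and its process method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of the input -> filter -> transformer -> output flow.
final class FilterThenTransform {

    static <I, O> List<O> process(List<I> input, Predicate<I> filter, Function<I, O> transformer) {
        List<O> output = new ArrayList<>();
        for (I message : input) {
            if (!filter.test(message)) {
                continue; // rejected by the filter: dropped before the transformer
            }
            // Only accepted messages get here, so the transformer never
            // needs to return null to signal "skip this one".
            output.add(Objects.requireNonNull(transformer.apply(message),
                    "transformer must return a value"));
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        names.add("test");
        names.add("other");
        names.add("test");
        List<String> result = process(names, n -> n.equals("test"), n -> n.toUpperCase());
        System.out.println(result);
    }
}
```

Because rejected messages never reach the transformer, the transformer can always return a value, which is what the chained channels achieve inside the module.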

Question:

I am new to Spring XD and would like to understand whether it is possible to invoke an external REST API from Spring XD. My REST API expects a request header for GET requests and a request header plus a message body for POST requests. How do I invoke the URL?

My understanding is that http-client is to be used, but I am not sure how to use it.

I haven't written any stream for this case yet.


Answer:

You would use the url and httpMethod properties in the stream definition:

http-client --url=http://... --httpMethod=GET

You will also have to set the mappedRequestHeaders to pass any custom headers that have been set up upstream.

--mappedRequestHeaders=HTTP_REQUEST_HEADERS,myHeader1,myHeader2

However, if you want to use uri variables...

url=http://somehost/foo/{bar}/{baz}

you will need a custom http-client module to add <uri-variable/> child elements to the outbound-gateway.

You will also need a custom module if you want to add headers to be mapped (via a <header-enricher/>).
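To make the URI variable idea concrete, here is a plain-Java sketch of what expanding such a template means. It is an illustration only, not how Spring implements it; the class UriTemplateSketch and the variable values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of URI template expansion: each {name}
// placeholder is replaced by the value registered under that name.
final class UriTemplateSketch {
    private static final Pattern VARIABLE = Pattern.compile("\\{([^/}]+)\\}");

    static String expand(String template, Map<String, String> variables) {
        Matcher matcher = VARIABLE.matcher(template);
        StringBuffer result = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = variables.get(name);
            if (value == null) {
                throw new IllegalArgumentException("No value for variable " + name);
            }
            matcher.appendReplacement(result, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    public static void main(String[] args) {
        Map<String, String> variables = new HashMap<>();
        variables.put("bar", "orders"); // illustrative values only
        variables.put("baz", "42");
        System.out.println(expand("http://somehost/foo/{bar}/{baz}", variables));
    }
}
```

In a custom module the values would typically come from each message (payload or headers) rather than from a fixed map, and this sketch does not URL-encode them.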

Question:

There are lots of examples of how to write transformers etc. in Java, but nothing about filters (except the script-type filters, but I want to use a Java method).

I'd like to create a custom Java filter to filter the payload of a message from a source to a sink.

The examples of filters all refer to an expression.

(How) can I tell the context to execute a Java method in a specified class as the expression?


Answer:

Well, you need to implement a custom processor module. Just follow the custom transformer sample from the Spring XD Guide.

The custom selector for the filter:

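package custom;

import org.springframework.integration.core.MessageSelector;
import org.springframework.messaging.Message;

public class MySelector implements MessageSelector {

    @Override
    public boolean accept(Message<?> message) {
      ...
    }
}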

Module context file myfilter.xml:

<channel id="input"/>

<filter input-channel="input" output-channel="output">
    <beans:bean class="custom.MySelector" />
</filter>

<channel id="output"/>

Package your class into a jar and place everything in the directory ${xd.home}/modules/processors/myfilter with the following structure:

/myfilter
   /config
      myfilter.xml
   /lib
      myfilter.jar

Test it like this:

xd:> stream create --name filtertest --definition "http | myfilter | log"
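As an idea of what the body of accept might look like, here is a sketch of a selector that keeps only String payloads containing a keyword. KeywordSelector is a hypothetical example, and the two small interfaces at the bottom are stand-ins so the sketch compiles without Spring on the classpath; in a real module you would use Spring's own Message and MessageSelector types instead.

```java
// Hypothetical selector: accepts only String payloads containing a keyword.
final class KeywordSelector implements MessageSelector {
    private final String keyword;

    KeywordSelector(String keyword) {
        this.keyword = keyword;
    }

    @Override
    public boolean accept(Message<?> message) {
        Object payload = message.getPayload();
        // Non-String payloads are rejected rather than causing a cast failure.
        return payload instanceof String && ((String) payload).contains(keyword);
    }

    public static void main(String[] args) {
        MessageSelector selector = new KeywordSelector("error");
        Message<String> hit = () -> "disk error on node 3";
        Message<String> miss = () -> "all good";
        System.out.println(selector.accept(hit));
        System.out.println(selector.accept(miss));
    }
}

// Stand-ins for org.springframework.messaging.Message and
// org.springframework.integration.core.MessageSelector (assumption: only
// the methods used above are mirrored here).
interface Message<T> {
    T getPayload();
}

interface MessageSelector {
    boolean accept(Message<?> message);
}
```

With a selector like this behind the filter, messages whose payload lacks the keyword would be dropped before they reach the log sink.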