Hot questions for Spring Integration: Java DSL

Top 10 Java Open Source / Spring / Spring Integration: Java DSL

Question:

I have implemented an IntegrationFlow where I want to do the following tasks:

  1. Poll for files from a directory
  2. Transform the file content to a string
  3. Send the string via WebFluxRequestExecutingMessageHandler to a REST-Endpoint and use an AdviceChain to handle success and error responses

Implementation

@Configuration
@Slf4j
public class JsonToRestIntegration {

    @Autowired
    private LoadBalancerExchangeFilterFunction lbFunction;

    @Value("${json_folder}")
    private String jsonPath;

    @Value("${json_success_folder}")
    private String jsonSuccessPath;

    @Value("${json_error_folder}")
    private String jsonErrorPath;

    @Value("${rest-service-url}")
    private String restServiceUrl;

    @Bean
    public DirectChannel httpResponseChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel successChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel failureChannel() {
        return new DirectChannel();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(1000).get();
    }

   @Bean
public IntegrationFlow jsonFileToRestFlow() {
    return IntegrationFlows
            .from(fileReadingMessageSource(), e -> e.id("fileReadingEndpoint"))
            .transform(org.springframework.integration.file.dsl.Files.toStringTransformer())
            .enrichHeaders(s -> s.header("Content-Type", "application/json; charset=utf8"))
            .handle(reactiveOutbound())
            .log()
            .channel(httpResponseChannel())
            .get();
}

    @Bean
    public FileReadingMessageSource fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File(jsonPath));
        source.setFilter(new SimplePatternFileListFilter("*.json"));
        source.setUseWatchService(true);
        source.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE);

        return source;
    }

    /**
     * Creates the WebFlux outbound handler that POSTs the message payload to
     * {@code restServiceUrl} and expects a {@code String} response.
     *
     * <p>The {@link WebClient} is built with the injected {@code lbFunction} as a filter.
     * Replies are sent to {@link #httpResponseChannel()}, and the advice returned by
     * {@link #expressionAdvice()} is applied around the request.
     *
     * @return the configured outbound message handler
     */
    @Bean
    public MessageHandler reactiveOutbound() {
        WebClient webClient = WebClient.builder()
                .baseUrl("http://jsonservice")
                .filter(lbFunction)
                .build();

        WebFluxRequestExecutingMessageHandler handler = new WebFluxRequestExecutingMessageHandler(restServiceUrl, webClient);

        handler.setHttpMethod(HttpMethod.POST);
        // name() is the canonical charset name; displayName() is a human-readable,
        // locale-dependent label and is not meant to be parsed back into a charset.
        handler.setCharset(StandardCharsets.UTF_8.name());
        handler.setOutputChannel(httpResponseChannel());
        handler.setExpectedResponseType(String.class);
        handler.setAdviceChain(singletonList(expressionAdvice()));

        return handler;
    }

    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();

        advice.setTrapException(true);
        advice.setSuccessChannel(successChannel());
        advice.setOnSuccessExpressionString("payload + ' war erfolgreich'");
        advice.setFailureChannel(failureChannel());
        advice.setOnFailureExpressionString("payload + ' war nicht erfolgreich'");

        return advice;
    }

    /**
     * Subscribes to {@link #httpResponseChannel()} and writes a log entry for every
     * message that arrives there.
     *
     * @return the logging flow
     */
    @Bean
    public IntegrationFlow loggingFlow() {
        return IntegrationFlows.from(httpResponseChannel())
                .handle(message -> {
                    // The file-name header was previously read into a local that was never used.
                    log.info("some log");
                })
                .get();
    }

    /**
     * Subscribes to {@link #successChannel()} and moves the file carried by the advice's
     * input message into the folder configured as {@code jsonSuccessPath}.
     *
     * <p>Nothing is moved when either the original-file or the file-name header is absent.
     * A failed move is logged and not rethrown.
     *
     * @return the success-handling flow
     */
    @Bean
    public IntegrationFlow successFlow() {
        return IntegrationFlows.from(successChannel())
                .handle(message -> {
                    // The headers of interest live on the message that entered the advice.
                    MessageHeaders inputHeaders = ((AdviceMessage) message).getInputMessage().getHeaders();

                    File sourceFile = (File) inputHeaders.get(ORIGINAL_FILE);
                    String fileName = (String) inputHeaders.get(FILENAME);

                    if (sourceFile == null || fileName == null) {
                        return;
                    }

                    File targetFile = new File(new File(jsonSuccessPath), fileName);

                    try {
                        Files.move(sourceFile.toPath(), targetFile.toPath());
                    } catch (IOException e) {
                        log.error("some log", e);
                    }
                })
                .get();
    }

    /**
     * Subscribes to {@link #failureChannel()} and moves the file belonging to the failed
     * message into the folder configured as {@code jsonErrorPath}.
     *
     * <p>The payload is expected to be a {@link MessagingException}; its failed message
     * supplies the original-file and file-name headers. Nothing is moved when the failed
     * message or either header is absent. A failed move is logged and not rethrown.
     *
     * @return the failure-handling flow
     */
    @Bean
    public IntegrationFlow failureFlow() {
        return IntegrationFlows.from(failureChannel())
                .handle(message -> {
                    Message<?> failedMessage = ((MessagingException) message.getPayload()).getFailedMessage();

                    if (failedMessage == null) {
                        return;
                    }

                    File originalFile = (File) failedMessage.getHeaders().get(FileHeaders.ORIGINAL_FILE);
                    String originalFileName = (String) failedMessage.getHeaders().get(FileHeaders.FILENAME);

                    if (originalFile == null || originalFileName == null) {
                        return;
                    }

                    // jsonErrorPath is the field injected from ${json_error_folder}.
                    File jsonErrorFolder = new File(jsonErrorPath);
                    File jsonErrorFile = new File(jsonErrorFolder, originalFileName);

                    try {
                        Files.move(originalFile.toPath(), jsonErrorFile.toPath());
                    } catch (IOException e) {
                        log.error("some log", e);
                    }
                })
                .get();
    }
}

So far it seems to work in production. In the test I want to do the following steps:

  1. Copy JSON-Files to the input directory
  2. Start the polling for the json files
  3. Do assertions on the HTTP-Response from the WebFluxRequestExecutingMessageHandler which are routed through my advice chain

But I'm struggling in the test with the following tasks:

  1. Mocking the WebFluxRequestExecutingMessageHandler with the MockIntegrationContext.substituteMessageHandlerFor()-method
  2. Manually start the polling of the json files

Test

@RunWith(SpringRunner.class)
@SpringIntegrationTest()
@Import({JsonToRestIntegration.class})
@JsonTest
public class JsonToRestIntegrationTest {

    @Autowired
    public DirectChannel httpResponseChannel;

    @Value("${json_folder}")
    private String jsonPath;

    @Value("${json_success_folder}")
    private String jsonSuccessPath;

    @Value("${json_error_folder}")
    private String jsonErrorPath;

    @Autowired
    private MockIntegrationContext mockIntegrationContext;

    @Autowired
    private MessageHandler reactiveOutbound;

    @Before
    public void setUp() throws Exception {
        Files.createDirectories(Paths.get(jsonPath));
        Files.createDirectories(Paths.get(jsonSuccessPath));
        Files.createDirectories(Paths.get(jsonErrorPath));
    }

    @After
    public void tearDown() throws Exception {
        FileUtils.deleteDirectory(new File(jsonPath));
        FileUtils.deleteDirectory(new File(jsonSuccessPath));
        FileUtils.deleteDirectory(new File(jsonErrorPath));
    }

    @Test
    public void shouldSendJsonToRestEndpointAndReceiveOK() throws Exception {
        File jsonFile = new ClassPathResource("/test.json").getFile();
        Path targetFilePath = Paths.get(jsonPath + "/" + jsonFile.getName());
        Files.copy(jsonFile.toPath(), targetFilePath);

        httpResponseChannel.subscribe(httpResponseHandler());

        this.mockIntegrationContext.substituteMessageHandlerFor("", reactiveOutbound);
    }

    private MessageHandler httpResponseHandler() {
        return message -> Assert.assertThat(message.getPayload(), is(notNullValue()));
    }

    @Configuration
    @Import({JsonToRestIntegration.class})
    public static class JsonToRestIntegrationTest {

        @Autowired
        public MessageChannel httpResponseChannel;

        @Bean
        public MessageHandler reactiveOutbound() {
            ArgumentCaptor<Message<?>> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);

            MockMessageHandler mockMessageHandler = mockMessageHandler(messageArgumentCaptor).handleNextAndReply(m -> m);
            mockMessageHandler.setOutputChannel(httpResponseChannel);
            return mockMessageHandler;
        }

    }

}

Updated Working Example with mocked WebFluX web client:

Implementation

public class JsonToRestIntegration {

    private final LoadBalancerExchangeFilterFunction lbFunction;

    private final BatchConfigurationProperties batchConfigurationProperties;

    @Bean
    public DirectChannel httpResponseChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel errorChannel() {
        return new DirectChannel();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(100, TimeUnit.MILLISECONDS).get();
    }

    @Bean
    public IntegrationFlow jsonFileToRestFlow() {
        return IntegrationFlows
                .from(fileReadingMessageSource(),  e -> e.id("fileReadingEndpoint"))
                .transform(org.springframework.integration.file.dsl.Files.toStringTransformer("UTF-8"))
                .enrichHeaders(s -> s.header("Content-Type", "application/json; charset=utf8"))
                .handle(reactiveOutbound())
                .channel(httpResponseChannel())
                .get();
    }

    @Bean
    public FileReadingMessageSource fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File(batchConfigurationProperties.getJsonImportFolder()));
        source.setFilter(new SimplePatternFileListFilter("*.json"));
        source.setUseWatchService(true);
        source.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE);

        return source;
    }

    @Bean
    public WebFluxRequestExecutingMessageHandler reactiveOutbound() {
        WebClient webClient = WebClient.builder()
                .baseUrl("http://service")
                .filter(lbFunction)
                .build();

        WebFluxRequestExecutingMessageHandler handler = new WebFluxRequestExecutingMessageHandler(batchConfigurationProperties.getServiceUrl(), webClient);

        handler.setHttpMethod(HttpMethod.POST);
        handler.setCharset(StandardCharsets.UTF_8.displayName());
        handler.setOutputChannel(httpResponseChannel());
        handler.setExpectedResponseType(String.class);
        handler.setAdviceChain(singletonList(expressionAdvice()));

        return handler;
    }

    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();

        advice.setTrapException(true);
        advice.setFailureChannel(errorChannel());

        return advice;
    }

    @Bean
    public IntegrationFlow responseFlow() {
        return IntegrationFlows.from(httpResponseChannel())
                .handle(message -> {
                    MessageHeaders messageHeaders = message.getHeaders();
                    File originalFile = (File) messageHeaders.get(ORIGINAL_FILE);
                    String originalFileName = (String) messageHeaders.get(FILENAME);                        

                    if (originalFile != null && originalFileName != null) {

                        File jsonSuccessFolder = new File(batchConfigurationProperties.getJsonSuccessFolder());
                        File jsonSuccessFile = new File(jsonSuccessFolder, originalFileName);

                        try {
                            Files.move(originalFile.toPath(), jsonSuccessFile.toPath());
                        } catch (IOException e) {
                            log.error("Could not move file", e);
                        }
                    }
                })
                .get();
    }

    /**
     * Subscribes to {@link #errorChannel()} and moves the file belonging to the failed
     * message into the folder returned by
     * {@code batchConfigurationProperties.getJsonErrorFolder()}.
     *
     * <p>The payload is expected to be a {@link MessagingException}; its failed message
     * supplies the original-file and file-name headers. Nothing is moved when the failed
     * message or either header is absent. A failed move is logged together with the file
     * name and is not rethrown.
     *
     * @return the failure-handling flow
     */
    @Bean
    public IntegrationFlow failureFlow() {
        return IntegrationFlows.from(errorChannel())
                .handle(message -> {
                    Message<?> failedMessage = ((MessagingException) message.getPayload()).getFailedMessage();

                    if (failedMessage == null) {
                        return;
                    }

                    File originalFile = (File) failedMessage.getHeaders().get(ORIGINAL_FILE);
                    String originalFileName = (String) failedMessage.getHeaders().get(FILENAME);

                    if (originalFile == null || originalFileName == null) {
                        return;
                    }

                    File jsonErrorFolder = new File(batchConfigurationProperties.getJsonErrorFolder());
                    File jsonErrorFile = new File(jsonErrorFolder, originalFileName);

                    try {
                        Files.move(originalFile.toPath(), jsonErrorFile.toPath());
                    } catch (IOException e) {
                        // The {} placeholder is required for the file name to appear in the
                        // output; the trailing exception is still logged with its stack trace.
                        log.error("Could not move file {}", originalFileName, e);
                    }
                })
                .get();
    }
}

Test

@RunWith(SpringRunner.class)
@SpringIntegrationTest(noAutoStartup = "fileReadingEndpoint")
@Import({JsonToRestIntegration.class, BatchConfigurationProperties.class})
@JsonTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class JsonToRestIntegrationIT {

    private static final FilenameFilter JSON_FILENAME_FILTER = (dir, name) -> name.endsWith(".json");

    @Autowired
    private BatchConfigurationProperties batchConfigurationProperties;

    @Autowired
    private ObjectMapper om;

    @Autowired
    private MessageHandler reactiveOutbound;

    @Autowired
    private DirectChannel httpResponseChannel;

    @Autowired
    private DirectChannel errorChannel;

    @Autowired
    private FileReadingMessageSource fileReadingMessageSource;

    @Autowired
    private SourcePollingChannelAdapter fileReadingEndpoint;

    @MockBean
    private LoadBalancerExchangeFilterFunction lbFunction;

    private String jsonImportPath;
    private String jsonSuccessPath;
    private String jsonErrorPath;

    @Before
    public void setUp() throws Exception {
        jsonImportPath = batchConfigurationProperties.getJsonImportFolder();
        jsonSuccessPath = batchConfigurationProperties.getJsonSuccessFolder();
        jsonErrorPath = batchConfigurationProperties.getJsonErrorFolder();

        Files.createDirectories(Paths.get(jsonImportPath));
        Files.createDirectories(Paths.get(jsonSuccessPath));
        Files.createDirectories(Paths.get(jsonErrorPath));
    }

    @After
    public void tearDown() throws Exception {
        FileUtils.deleteDirectory(new File(jsonImportPath));
        FileUtils.deleteDirectory(new File(jsonSuccessPath));
        FileUtils.deleteDirectory(new File(jsonErrorPath));
    }

    /**
     * Verifies the success path: with a stubbed 200 OK response, the JSON file leaves the
     * import folder, appears in the success folder and never reaches the error folder.
     */
    @Test
    public void shouldMoveJsonFileToSuccessFolderWhenHttpResponseIsOk() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);

        // Release the latch once a message has been sent on the response channel.
        httpResponseChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                latch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        // Any traffic on the error channel means the success path was not taken.
        errorChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                fail();
            }
        });

        // Stub connector: answers with 200 OK and a JSON-serialized ResponseDto.
        ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
            response.setStatusCode(HttpStatus.OK);
            response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);

            DataBufferFactory bufferFactory = response.bufferFactory();

            String valueAsString = null;
            try {
                valueAsString = om.writeValueAsString(new ResponseDto("1"));
            } catch (JsonProcessingException e) {
                fail();
            }
            return response.writeWith(Mono.just(bufferFactory.wrap(valueAsString.getBytes())))
                    .then(Mono.defer(response::setComplete));

        });

        WebClient webClient = WebClient.builder()
                .clientConnector(httpConnector)
                .build();

        // Swap the handler's "webClient" field for the stubbed client.
        new DirectFieldAccessor(this.reactiveOutbound)
                .setPropertyValue("webClient", webClient);

        File jsonFile = new ClassPathResource("/test.json").getFile();
        Path targetFilePath = Paths.get(jsonImportPath + "/" + jsonFile.getName());
        Files.copy(jsonFile.toPath(), targetFilePath);

        // The endpoint is excluded from auto-startup, so polling begins only here.
        fileReadingEndpoint.start();

        assertThat(latch.await(12, TimeUnit.SECONDS), is(true));

        File[] filesInJsonImportFolder = new File(jsonImportPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonImportFolder, is(notNullValue()));
        assertThat(filesInJsonImportFolder.length, is(0));

        File[] filesInJsonSuccessFolder = new File(jsonSuccessPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonSuccessFolder, is(notNullValue()));
        assertThat(filesInJsonSuccessFolder.length, is(1));

        File[] filesInJsonErrorFolder = new File(jsonErrorPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonErrorFolder, is(notNullValue()));
        assertThat(filesInJsonErrorFolder.length, is(0));
    }

    @Test
    public void shouldMoveJsonFileToErrorFolderWhenHttpResponseIsNotOk() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);

        errorChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                latch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        httpResponseChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                fail();
            }
        });

        ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
            response.setStatusCode(HttpStatus.BAD_REQUEST);
            response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);

            DataBufferFactory bufferFactory = response.bufferFactory();

            return response.writeWith(Mono.just(bufferFactory.wrap("SOME BAD REQUEST".getBytes())))
                    .then(Mono.defer(response::setComplete));

        });

        WebClient webClient = WebClient.builder()
                .clientConnector(httpConnector)
                .build();

        new DirectFieldAccessor(this.reactiveOutbound)
                .setPropertyValue("webClient", webClient);

        File jsonFile = new ClassPathResource("/error.json").getFile();
        Path targetFilePath = Paths.get(jsonImportPath + "/" + jsonFile.getName());
        Files.copy(jsonFile.toPath(), targetFilePath);

        fileReadingEndpoint.start();

        assertThat(latch.await(11, TimeUnit.SECONDS), is(true));

        File[] filesInJsonImportFolder = new File(jsonImportPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonImportFolder, is(notNullValue()));
        assertThat(filesInJsonImportFolder.length, is(0));

        File[] filesInJsonSuccessFolder = new File(jsonSuccessPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonSuccessFolder, is(notNullValue()));
        assertThat(filesInJsonSuccessFolder.length, is(0));

        File[] filesInJsonErrorFolder = new File(jsonErrorPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonErrorFolder, is(notNullValue()));
        assertThat(filesInJsonErrorFolder.length, is(1));
    }
}

Answer:

  this.mockIntegrationContext.substituteMessageHandlerFor("", reactiveOutbound);

The first parameter of this method is an endpoint id. (I guess we are just missing Javadocs there on those methods...).

So, what you need is something like this:

.handle(reactiveOutbound(), e -> e.id("webFluxEndpoint"))

And then in that test-case you do:

 this.mockIntegrationContext.substituteMessageHandlerFor("webFluxEndpoint", reactiveOutbound);

You don't need to override the bean in the test class config. The MockMessageHandler can just be used in the test method body.

You poll files via .from(fileReadingMessageSource()). To control the polling manually, the endpoint must be stopped at the beginning. For this purpose you add an endpoint id again:

.from(fileReadingMessageSource(), e -> e.id("fileReadingEndpoint"))

And then in the test configuration you do this:

@SpringIntegrationTest(noAutoStartup = "fileReadingEndpoint")

Another approach for the WebFlux would be via customized WebClient to mock request to the server. For example:

ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
        response.setStatusCode(HttpStatus.OK);
        response.getHeaders().setContentType(MediaType.TEXT_PLAIN);

        DataBufferFactory bufferFactory = response.bufferFactory();
        return response.writeWith(Mono.just(bufferFactory.wrap("FOO\nBAR\n".getBytes())))
                .then(Mono.defer(response::setComplete));
    });

    WebClient webClient = WebClient.builder()
            .clientConnector(httpConnector)
            .build();

    new DirectFieldAccessor(this.reactiveOutbound)
            .setPropertyValue("webClient", webClient);

Question:

I am not sure why I am getting the exception

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available

It's just a simple IntegrationFlow, but I'm not sure what I am missing in the code below.

  @Bean
  Exchange messageExchange() {
    return ExchangeBuilder
        .directExchange("attr")
        .durable(true)
        .build();
  }

  @Bean
  Queue queue() {
    return QueueBuilder
        .durable("attr_queue")
        .build();
  }

  @Bean
  Binding binding() {
    return BindingBuilder
        .bind(queue())
        .to(messageExchange())
        .with("attr_queue")
        .noargs();
  }

  @Bean
  IntegrationFlow deltaFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp
        .inboundAdapter(connectionFactory, queue()))
        .handle(String.class, (payload, headers) -> {
          if (payload.isEmpty()) {
            log.info("Payload empty");
          } else {
            log.info("Payload : " + payload);
          }
          return payload;
        })
        .get();
  }

I was trying to get my hands on Spring Integration and was not sure why I am getting this exception. All I'm trying to do is to read from a queue using an inboundAdapter and just log it to the console. The code runs fine, but when I publish a message to the queue, I get this exception. Do I have to specify a replyChannel or output-channel always when using Amqp adapters?


Answer:

No, that’s not an AMQP Channel Adapter problem. Please look at your handle() - you return something there, and there is nothing afterwards to handle that return value. So, where should a reply go? Right, into the replyChannel header. But wait, there is none, because nothing is waiting for the reply - the Channel Adapter is a one-way component.

Since you do nothing with the reply and the Framework can’t assume at the configuration phase that you are not going to handle this reply, we just get that exception at runtime. It can’t make that assumption because there is a message channel before that handle(), so you may send a message with a replyChannel header from some other flow and so on. But! Since this is your code and you fully control it, you may assume that nobody is going to expect a reply from there, and it would be better to stop streaming at this point. For this purpose it would be better to use the one-way MessageHandler-based handle() variant or just return null instead of payload. You may also use channel("nullChannel") to stop streaming.

Question:

I have a running app that defines some spring integration flows with inbound/outbound gateways, splitters, aggregators,routers and etc...

These flows are all created using spring integration dsl and annotations... so no XML's.

Is there any tool out there that can generate the EE patterns diagrams for it?

This question states that Intellij can do it for xml configurations... I want something similar that works with dsl IntegrationFlow's


Answer:

There is the Spring Flo project and, based on its foundation, we have exposed the IntegrationGraphController with the Graph tree to represent integration flows as JSON. That model can be used to visualize your application in real time.

In addition we have a sample application with the mentioned functionality.

EDIT

The spring-flo project is in the process of being migrated to angular 4/5.

To build and run the viewer:

git checkout angular-1.x
cd samples/spring-flo-si
mvn clean package
java -jar target/spring-flo-sample-si-0.0.1.BUILD-SNAPSHOT.jar

In a browser go to http://localhost:8082 and enter the URL for your app that has the integration graph enabled; click Load.

Enabling the graph endpoint is documented here.

Question:

I am trying to create a TCP server and client by reading the property files which contains the detail of the connections. I am using Dynamic and runtime Integration Flows with the help of following reference document ( 9.20 Dynamic and runtime Integration Flows)

The code works fine when creating the client, but when I create the server in the same way, with the following changes in the code:

    IntegrationFlow flow = f -> f
            .handle(Tcp.inboundAdapter(Tcp.netServer(2221)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("server")))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();

I am getting the following error:

Caused by: java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.String] for method match: [public java.lang.Class<?> org.springframework.integration.dsl.IntegrationComponentSpec.getObjectType(), public S org.springframework.integration.dsl.MessageProducerSpec.outputChannel(java.lang.String), public S org.springframework.integration.dsl.MessageProducerSpec.outputChannel(org.springframework.messaging.MessageChannel), public org.springframework.integration.ip.dsl.TcpInboundChannelAdapterSpec org.springframework.integration.ip.dsl.TcpInboundChannelAdapterSpec.taskScheduler(org.springframework.scheduling.TaskScheduler), public S org.springframework.integration.dsl.MessageProducerSpec.errorMessageStrategy(org.springframework.integration.support.ErrorMessageStrategy), public S org.springframework.integration.dsl.MessageProducerSpec.phase(int), public S org.springframework.integration.dsl.MessageProducerSpec.autoStartup(boolean), public S org.springframework.integration.dsl.MessageProducerSpec.sendTimeout(long)]
    at org.springframework.util.Assert.isNull(Assert.java:155)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:843)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:362)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:231)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:225)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.<init>(MethodInvokingMessageProcessor.java:60)
    at org.springframework.integration.handler.ServiceActivatingHandler.<init>(ServiceActivatingHandler.java:38)
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:924)
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:904)
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:891)
    at org.springframework.integration.samples.dynamictcp.DynamicTcpClientApplication.lambda$1(DynamicTcpClientApplication.java:194)
    at org.springframework.integration.config.dsl.IntegrationFlowBeanPostProcessor.processIntegrationFlowImpl(IntegrationFlowBeanPostProcessor.java:268)
    at org.springframework.integration.config.dsl.IntegrationFlowBeanPostProcessor.postProcessBeforeInitialization(IntegrationFlowBeanPostProcessor.java:96)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:423)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1702)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:583)
    ... 16 common frames omitted

Please help me with above issue. Also I have found the code for dynamic tcp client but no code is present for dynamic tcp server(any resource or link where I can take an idea to create dynamic server).


Answer:

You are mixing responsibilities. The Tcp.inboundAdapter() must be the first component in the IntegrationFlow chain. Consider using this instead:

IntegrationFlow flow =  
   IntegrationFlows.from(Tcp.inboundAdapter(Tcp.netServer(2221)
                .serializer(TcpCodecs.crlf())
                .deserializer(TcpCodecs.lengthHeader1())
                .id("server")))
        .transform(Transformers.objectToString())
        .get();

Question:

I have a Flow which takes string input

  @Bean
  public IntegrationFlow myFlow() {
        // @formatter:off
        return IntegrationFlows.from("some.input.channel")
                               .handle(someService)
                               .get();

How do I invoke this from my integration test? How do I put a string message on "some.input.channel"?


Answer:

Read Javadocs of the API you use:

/**
 * Populate the {@link MessageChannel} name to the new {@link IntegrationFlowBuilder} chain.
 * The {@link org.springframework.integration.dsl.IntegrationFlow} {@code inputChannel}.
 * @param messageChannelName the name of existing {@link MessageChannel} bean.
 * The new {@link DirectChannel} bean will be created on context startup
 * if there is no bean with this name.
 * @return new {@link IntegrationFlowBuilder}.
 */
public static IntegrationFlowBuilder from(String messageChannelName) {

Then open a Reference Manual of the Framework you use:

https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-channels-section.html#messaging-channels-section

https://docs.spring.io/spring/docs/current/spring-framework-reference/testing.html#integration-testing-annotations-standard

So, the channel created by the Java DSL becomes a bean in the application context. It is enough to autowire it into the test class and call its send() from the test method:

/**
 * Integration test that loads {@code MyFlowConfiguration} and pushes a message
 * into the flow through its input channel bean.
 */
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = MyFlowConfiguration.class)
public class IntegrationFlowTests {

    /** The {@code some.input.channel} bean, selected by its bean name. */
    @Qualifier("some.input.channel")
    @Autowired
    private MessageChannel flowInput;

    /** Sends a message with the payload {@code "foo"} to the flow's input channel. */
    @Test
    public void myTest() {
        GenericMessage<String> request = new GenericMessage<>("foo");
        flowInput.send(request);
    }
}

Question:

I couldn't find a function to add headers to outboundGateway in spring integration dsl.

// HTTP GET against the search endpoint; the reply is expected to be an Order.
// NOTE(review): the URL has no scheme (e.g. http://) - confirm the client accepts it.
.handle(outboundGateway("localhost:8080/search")
       .httpMethod(HttpMethod.GET)
       .expectedResponseType(Order.class))

The headers that i would like to add to request are

// Headers the request should carry: Accept and Content-Type set to
// APPLICATION_JSON, plus a custom Client-Id header.
HttpHeaders headers = new HttpHeaders();
headers.setAccept(newArrayList(APPLICATION_JSON));
headers.setContentType(APPLICATION_JSON);
headers.add("Client-Id", "test");

Can someone help me here


Answer:

That's correct: Spring Integration doesn't let you manipulate the HttpHeaders object directly. Instead, you should follow the canonical messaging approach, the protocol-agnostic .enrichHeaders():

// Put the desired values on the message as message headers before it
// reaches the outbound gateway.
.enrichHeaders(e -> e
                        .header(DefaultHttpHeaderMapper.ACCEPT, APPLICATION_JSON)
                        .header(DefaultHttpHeaderMapper.CONTENT_TYPE, APPLICATION_JSON)
                        // NOTE(review): Client-Id is a custom header - confirm the gateway's
                        // header mapper is configured to copy it onto the HTTP request.
                        .header("Client-Id", "test"))
// HTTP GET against the search endpoint; the reply is expected to be an Order.
.handle(outboundGateway("localhost:8080/search")
   .httpMethod(HttpMethod.GET)
   .expectedResponseType(Order.class))

Question:

I don't know if this question is about spring-integration, spring-integration-dsl or both, so I just added the 2 tags...

I spent a considerable amount of time today, first building a simple flow with a filter

// First version: filter, then transform, then handle. Nothing is configured
// here for messages the filter rejects.
StandardIntegrationFlow flow = IntegrationFlows.from(...)
                    .filter(messagingFilter)
                    .transform(transformer)
                    .handle((m) -> {
                        (...)
                    })
                    .get();

The messagingFilter is a very simple implementation of a MessageSelector. So far so good, not much time spent. But then I wanted to log a message whenever the MessageSelector returned false, and this is where I got stuck.

After quite some time I ended up with this:

// Second version: messages rejected by the filter are sent through a discard
// sub-flow whose only step is the channel returned by discardChannel().
StandardIntegrationFlow flow = IntegrationFlows.from(...)
                    .filter(messagingFilters, fs -> fs.discardFlow(i -> i.channel(discardChannel())))
                    .transform(transformer)
                    .handle((m) -> {
                        (...)
                    })
                    .get();

(...)

/**
 * Builds the channel that receives messages rejected by the filter. Every
 * message sent to it has its "msg-failure" payload entry logged at WARN level.
 *
 * @return a channel whose send operations log the message and report success
 */
public MessageChannel discardChannel() {
    return new MessageChannel() {
        @Override
        public boolean send(Message<?> discarded, long timeout) {
            // The timeout is ignored; both overloads behave the same.
            return send(discarded);
        }

        @Override
        public boolean send(Message<?> discarded) {
            final String failure = (String) discarded.getPayload().get("msg-failure");
            log.warn(failure);
            return true;
        }
    };
}

This is both ugly and verbose, so the question is: what have I done wrong here, and how could I have done it in a better, cleaner, more elegant way?

Cheers.


Answer:

Your problem is that you don't see that the Filter is an EI Pattern (Enterprise Integration Pattern) implementation, and the most it can do is send the discarded message to some channel. It isn't going to log anything itself, because that approach would no longer be messaging-based.

The simplest way you need for your use-case is like:

// Handle rejected messages inline: log the "msg-failure" payload entry as a warning.
// The last closing parenthesis belongs to the enclosing .filter(...) call.
.discardFlow(df -> df
        .handle(message -> log.warn((String) message.getPayload().get("msg-failure")))))

That is your logic: to just log. Other people might need more complicated logic. So, eventually you'll get used to the channel abstraction between endpoints.

I agree that the new MessageChannel() {} approach is wrong. The logging should indeed be done in a MessageHandler instead; that is the level of service responsibility. Also, don't forget that there is a LoggingHandler, which via the Java DSL can be used as:

 // Rejected messages go through a log() step whose log text is taken from
 // the "msg-failure" payload entry.
 .filter(messagingFilters, fs -> fs.discardFlow( i -> i.log(message -> (String) message.getPayload().get("msg-failure"))))

Question:

I am trying to call a soap webservice using spring integration's MarshallingWebServiceOutboundGateway. The following is my flow which is quite simple:

/**
 * Flow that takes messages from {@code inputChannel}, adds the
 * {@code ws_soapAction} header with the value {@code getCardTypes}, and passes
 * them to the web-service outbound gateway.
 *
 * @return the assembled integration flow
 * @throws Exception propagated from building the outbound gateway
 */
@Bean
public IntegrationFlow asghar() throws Exception {
    final Map<String, Object> soapActionHeaders = new HashMap<>();
    soapActionHeaders.put("ws_soapAction", "getCardTypes");

    return IntegrationFlows
            .from("inputChannel")
            .enrichHeaders(soapActionHeaders)
            .handle(asgharWebserviceGateway())
            .get();
}

The object in the message payload that comes through "inputChannel" is of the Type CardGroup. Then I create the gateway as follows:

    /**
     * Builds the marshalling outbound gateway for the web service at
     * {@code http://uri.to.webservice/MobileService}. The gateway uses a
     * {@link WebServiceTemplate} backed by a SOAP 1.1 message factory and a JAXB
     * marshaller, and sends replies to {@code replyChannel()}.
     *
     * @return the configured outbound gateway
     * @throws Exception propagated from the message factory, marshaller and
     *         template initialisation calls
     */
    @Bean
    public MarshallingWebServiceOutboundGateway asgharWebserviceGateway() throws Exception {
        // Created by hand rather than as beans, so each afterPropertiesSet() is called explicitly.
        SaajSoapMessageFactory soapMessageFactory = new SaajSoapMessageFactory(
                MessageFactory.newInstance(SOAPConstants.SOAP_1_1_PROTOCOL));
        soapMessageFactory.afterPropertiesSet();

        WebServiceTemplate template = new WebServiceTemplate(soapMessageFactory);

        Jaxb2Marshaller jaxbMarshaller = new Jaxb2Marshaller();
        jaxbMarshaller.setContextPath("my.package.to.webservice.entities");
        jaxbMarshaller.setCheckForXmlRootElement(false);
        jaxbMarshaller.afterPropertiesSet();

        // NOTE(review): only a marshaller is set on the template; confirm whether
        // the reply also needs an unmarshaller.
        template.setMarshaller(jaxbMarshaller);
        template.afterPropertiesSet();

        MarshallingWebServiceOutboundGateway gateway = new MarshallingWebServiceOutboundGateway(
                "http://uri.to.webservice/MobileService", template);
        gateway.setReplyChannel(replyChannel());
        return gateway;
    }

this is a part of the service interface generated by cxf from wsdl

/**
 * Web-service operation {@code getCardTypes}.
 *
 * @param cardGroup the request part named {@code cardGroup}
 * @return the response, mapped to the result part named {@code return}
 */
@WebMethod
@WebResult(name = "return", targetNamespace = "http://ws.gateway.manager.br.com/", partName = "return")
public CardTypesResponse getCardTypes(

        @WebParam(partName = "cardGroup", name = "cardGroup")
                CardGroup cardGroup
);

and this is the wsdl for the same part:

  <wsdl:message name="getCardTypes">
    <wsdl:part name="cardGroup" type="tns:cardGroup">
    </wsdl:part>
  </wsdl:message>

  <wsdl:message name="getCardTypesResponse">
    <wsdl:part name="return" type="tns:cardTypesResponse">
    </wsdl:part>
  </wsdl:message>

  <wsdl:operation name="getCardTypes">
    <wsdl:input message="tns:getCardTypes" name="getCardTypes">
    </wsdl:input>
    <wsdl:output message="tns:getCardTypesResponse" name="getCardTypesResponse">
    </wsdl:output>
  </wsdl:operation>
  <wsdl:operation name="getCardTypes">
    <soap:operation soapAction="" style="rpc"/>
      <wsdl:input name="getCardTypes">
        <soap:body namespace="http://ws.gateway.manager.br.com/" use="literal"/>
      </wsdl:input>
      <wsdl:output name="getCardTypesResponse">
        <soap:body namespace="http://ws.gateway.manager.br.com/" use="literal"/>
      </wsdl:output>
  </wsdl:operation>

As you can see there is no soapAction in wsdl and the above-mentioned code produces this soap message:

<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/">
    <SOAP-ENV:Header>
    </SOAP-ENV:Header>
    <SOAP-ENV:Body>
        <tns:cardGroup xmlns:tns="http://ws.gateway.manager.br.com/">
            <code>9865421</code>
            <title>654965587</title>
        </tns:cardGroup>
    </SOAP-ENV:Body>
</SOAP-ENV:Envelope>

Now I want to know how I can use the operation name (getCardTypes) and where should I set it so that the soap message is created correctly which must be this:

<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/">

    <SOAP-ENV:Header>

    </SOAP-ENV:Header>
    <SOAP-ENV:Body>
        <tns:getCardTypes xmlns:tns="http://ws.gateway.manager.br.com/">
             <tns:cardGroup xmlns:tns="http://ws.gateway.manager.br.com/">
                <code>9865421</code>
                <title>654965587</title>
            </tns:cardGroup>
        </tns:getCardTypes>
    </SOAP-ENV:Body>
</SOAP-ENV:Envelope>

Answer:

<wsdl:message name="getCardTypes">
    <wsdl:part name="cardGroup" type="tns:cardGroup">
    </wsdl:part>
</wsdl:message>

So, your CardGroup object must be wrapped in a GetCardTypes object. Spring WS is not CXF, so you need to get used to its contract-first approach.

Question:

I have setup File poller with task executor

// Fixed pool of 10 threads, handed to the poller below as its task executor.
ExecutorService executorService = Executors.newFixedThreadPool(10);

            LOG.info("Setting up the poller for directory {} ", finalDirectory);
            StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
                    // Poll with a fixed delay of 5 seconds (the trailing 5 is presumably the
                    // initial delay - TODO confirm), at most 10 messages per poll.
                    c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
                            .taskExecutor(executorService)
                            .maxMessagesPerPoll(10)
                            .advice(new LoggerSourceAdvisor(finalDirectory))
                    ))


                    // move the file to the processing directory first, before any further processing
                    .transform(new FileMoveTransformer("C:/processing", true))
                    .channel("fileRouter")
                    .get();

As you can see, I have set up a fixed thread pool of 10 and a maximum of 10 messages per poll. If I put 10 files in the directory, it still processes them one by one. What could be wrong here?

* UPDATE *

It works perfectly fine after Gary's answer though I have other issue now.

I have setup my Poller like this

// Point the source at the directory and install a scanner whose filter is
// an AcceptAllFileListFilter.
setDirectory(new File(path));
        DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();

        scanner.setFilter(new AcceptAllFileListFilter<>());
        setScanner(scanner);

The reason for using AcceptAll is that the same file may arrive again, which is why I move the file first. But when I enable the thread executor, the same file is processed by multiple threads, I assume because of AcceptAllFileListFilter.

If I change to AcceptOnceFileListFilter it works, but then the same file will not be picked up when it arrives again! What can be done to avoid this issue?

Issue/Bug

In Class AbstractPersistentAcceptOnceFileListFilter We have this code

/**
 * Decides whether {@code file} is accepted, recording it in the store.
 * <p>
 * The file is accepted when the store has no entry for its key, or when
 * {@code isEqual} reports that the file differs from the stored value and
 * the stored value could be replaced with the new one. Otherwise it is
 * rejected.
 *
 * @param file the candidate file
 * @return {@code true} if the file is accepted, {@code false} otherwise
 */
@Override
    public boolean accept(F file) {
        String key = buildKey(file);
        // The whole check-and-update sequence runs while holding the monitor.
        synchronized (this.monitor) {
            String newValue = value(file);
            String oldValue = this.store.putIfAbsent(key, newValue);
            if (oldValue == null) { // not in store
                flushIfNeeded();
                return true;
            }
            // An entry already exists: accept only if the file no longer matches the
            // stored value and that value was successfully swapped for the new one.
            if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
                flushIfNeeded();
                return true;
            }
            return false;
        }
    }

Now, for example, if I have set max messages per poll to 5 and there are two files, then it's possible that the same file will be picked up by two threads.

Lets say my code moves the files once I read it.

But the other thread gets to the accept method

If the file is no longer there, then its lastModified time will be returned as 0 and accept will return true.

That causes the issue because the file is NOT there.

If it's 0, then accept should return false, as the file is not there anymore.


Answer:

When you add a task executor to a poller, all it does is make the scheduler thread hand the poll task off to a thread in the thread pool; maxMessagesPerPoll is part of the poll task. The poller itself only runs once every 5 seconds. To get what you want, you should add an executor channel to the flow...

/**
 * Demo application: a source that always supplies {@code "foo"} is polled with
 * a five-second fixed delay (at most ten messages per poll), and an executor
 * channel backed by a ten-thread pool passes each message to the handler.
 */
@SpringBootApplication
public class So53521593Application {

    private static final Logger logger = LoggerFactory.getLogger(So53521593Application.class);

    /** Starts the Spring Boot application. */
    public static void main(String[] args) {
        SpringApplication.run(So53521593Application.class, args);
    }

    /**
     * Defines the polled flow whose messages are handled on pool threads.
     *
     * @return the assembled integration flow
     */
    @Bean
    public IntegrationFlow flow() {
        ExecutorService workers = Executors.newFixedThreadPool(10);
        return IntegrationFlows
                .from(() -> "foo",
                        endpoint -> endpoint.poller(
                                Pollers.fixedDelay(5, TimeUnit.SECONDS).maxMessagesPerPoll(10)))
                .channel(MessageChannels.executor(workers))
                .<String>handle((payload, headers) -> logThenPause(payload))
                .get();
    }

    /**
     * Logs the payload, then sleeps for ten seconds to simulate slow work.
     *
     * @param payload the message payload to log
     * @return always {@code null}, so the handler produces no reply
     */
    private static Object logThenPause(String payload) {
        try {
            logger.info(payload);
            Thread.sleep(10_000);
        }
        catch (InterruptedException interrupted) {
            // Restore the interrupt flag for whoever owns the thread.
            Thread.currentThread().interrupt();
        }
        return null;
    }
}

EDIT

It works fine for me...

/**
 * Flow that polls {@code /tmp/foo} with a five-second fixed delay (at most ten
 * messages per poll) through a persistent accept-once filter, and passes each
 * file to the handler on a thread from a ten-thread pool.
 *
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow flow() {
    ExecutorService workers = Executors.newFixedThreadPool(10);
    File watchedDir = new File("/tmp/foo");
    FileSystemPersistentAcceptOnceFileListFilter acceptOnce =
            new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo");

    return IntegrationFlows
            .from(Files.inboundAdapter(watchedDir).filter(acceptOnce),
                    endpoint -> endpoint.poller(
                            Pollers.fixedDelay(5, TimeUnit.SECONDS).maxMessagesPerPoll(10)))
            .channel(MessageChannels.executor(workers))
            .handle((payload, headers) -> {
                try {
                    // Log the payload, then sleep for ten seconds to simulate slow work.
                    logger.info(payload.toString());
                    Thread.sleep(10_000);
                }
                catch (InterruptedException interrupted) {
                    // Restore the interrupt flag for whoever owns the thread.
                    Thread.currentThread().interrupt();
                }
                return null;
            })
            .get();
}

and

2018-11-28 11:46:05.196 INFO 57607 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 11:46:05.197 INFO 57607 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt

and with touch test1.txt

2018-11-28 11:48:00.284 INFO 57607 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt

EDIT1

Agreed - reproduced with this...

/**
 * Flow that polls {@code /tmp/foo} with a five-second fixed delay (at most ten
 * messages per poll) through a persistent accept-once filter, and passes each
 * file to the handler on a thread from a ten-thread pool. The handler deletes
 * the file before logging it.
 *
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow flow() {
    ExecutorService workers = Executors.newFixedThreadPool(10);
    File watchedDir = new File("/tmp/foo");
    FileSystemPersistentAcceptOnceFileListFilter acceptOnce =
            new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo");

    return IntegrationFlows
            .from(Files.inboundAdapter(watchedDir).filter(acceptOnce),
                    endpoint -> endpoint.poller(
                            Pollers.fixedDelay(5, TimeUnit.SECONDS).maxMessagesPerPoll(10)))
            .channel(MessageChannels.executor(workers))
            .<File>handle((file, headers) -> {
                try {
                    // Delete first, then log and sleep for ten seconds.
                    file.delete();
                    logger.info(file.toString());
                    Thread.sleep(10_000);
                }
                catch (InterruptedException interrupted) {
                    // Restore the interrupt flag for whoever owns the thread.
                    Thread.currentThread().interrupt();
                }
                return null;
            })
            .get();
}

and

2018-11-28 13:22:23.689 INFO 75681 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-4] com.example.So53521593Application : /tmp/foo/test2.txt