Hot questions for Spring Integration: SFTP Adapters

Top 10 Java Open Source / Spring / Spring Integration: SFTP Adapters

Question:

I would have expected the ftp synchronization mechanism to update a changed file. However, from what I can see here, the file is only downloaded if it does not exist yet. As for now, the file is not saved locally even though the timestamp/ content has changed.

So here is what I've discovered so far:

The class org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer

@Override
    public void synchronizeToLocalDirectory(final File localDirectory) {
        final String remoteDirectory = this.remoteDirectoryExpression.getValue(this.evaluationContext, String.class);
        try {
            int transferred = this.remoteFileTemplate.execute(new SessionCallback<F, Integer>() {

                @Override
                public Integer doInSession(Session<F> session) throws IOException {
                    F[] files = session.list(remoteDirectory);
                    if (!ObjectUtils.isEmpty(files)) {
                        List<F> filteredFiles = filterFiles(files);
                        for (F file : filteredFiles) {
                            try {
                                if (file != null) {
                                    copyFileToLocalDirectory(
                                            remoteDirectory, file, localDirectory,
                                            session);
                                }
                            }
                            catch (RuntimeException e) {
                                if (AbstractInboundFileSynchronizer.this.filter instanceof ReversibleFileListFilter) {
                                    ((ReversibleFileListFilter<F>) AbstractInboundFileSynchronizer.this.filter)
                                            .rollback(file, filteredFiles);
                                }
                                throw e;
                            }
                            catch (IOException e) {
                                if (AbstractInboundFileSynchronizer.this.filter instanceof ReversibleFileListFilter) {
                                    ((ReversibleFileListFilter<F>) AbstractInboundFileSynchronizer.this.filter)
                                            .rollback(file, filteredFiles);
                                }
                                throw e;
                            }
                        }
                        return filteredFiles.size();
                    }
                    else {
                        return 0;
                    }
                }
            });
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(transferred + " files transferred");
            }
        }
        catch (Exception e) {
            throw new MessagingException("Problem occurred while synchronizing remote to local directory", e);
        }
    }

filters the files to be downloaded. I'd like to use the org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter, which compares filenames and last modified dates.

It then invokes the copyFileToLocalDirectory function with the filtered files (to be copied).

protected void copyFileToLocalDirectory(String remoteDirectoryPath, F remoteFile, File localDirectory,
            Session<F> session) throws IOException {
        String remoteFileName = this.getFilename(remoteFile);
        String localFileName = this.generateLocalFileName(remoteFileName);
        String remoteFilePath = remoteDirectoryPath != null
                ? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
                : remoteFileName;
        if (!this.isFile(remoteFile)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("cannot copy, not a file: " + remoteFilePath);
            }
            return;
        }

        File localFile = new File(localDirectory, localFileName);
        if (!localFile.exists()) {
            String tempFileName = localFile.getAbsolutePath() + this.temporaryFileSuffix;
            File tempFile = new File(tempFileName);
            OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
            try {
                session.read(remoteFilePath, outputStream);
            }
            catch (Exception e) {
                if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                }
                else {
                    throw new MessagingException("Failure occurred while copying from remote to local directory", e);
                }
            }
            finally {
                try {
                    outputStream.close();
                }
                catch (Exception ignored2) {
                }
            }

            if (tempFile.renameTo(localFile)) {
                if (this.deleteRemoteFiles) {
                    session.remove(remoteFilePath);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("deleted " + remoteFilePath);
                    }
                }
            }
            if (this.preserveTimestamp) {
                localFile.setLastModified(getModified(remoteFile));
            }
        }
    }

However, this method checks (only based on the file name) if the file is already present on the local disk and only downloads if it is not present. So basically there is no chance the updated file with (with the new timestamp) gets downloaded.

I played around trying to change the FtpInboundFileSynchronizer but it gets way too complicated. What is the best way to "customize" the synchronize- / copyToLocalDirectory methods?


Answer:

It is possible to update AbstractInboundFileSynchronizer to recognize updated files, but it is brittle and you run into other issues.

Update 13/Nov/2016: Found out how to get modification timestamps in seconds.

The main problem with updating the AbstractInboundFileSynchronizer is that it has setter-methods but no (protected) getter-methods. If, in the future, the setter-methods do something smart, the updated version presented here will break.

The main issue with updating files in the local directory is concurrency: if you are processing a local file at the same time that an update is being received, you can run into all sorts of trouble. The easy way out is to move the local file to a (temporary) processing directory so that an update can be received as a new file which in turn removes the need to update AbstractInboundFileSynchronizer. See also Camel's timestamp remarks.

By default FTP servers provide modification timestamps in minutes. For testing I updated the FTP client to use the MLSD command which provides modification timestamps in seconds (and milliseconds if you are lucky), but not all FTP servers support this.

As mentioned on the Spring FTP reference the local file filter needs to be a FileSystemPersistentAcceptOnceFileListFilter to ensure local files are picked up when the modification timestamp changes.

Below my version of the updated AbstractInboundFileSynchronizer, followed by some test classes I used.

public class FtpUpdatingFileSynchronizer extends FtpInboundFileSynchronizer {

    protected final Log logger = LogFactory.getLog(this.getClass());

    private volatile Expression localFilenameGeneratorExpression;
    private volatile EvaluationContext evaluationContext;
    private volatile boolean deleteRemoteFiles;
    private volatile String remoteFileSeparator = "/";
    private volatile boolean  preserveTimestamp;

    public FtpUpdatingFileSynchronizer(SessionFactory<FTPFile> sessionFactory) {
        super(sessionFactory);
        setPreserveTimestamp(true);
    }

    @Override
    public void setLocalFilenameGeneratorExpression(Expression localFilenameGeneratorExpression) {
        super.setLocalFilenameGeneratorExpression(localFilenameGeneratorExpression);
        this.localFilenameGeneratorExpression = localFilenameGeneratorExpression;
    }

    @Override
    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
        super.setIntegrationEvaluationContext(evaluationContext);
        this.evaluationContext = evaluationContext;
    }

    @Override
    public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {
        super.setDeleteRemoteFiles(deleteRemoteFiles);
        this.deleteRemoteFiles = deleteRemoteFiles;
    }

    @Override
    public void setRemoteFileSeparator(String remoteFileSeparator) {
        super.setRemoteFileSeparator(remoteFileSeparator);
        this.remoteFileSeparator = remoteFileSeparator;
    }

    @Override
    public void setPreserveTimestamp(boolean preserveTimestamp) {
        // updated
        Assert.isTrue(preserveTimestamp, "for updating timestamps must be preserved");
        super.setPreserveTimestamp(preserveTimestamp);
        this.preserveTimestamp = preserveTimestamp;
    }

    @Override
    protected void copyFileToLocalDirectory(String remoteDirectoryPath, FTPFile remoteFile, File localDirectory,
            Session<FTPFile> session) throws IOException {

        String remoteFileName = this.getFilename(remoteFile);
        String localFileName = this.generateLocalFileName(remoteFileName);
        String remoteFilePath = (remoteDirectoryPath != null
                ? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
                        : remoteFileName);

        if (!this.isFile(remoteFile)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("cannot copy, not a file: " + remoteFilePath);
            }
            return;
        }

        // start update
        File localFile = new File(localDirectory, localFileName);
        boolean update = false;
        if (localFile.exists()) {
            if (this.getModified(remoteFile) > localFile.lastModified()) {
                this.logger.info("Updating local file " + localFile);
                update = true;
            } else {
                this.logger.info("File already exists: " + localFile);
                return;
            }
        }
        // end update

        String tempFileName = localFile.getAbsolutePath() + this.getTemporaryFileSuffix();
        File tempFile = new File(tempFileName);
        OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
        try {
            session.read(remoteFilePath, outputStream);
        } catch (Exception e) {
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            else {
                throw new MessagingException("Failure occurred while copying from remote to local directory", e);
            }
        } finally {
            try {
                outputStream.close();
            }
            catch (Exception ignored2) {
            }
        }
        // updated
        if (update && !localFile.delete()) {
            throw new MessagingException("Unable to delete local file [" + localFile + "] for update.");
        }
        if (tempFile.renameTo(localFile)) {
            if (this.deleteRemoteFiles) {
                session.remove(remoteFilePath);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("deleted " + remoteFilePath);
                }
            }
            // updated
            this.logger.info("Stored file locally: " + localFile);
        } else {
            // updated
            throw new MessagingException("Unable to rename temporary file [" + tempFile + "] to [" + localFile + "]");
        }
        if (this.preserveTimestamp) {
            localFile.setLastModified(getModified(remoteFile));
        }
    }

    private String generateLocalFileName(String remoteFileName) {

        if (this.localFilenameGeneratorExpression != null) {
            return this.localFilenameGeneratorExpression.getValue(this.evaluationContext, remoteFileName, String.class);
        }
        return remoteFileName;
    }

}

Following some of the test classes I used. I used dependencies org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE and org.apache.ftpserver:ftpserver-core:1.0.6 (plus the usual logging and testing dependencies).

public class TestFtpSync {

    static final Logger log = LoggerFactory.getLogger(TestFtpSync.class);
    static final String FTP_ROOT_DIR = "target" + File.separator + "ftproot";
    // org.apache.ftpserver:ftpserver-core:1.0.6
    static FtpServer server;

    @BeforeClass
    public static void startServer() throws FtpException {

        File ftpRoot = new File (FTP_ROOT_DIR);
        ftpRoot.mkdirs();
        TestUserManager userManager = new TestUserManager(ftpRoot.getAbsolutePath());
        FtpServerFactory serverFactory = new FtpServerFactory();
        serverFactory.setUserManager(userManager);
        ListenerFactory factory = new ListenerFactory();
        factory.setPort(4444);
        serverFactory.addListener("default", factory.createListener());
        server = serverFactory.createServer();
        server.start();
    }

    @AfterClass
    public static void stopServer() {

        if (server != null) {
            server.stop();
        }
    }

    File ftpFile = Paths.get(FTP_ROOT_DIR, "test1.txt").toFile();
    File ftpFile2 = Paths.get(FTP_ROOT_DIR, "test2.txt").toFile();

    @Test
    public void syncDir() {

        // org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
        try {
            ctx.register(FtpSyncConf.class);
            ctx.refresh();
            PollableChannel msgChannel = ctx.getBean("inputChannel", PollableChannel.class);
            for (int j = 0; j < 2; j++) {
                for (int i = 0; i < 2; i++) {
                    storeFtpFile();
                }
                for (int i = 0; i < 4; i++) {
                    fetchMessage(msgChannel);
                }
            }
        } catch (Exception e) {
            throw new AssertionError("FTP test failed.", e);
        } finally {
            ctx.close();
            cleanup();
        }
    }

    boolean tswitch = true;

    void storeFtpFile() throws IOException, InterruptedException {

        File f = (tswitch ? ftpFile : ftpFile2);
        tswitch = !tswitch;
        log.info("Writing message " + f.getName());
        Files.write(f.toPath(), ("Hello " + System.currentTimeMillis()).getBytes());
    }

    Message<?> fetchMessage(PollableChannel msgChannel) {

        log.info("Fetching message.");
        Message<?> msg = msgChannel.receive(1000L);
        if (msg == null) {
            log.info("No message.");
        } else {
            log.info("Have a message: " + msg);
        }
        return msg;
    }

    void cleanup() {

        delFile(ftpFile);
        delFile(ftpFile2);
        File d = new File(FtpSyncConf.LOCAL_DIR);
        if (d.isDirectory()) {
            for (File f : d.listFiles()) {
                delFile(f);
            }
        }
        log.info("Finished cleanup");
    }

    void delFile(File f) {

        if (f.isFile()) {
            if (f.delete()) {
                log.info("Deleted " + f);
            } else {
                log.error("Cannot delete file " + f);
            }
        }
    }

}

public class MlistFtpSessionFactory extends AbstractFtpSessionFactory<MlistFtpClient> {

    @Override
    protected MlistFtpClient createClientInstance() {
        return new MlistFtpClient();
    }

}

public class MlistFtpClient extends FTPClient {

    @Override
    public FTPFile[] listFiles(String pathname) throws IOException {
        return super.mlistDir(pathname);
    }
}

@EnableIntegration
@Configuration
public class FtpSyncConf {

    private static final Logger log = LoggerFactory.getLogger(FtpSyncConf.class);

    public static final String LOCAL_DIR = "/tmp/received";

    @Bean(name = "ftpMetaData")
    public ConcurrentMetadataStore ftpMetaData() {
        return new SimpleMetadataStore();
    }

    @Bean(name = "localMetaData")
    public ConcurrentMetadataStore localMetaData() {
        return new SimpleMetadataStore();
    }

    @Bean(name = "ftpFileSyncer")
    public FtpUpdatingFileSynchronizer ftpFileSyncer(
            @Qualifier("ftpMetaData") ConcurrentMetadataStore metadataStore) {

        MlistFtpSessionFactory ftpSessionFactory = new MlistFtpSessionFactory();
        ftpSessionFactory.setHost("localhost");
        ftpSessionFactory.setPort(4444);
        ftpSessionFactory.setUsername("demo");
        ftpSessionFactory.setPassword("demo");

        FtpPersistentAcceptOnceFileListFilter fileFilter = new FtpPersistentAcceptOnceFileListFilter(metadataStore, "ftp");
        fileFilter.setFlushOnUpdate(true);
        FtpUpdatingFileSynchronizer ftpFileSync = new FtpUpdatingFileSynchronizer(ftpSessionFactory);
        ftpFileSync.setFilter(fileFilter);
        // ftpFileSync.setDeleteRemoteFiles(true);
        return ftpFileSync;
    }

    @Bean(name = "syncFtp")
    @InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "500", maxMessagesPerPoll = "1"))
    public MessageSource<File> syncChannel(
            @Qualifier("localMetaData") ConcurrentMetadataStore metadataStore,
            @Qualifier("ftpFileSyncer") FtpUpdatingFileSynchronizer ftpFileSync) throws Exception {

        FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(ftpFileSync);
        File receiveDir = new File(LOCAL_DIR);
        receiveDir.mkdirs();
        messageSource.setLocalDirectory(receiveDir);
        messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore, "local"));
        log.info("Message source bean created.");
        return messageSource;
    }

    @Bean(name = "inputChannel")
    public PollableChannel inputChannel() {

        QueueChannel channel = new QueueChannel();
        log.info("Message channel bean created.");
        return channel;
    }

}

/**
 * Copied from https://github.com/spring-projects/spring-integration-samples/tree/master/basic/ftp/src/test/java/org/springframework/integration/samples/ftp/support
 * @author Gunnar Hillert
 *
 */
public class TestUserManager extends AbstractUserManager {
    private BaseUser testUser;
    private BaseUser anonUser;

    private static final String TEST_USERNAME = "demo";
    private static final String TEST_PASSWORD = "demo";

    public TestUserManager(String homeDirectory) {
        super("admin", new ClearTextPasswordEncryptor());

        testUser = new BaseUser();
        testUser.setAuthorities(Arrays.asList(new Authority[] {new ConcurrentLoginPermission(1, 1), new WritePermission()}));
        testUser.setEnabled(true);
        testUser.setHomeDirectory(homeDirectory);
        testUser.setMaxIdleTime(10000);
        testUser.setName(TEST_USERNAME);
        testUser.setPassword(TEST_PASSWORD);

        anonUser = new BaseUser(testUser);
        anonUser.setName("anonymous");
    }

    public User getUserByName(String username) throws FtpException {
        if(TEST_USERNAME.equals(username)) {
            return testUser;
        } else if(anonUser.getName().equals(username)) {
            return anonUser;
        }

        return null;
    }

    public String[] getAllUserNames() throws FtpException {
        return new String[] {TEST_USERNAME, anonUser.getName()};
    }

    public void delete(String username) throws FtpException {
        throw new UnsupportedOperationException("Deleting of FTP Users is not supported.");
    }

    public void save(User user) throws FtpException {
        throw new UnsupportedOperationException("Saving of FTP Users is not supported.");
    }

    public boolean doesExist(String username) throws FtpException {
        return (TEST_USERNAME.equals(username) || anonUser.getName().equals(username)) ? true : false;
    }

    public User authenticate(Authentication authentication) throws AuthenticationFailedException {
        if(UsernamePasswordAuthentication.class.isAssignableFrom(authentication.getClass())) {
            UsernamePasswordAuthentication upAuth = (UsernamePasswordAuthentication) authentication;

            if(TEST_USERNAME.equals(upAuth.getUsername()) && TEST_PASSWORD.equals(upAuth.getPassword())) {
                return testUser;
            }

            if(anonUser.getName().equals(upAuth.getUsername())) {
                return anonUser;
            }
        } else if(AnonymousAuthentication.class.isAssignableFrom(authentication.getClass())) {
            return anonUser;
        }

        return null;
    }
}

Update 15/Nov/2016: Note on xml-configuration.

The xml-element inbound-channel-adapter is directly linked to the FtpInboundFileSynchronizer via org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser via FtpNamespaceHandler via spring-integration-ftp-4.3.5.RELEASE.jar!/META-INF/spring.handlers. Following the xml-custom reference guide, specifying a custom FtpNamespaceHandler in a local META-INF/spring.handlers file should allow you to use the FtpUpdatingFileSynchronizer instead of the FtpInboundFileSynchronizer. It did not work for me with unit-tests though and a proper solution would probably involve creating extra/modified xsd-files so that the regular inbound-channel-adapter is using the regular FtpInboundFileSynchronizer and a special inbound-updating-channel-adapter is using the FtpUpdatingFileSynchronizer. Doing this properly is a bit out of scope for this answer. A quick hack can get you started though. You can overwrite the default FtpNamespaceHandler by creating package org.springframework.integration.ftp.config and class FtpNamespaceHandler in your local project. Contents shown below:

package org.springframework.integration.ftp.config;

public class FtpNamespaceHandler extends org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler {

    @Override
    public void init() {
        System.out.println("Initializing FTP updating file synchronizer.");
        // one updated line below, rest copied from original FtpNamespaceHandler
        registerBeanDefinitionParser("inbound-channel-adapter", new MyFtpInboundChannelAdapterParser());
        registerBeanDefinitionParser("inbound-streaming-channel-adapter",
                new FtpStreamingInboundChannelAdapterParser());
        registerBeanDefinitionParser("outbound-channel-adapter", new FtpOutboundChannelAdapterParser());
        registerBeanDefinitionParser("outbound-gateway", new FtpOutboundGatewayParser());
    }

}

package org.springframework.integration.ftp.config;

import org.springframework.integration.file.remote.synchronizer.InboundFileSynchronizer;
import org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser;

public class MyFtpInboundChannelAdapterParser extends FtpInboundChannelAdapterParser {

    @Override
    protected Class<? extends InboundFileSynchronizer> getInboundFileSynchronizerClass() {
        System.out.println("Returning updating file synchronizer.");
        return FtpUpdatingFileSynchronizer.class;
    }

}

Also add preserve-timestamp="true" to the xml-file to prevent the new IllegalArgumentException: for updating timestamps must be preserved.

Question:

I'm using

  • Sprint Integration(File, SFTP, etc) 4.3.6
  • Spring Boot 1.4.3
  • Spring Integration Java DSL 1.1.4

and I'm trying to set up an SFTP outbound adapter that would allow me move a file to a directory on the remote system and also remove or rename the file in my local system.

So, for instance I would like to place a file, a.txt, in a local directory and have it SFTP'ed to a remote server in the directory inbound. Once the transfer is complete I would like to have the local copy of a.txt deleted, or renamed.

I was toying with a couple of ways to this. So here is my common SessionFactory for the test.

protected SessionFactory<ChannelSftp.LsEntry> buildSftpSessionFactory() {
    DefaultSftpSessionFactory sessionFactory = new DefaultSftpSessionFactory();
    sessionFactory.setHost("localhost");
    sessionFactory.setUser("user");
    sessionFactory.setAllowUnknownKeys(true);
    sessionFactory.setPassword("pass");
    CachingSessionFactory<ChannelSftp.LsEntry> cachingSessionFactory = new CachingSessionFactory<>(sessionFactory, 1);
    return cachingSessionFactory;
}

This is a transformer that I have to add some of the headers to the message

@Override
public Message<File> transform(Message<File> source) {
    System.out.println("here is the thing : "+source);
    File file = (File)source.getPayload();
    Message<File> transformedMessage = MessageBuilder.withPayload(file)
            .copyHeaders(source.getHeaders())
            .setHeaderIfAbsent(FileHeaders.ORIGINAL_FILE, file)
            .setHeaderIfAbsent(FileHeaders.FILENAME, file.getName())
            .build();
    return transformedMessage;
}

I then have an integration flow that uses a poller to watch a local directory and invoke this :

@Bean
public IntegrationFlow pushTheFile(){
    return IntegrationFlows
            .from(s -> s.file(new File(DIR_TO_WATCH))
                            .patternFilter("*.txt").preventDuplicates(),
                    e -> e.poller(Pollers.fixedDelay(100)))
            .transform(outboundTransformer)
            .handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
                    .remoteFileSeparator("/")
                    .useTemporaryFileName(false)
                    .remoteDirectory("inbound/")
            )
            .get();
}

This works fine, but leaves the local file. Any ideas on how to have this remove local file after the upload completes? Should I be looking at the SftpOutboundGateway instead?

Thanks in advance!

Artem's answer worked perfectly! Here is a quick example that deletes the local file after it is pushed.

@Bean
public IntegrationFlow pushTheFile(){
    return IntegrationFlows
            .from(s -> s.file(new File(DIR_TO_WATCH))
                            .patternFilter("*.txt").preventDuplicates(),
                    e -> e.poller(Pollers.fixedDelay(100)))
            .transform(outboundTransformer)
            .handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
                    .remoteFileSeparator("/")
                    .useTemporaryFileName(false)
                    .remoteDirectory("inbound/"), c -> c.advice(expressionAdvice(c))
            )
            .get();
}

@Bean
public Advice expressionAdvice(GenericEndpointSpec<FileTransferringMessageHandler<ChannelSftp.LsEntry>> c) {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnSuccessExpression("payload.delete()");
    advice.setOnFailureExpression("payload + ' failed to upload'");
    advice.setTrapException(true);
    return advice;
}

Answer:

For this purpose you can use several approaches.

All of them are based on the fact that you do something else with the original request message for the Sftp.outboundAdapter().

  1. .publishSubscribeChannel() allows you to send the same message to several subscribers, when the second will receive it only the first one finishes its work. By default, if you don't specify Executor

  2. routeToRecipients() allows you make the same result but via different component.

  3. ExpressionEvaluatingRequestHandlerAdvice - you add this one to the .advice() of the Sftp.outboundAdapter() endpoint definition - the second .handle() argument and perform file.delete() via onSuccessExpression:

    .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
    

Question:

I would like to use the SFTP Outbound Gateway to get a file via SFTP but I only find examples using XML config. How can this be done using Java config?

Update (Thanks to Artem Bilan help)

MyConfiguration class:

@Configuration
public class MyConfiguration {

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory();
        sftpSessionFactory.setHost("myhost");
        sftpSessionFactory.setPort(22);
        sftpSessionFactory.setUser("uname");
        sftpSessionFactory.setPassword("pass");
        sftpSessionFactory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(sftpSessionFactory);
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        SftpOutboundGateway sftpOutboundGateway = new  SftpOutboundGateway(sftpSessionFactory(), "get", "#getPayload() == '/home/samadmin/test.endf'");
        sftpOutboundGateway.setLocalDirectory(new File("C:/test/gateway/"));
        return sftpOutboundGateway;
    }

}

My application class:

@SpringBootApplication
@EnableIntegration
public class TestIntegrationApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestIntegrationApplication.class, args);
    }
}

Configuration succeeds now but no SFTP occurs. Need to figure out how to request SFTP.


Answer:

Quoting Reference Manual:

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
    return new SftpOutboundGateway(ftpSessionFactory(), "ls");
}

Also pay attention to the Java DSL sample in the next section there.

EDIT

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
    SftpOutboundGateway sftpOutboundGateway = new  SftpOutboundGateway(sftpSessionFactory(), "get", "payload");
    sftpOutboundGateway.setLocalDirectory(new File("C:/test/gateway/"));
    return sftpOutboundGateway;
}

In case of GET SFTP command the expression ctor arg maybe like above - just a reference to the Message.getPayload() for all incoming messages.

In this case you should send to the sftpChannel a Message like:

new GenericMessage<>("/home/samadmin/test.endf");

So, that /home/samadmin/test.endf is a payload of that Message. When it arrives to the SftpOutboundGateway, that expression is evaluated against that message and getPayload() is called by SpEL. So, the GET command will be preformed with the desired path to the remote file.

An other message may have fully different path to some other file.

Question:

how to create an file in remote directory from byte[], as there is send() methods available in PollableChannel. from below code able to send file to remote, but it's creating an file in local machine. how to avoid creating file in local machine?

PollableChannel remoteFileChannel = context.getBean("outputChannel", PollableChannel.class); 

Message<byte[]> sendFile = MessageBuilder.withPayload("hi how are you".getBytes()).build();

remoteFileChannel.send(sendFile);

spring sftp configuration:

<bean id="sftpSessionFactory"
        class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory"
        p:host="${sftp.host}"
        p:port="${sftp.port}"
        p:user="${sftp.username}"
        p:password="${sftp.password}"
        p:allowUnknownKeys="${sftp.allowUnknownKeys}" />

<int:channel id="outputChannel">
        <int:queue />
</int:channel>

<int-sftp:outbound-channel-adapter id="outboundAdapter"
        session-factory="sftpSessionFactory"
        channel="outputChannel"
        charset="UTF-8"
        remote-directory="${sftp.remotedir}"
        remote-filename-generator="randomFileNameGenerator" /> 

how to create an file with random name and write bytes into it?

I have tried with custom file name generator class:

@Component("randomFileNameGenerator")
public class RandomFileNameGenerator implements FileNameGenerator {

    @Override
    public String generateFileName(Message<?> msg) {
        long number = (long) Math.floor(Math.random() * 90000000L) + 1000000L;
        String fileName = String.format("%d.txt", number);
        return fileName;
    }

}

where the filename was not setting, creating file with name like "adas-asdfsadf-545sadf.msg". can any one point me where am doing wrong


Answer:

First of all you don't show your Spring Integration configuration.

From other hand it isn't clear why you say " it's creating an file", if it looks like code is yours. So, that is you who create a file.

There are out-of-the-box components like <int-sftp:outbound-channel-adapter> and <int-sftp:outbound-gateway>, which definitely can address your goal.

Both of them are based on the RemoteFileTemplate.send() operation which handles byte[] payload very well.

See more info in the Reference Manual and Samples.

UPDATE

how to create an file with random name and write bytes into it?

Seems for me we have just fixed with you the byte[] question.

Re. "random name". Looks like you go right way: the remote-filename-generator="fileNameGenerator" is exactly for that task. See FileNameGenerator strategy and its implementations in the Framework. You can utilize your randomize function in your custom implementation and refer it from that channel-adapter definition.

Question:

I have a SFTP directory and reading files and sending the files for further processing to a ServiceActivator.At any point I need to process them parallely using the handler.

Here is my SPring Integration java DSL flow.

IntegrationFlows.from(Sftp.inboundAdapter(getSftpSessionFactory())
                        .temporaryFileSuffix("COPY")
                        .localDirectory(directory)
                        .deleteRemoteFiles(false)
                        .preserveTimestamp(true)
                        .remoteDirectory("remoteDir"))
                        .patternFilter("*.txt")), e -> e.poller(Pollers.fixedDelay(500).maxMessagesPerPoll(5)))
                        .handle("mybean", "myMethod")
                        .handle(Files.outboundAdapter(new File("success")))         
                        .deleteSourceFiles(true)
                        .autoCreateDirectory(true))
                        .get();

Update:Here is my ThreadPoolExecutor:

@Bean(name = "executor")
public Executor getExecutor()
{
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(4);
    executor.setQueueCapacity(20);          
    executor.initialize();
    return executor;
}

Answer:

The Sftp.inboundAdapter() (SftpInboundFileSynchronizingMessageSource) returns remote files one by one anyway. First of all it synchronizes them to the local directory and only after that poll them for message processing as File payload.

To process them in parallel that would be just enough to add a taskExecutor to your e.poller() definition and all those maxMessagesPerPoll(5) will be distributed to different threads.