1
votes

I am trying to implement the Dynamic Sftp Inboud Adapter. Following are the source files and configuration.

The problem is, it works when the thread hangs for some time, but if I run Junit even if the Junit test is successful the file is not copied to local directory. But if the same junit test is run in debug mode, ie the thread is given some time, then the files are copied.

Is there any other way to start the adapter effectively or write this logic more effectively?

The resolver file

package org.springframework.integration.samples.ftp;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;

import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;

public class DynamicInboundSftpChannelResolver {
    public static final int MAX_CACHE_SIZE = 2;

    private final LinkedHashMap<String, MessageChannel > channels =
                new LinkedHashMap<String, MessageChannel >() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    protected boolean removeEldestEntry(
                            Entry<String, MessageChannel > eldest) {
                        //This returning true means the least recently used
                        //channel and its application context will be closed and removed
                        boolean remove = size() > MAX_CACHE_SIZE;
                        if(remove) {
                            MessageChannel channel = eldest.getValue();
                            ConfigurableApplicationContext ctx = contexts.get(channel);
                            if(ctx != null) { //shouldn't be null ideally
                                ctx.close();
                                contexts.remove(channel);
                            }
                        }
                        return remove;
                    }

                };

    private final Map<MessageChannel , ConfigurableApplicationContext> contexts =
                new HashMap<MessageChannel , ConfigurableApplicationContext>();



    /**
     * Resolve a customer to a channel, where each customer gets a private
     * application context and the channel is the inbound channel to that
     * application context.
     *
     * @param customer
     * @return a channel
     */
    public MessageChannel resolve(String customer) {
        MessageChannel  channel = this.channels.get(customer);
        if (channel == null) {
            try {
                channel = createNewCustomerChannel(customer, "172.37.447.34", 22, "upuser", "upuser@123", "/tmp"+"/"+customer, "D://test");
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }


        return channel;
    }

    private synchronized MessageChannel  createNewCustomerChannel(String customer, 
            String host, int port, String user, String password, String remoteDir, String localDir) throws InterruptedException {
        MessageChannel channel = this.channels.get(customer);
        if (channel == null) {
            ConfigurableApplicationContext ctx = new ClassPathXmlApplicationContext(
                    new String[] { "/META-INF/spring/integration/dynamic-sftp-inbound-adapter-parent.xml" },
                    false);
            this.setEnvironmentForCustomer(ctx, customer,host, port, user, password, remoteDir, localDir);
            ctx.refresh();
            channel = ctx.getBean("sftpInboundChannel", MessageChannel.class);
            this.channels.put(customer, channel);
            //Will works as the same reference is presented always
            this.contexts.put(channel, ctx);
            SourcePollingChannelAdapter sftpInboundChannelAdapter = ctx.getBean("dynInboundAdapter", SourcePollingChannelAdapter.class);

            sftpInboundChannelAdapter.start();

        }

        return channel;
    }

    /**
     * Use Spring 3.1. environment support to set properties for the
     * customer-specific application context.
     *
     * @param ctx
     * @param customer
     */
    private void setEnvironmentForCustomer(ConfigurableApplicationContext ctx, String customer, 
            String host, int port, String user, String password, String remoteDir,String localDir) {
        StandardEnvironment env = new StandardEnvironment();
        Properties props = new Properties();
        // populate properties for customer
        props.setProperty("host", host);
        props.setProperty("port", port+"");
        props.setProperty("user", user);
        props.setProperty("password", password);
        props.setProperty("remoteDirectory", remoteDir);
        props.setProperty("localDirectory",localDir);
        PropertiesPropertySource pps = new PropertiesPropertySource("ftpprops", props);
        env.getPropertySources().addLast(pps);
        ctx.setEnvironment(env);
    }

}

The parent xml file

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:file="http://www.springframework.org/schema/integration/file"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/sftp http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd 
         http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">

    <context:property-placeholder />
    <bean id="sftpSessionFactory"
        class="org.springframework.integration.file.remote.session.CachingSessionFactory">
        <constructor-arg ref="defaultSftpSessionFactory" />
    </bean>
    <bean id="defaultSftpSessionFactory" 
        class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
        <property name="host" value="${host}" />
        <property name="port" value="${port}"/> 
        <property name="user" value="${user}" />
        <property name="password" value="${password}" />
    </bean>




    <int-sftp:inbound-channel-adapter id="dynInboundAdapter"
        channel="sftpInboundChannel"
        session-factory="sftpSessionFactory" 
        remote-directory="${remoteDirectory}" charset="UTF-8"
        local-directory="${localDirectory}"
        auto-create-local-directory="true" 
        delete-remote-files="false" filename-regex=".*\.txt$|.*\.csv$|.*\.xml$" 
        local-filter="acceptAllFileListFilter" >
         <int:poller fixed-rate="1" time-unit="MILLISECONDS" max-messages-per-poll="100"/>  
    </int-sftp:inbound-channel-adapter>


    <bean id="acceptAllFileListFilter"
      class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>
    <int:channel id="sftpInboundChannel">
        <int:queue />  
    </int:channel>

</beans>

The child xml file

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:file="http://www.springframework.org/schema/integration/file"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/sftp http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd 
         http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">



    <bean id="inboundChannelResolver" class="org.springframework.integration.samples.ftp.DynamicInboundSftpChannelResolver" />

    <int:channel id="toDynRouterInbound" />

    <int:router input-channel="toDynRouterInbound"
        expression="@inboundChannelResolver.resolve(headers['customer'])"/>         
</beans>

The test file

package org.springframework.integration.samples.ftp;

import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.net.UnknownHostException;

import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;

public class SftpInboundAdapter {
    private ApplicationContext appCtx;
    @Test
    public void runDemo() throws Exception{
        ConfigurableApplicationContext ctx =

                new ClassPathXmlApplicationContext("META-INF/spring/integration/dynamic-sftp-inbound-adapter.xml");


        MessageChannel channel = ctx.getBean("toDynRouterInbound", MessageChannel.class);

        Message<String> message = MessageBuilder.withPayload("inboundMessage")
                        .setHeader("customer", "cust1")
                        .setHeader("host", "172.37.447.34")
                        .setHeader("port", "22")
                        .setHeader("user", "upuser")
                        .setHeader("password", "upuser@123")
                        .setHeader("localDirectory", "D://test")
                        .setHeader("remoteDirectory", "/tmp/cust1/")
                        .build();
        try {
            channel.send(message);              
        }
        catch (MessagingException e) {
            assertThat(e.getCause().getCause().getCause(), instanceOf(UnknownHostException.class));
            assertTrue(e.getCause().getCause().getCause().getMessage().startsWith("host.for.cust1"));
        }
        // send another so we can see in the log we don't create the ac again.   
        ctx.close();
    }  
}
1

1 Answers

1
votes

You need to stop your test case from ending too quickly.

Add a condition to the test, something like...

int n = 0;
while (n++ < 600 && fileNotYetCopied()) {
    Thread.sleep(100);
}
assertTrue(n < 600); // allow up to 60 seconds