1
votes

I am trying to run the Dynamic FTP example in Spring Integration. But changed the mode to SFTP. While running the test it is showing the expression evaluation is failed due to below message

org.springframework.beans.factory.xml.XmlBeanDefinitionStoreException: Line 30 in XML document from class path resource [META-INF/spring/integration/dynamic-ftp-outbound-adapter-context.xml] is invalid; nested exception is org.xml.sax.SAXParseException; lineNumber: 30; columnNumber: 31; cvc-complex-type.2.4.c: The matching wildcard is strict, but no declaration can be found for element 'int-sftp:outbound-channel-adapter'.

My files are as follow

**pom.xml**

    <?xml version="1.0" encoding="UTF-8"?>
    <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
      <modelVersion>4.0.0</modelVersion>
      <groupId>org.springframework.integration.samples</groupId>
      <artifactId>DynamicFtp</artifactId>
      <version>0.0.1</version>
      <name>Dynamic FTP </name>
      <description>Dynamic FTP</description>


      <!-- <repositories>
        <repository>
          <id>repo.spring.io.milestone</id>
          <name>Spring Framework Maven Milestone Repository</name>
          <url>https://repo.spring.io/libs-milestone</url>
        </repository>
      </repositories> -->
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.hamcrest</groupId>
          <artifactId>hamcrest-all</artifactId>
          <version>1.3</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.springframework.integration</groupId>
          <artifactId>spring-integration-sftp</artifactId>
          <version>4.1.2.RELEASE</version>
          <scope>compile</scope>
        </dependency>


        <dependency>
          <groupId>org.springframework.integration</groupId>
          <artifactId>spring-integration-ftp</artifactId>
          <version>4.1.0.RELEASE</version>
          <scope>compile</scope>
        </dependency>


        <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
        <version>4.1.0.RELEASE</version>
        </dependency>






        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-test</artifactId>
          <version>4.1.1.RELEASE</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
          <version>1.2.17</version>
          <scope>compile</scope>
        </dependency>
        <dependency>
          <groupId>org.mockito</groupId>
          <artifactId>mockito-core</artifactId>
          <version>1.9.5</version>
          <scope>test</scope>
        </dependency>
      </dependencies>
    </project>

 **DynamicFtpChannelResolver**

public class DynamicFtpChannelResolver {

    //In production environment this value will be significantly higher
    //This is just to demonstrate the concept of limiting the max number of
    //Dynamically created application contexts we'll hold in memory when we execute
    //the code from a junit
    public static final int MAX_CACHE_SIZE = 2;

    private final LinkedHashMap<String, MessageChannel> channels =
                new LinkedHashMap<String, MessageChannel>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    protected boolean removeEldestEntry(
                            Entry<String, MessageChannel> eldest) {
                        //This returning true means the least recently used
                        //channel and its application context will be closed and removed
                        boolean remove = size() > MAX_CACHE_SIZE;
                        if(remove) {
                            MessageChannel channel = eldest.getValue();
                            ConfigurableApplicationContext ctx = contexts.get(channel);
                            if(ctx != null) { //shouldn't be null ideally
                                ctx.close();
                                contexts.remove(channel);
                            }
                        }
                        return remove;
                    }

                };

    private final Map<MessageChannel, ConfigurableApplicationContext> contexts =
                new HashMap<MessageChannel, ConfigurableApplicationContext>();



    /**
     * Resolve a customer to a channel, where each customer gets a private
     * application context and the channel is the inbound channel to that
     * application context.
     *
     * @param customer
     * @return a channel
     */
    public MessageChannel resolve(String customer) {
        MessageChannel channel = this.channels.get(customer);
        if (channel == null) {
            channel = createNewCustomerChannel(customer);
        }
        return channel;
    }

    private synchronized MessageChannel createNewCustomerChannel(String customer) {
        MessageChannel channel = this.channels.get(customer);
        if (channel == null) {
            ConfigurableApplicationContext ctx = new ClassPathXmlApplicationContext(
                    new String[] { "/META-INF/spring/integration/dynamic-ftp-outbound-adapter-context.xml" },
                    false);
            this.setEnvironmentForCustomer(ctx, customer);
            ctx.refresh();
            channel = ctx.getBean("toFtpChannel", MessageChannel.class);
            this.channels.put(customer, channel);
            //Will works as the same reference is presented always
            this.contexts.put(channel, ctx);
        }
        return channel;
    }

    /**
     * Use Spring 3.1. environment support to set properties for the
     * customer-specific application context.
     *
     * @param ctx
     * @param customer
     */
    private void setEnvironmentForCustomer(ConfigurableApplicationContext ctx,
            String customer) {
        StandardEnvironment env = new StandardEnvironment();
        Properties props = new Properties();
        // populate properties for customer
//      props.setProperty("host", "host.for." + customer);
//      props.setProperty("user", "user");
//      props.setProperty("password", "password");
        props.setProperty("host", "172.18.554.23");
        props.setProperty("user", "iuser");
        props.setProperty("port", "22");
        props.setProperty("password", "ipwddd");

        props.setProperty("remote.directory", "/u01/dpp/dppuser/ftp/");
        PropertiesPropertySource pps = new PropertiesPropertySource("ftpprops", props);
        env.getPropertySources().addLast(pps);
        ctx.setEnvironment(env);
        System.out.println("the properties are set" +pps);
    }
}


**dynamic-ftp-outbound-context.xml**

        <?xml version="1.0" encoding="UTF-8"?>
        <beans xmlns="http://www.springframework.org/schema/beans"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xmlns:int="http://www.springframework.org/schema/integration"
            xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
            xmlns:int-sftp="http://www.springframework.org/schema/integration   /sftp"
            xmlns:context="http://www.springframework.org/schema/context"
            xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                http://www.springframework.org/schema/integration/ftp http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd
                http://www.springframework.org/schema/integration/sftp http://www.springframework.org/schema/integration/ftp/spring-integration-sftp.xsd">

            <context:property-placeholder />

            <bean id="ftpClientFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
                <property name="host" value="${host}"/>
                <property name="port" value="${port}"/>
                <property name="username" value="${user}"/>
                <property name="password" value="${password}"/>
            </bean>

            <int:channel id="toFtpChannel"/>

            <int-sftp:outbound-channel-adapter id="ftpOutbound"
                        channel="toFtpChannel"
                        remote-directory="${remote.directory}"
                        session-factory="sftpSessionFactory"
                        mode="REPLACE"
                        remote-file-separator="/">
            <int:poller fixed-rate="100" time-unit="MILLISECONDS" max-messages-per-poll="100"/>         
        </int-sftp:outbound-channel-adapter>            


                <bean id="sftpSessionFactory"
                class="org.springframework.integration.file.remote.session.CachingSessionFactory">
                <constructor-arg ref="ftpClientFactory" />
            </bean>

        </beans>
    **dyamic-ftp-ountbound-sample-context.xml**

        <?xml version="1.0" encoding="UTF-8"?>
        <beans xmlns="http://www.springframework.org/schema/beans"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xmlns:int="http://www.springframework.org/schema/integration"
            xmlns:context="http://www.springframework.org/schema/context"
            xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

            <bean id="channelResolver" class="com.tcs.DynamicFtp.DynamicFtpChannelResolver" />

            <int:channel id="toDynRouter" />

            <int:router input-channel="toDynRouter"
                expression="@channelResolver.resolve(headers['customer'])"/>

        </beans>

    **The test file**

    import static org.hamcrest.Matchers.instanceOf;
    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertThat;
    import static org.junit.Assert.assertTrue;

    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileWriter;
    import java.net.UnknownHostException;

    import org.junit.Test;
    import org.springframework.context.ConfigurableApplicationContext;



 import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessagingException;


    public class FtpOutboundChannelAdapterSample {

        @Test
        public void runDemo() throws Exception{
            ConfigurableApplicationContext ctx =
                new ClassPathXmlApplicationContext("META-INF/spring/integration  /DynamicFtpOutboundChannelAdapterSample-context.xml");
            MessageChannel channel = ctx.getBean("toDynRouter", MessageChannel.class);
            File file = File.createTempFile("temp", "txt");

            BufferedWriter bw = new BufferedWriter(new FileWriter(file));
            bw.write("This is the temporary file content");
            bw.close();


            Message<File> message = MessageBuilder.withPayload(file)
                            .setHeader("customer", "cust1")
                            .build();
            try {
                channel.send(message);
            }
            catch (MessagingException e) {
                assertThat(e.getCause().getCause().getCause(), instanceOf(UnknownHostException.class));
                assertTrue(e.getCause().getCause().getCause().getMessage().startsWith("host.for.cust1"));
            }
            // send another so we can see in the log we don't create the ac again.
            try {
                channel.send(message);
            }
            catch (MessagingException e) {
                assertThat(e.getCause().getCause().getCause(), instanceOf(UnknownHostException.class));
                assertTrue(e.getCause().getCause().getCause().getMessage().startsWith("host.for.cust1"));
            }
            // send to a different customer; again, check the log to see a new ac is built
            message = MessageBuilder.withPayload(file)
                    .setHeader("customer", "cust2").build();
            try {
                channel.send(message);
            }
            catch (MessagingException e) {
                assertThat(e.getCause().getCause().getCause(), instanceOf(UnknownHostException.class));
                assertTrue(e.getCause().getCause().getCause().getMessage().startsWith("host.for.cust2"));
            }

            // send to a different customer; again, check the log to see a new ac is built
            //and the first one created (cust1) should be closed and removed as per the max cache size restriction
            message = MessageBuilder.withPayload(file)
                    .setHeader("customer", "cust3").build();
            try {
                channel.send(message);
            }
            catch (MessagingException e) {
                assertThat(e.getCause().getCause().getCause(), instanceOf(UnknownHostException.class));
                assertTrue(e.getCause().getCause().getCause().getMessage().startsWith("host.for.cust3"));
            }

            //send to cust1 again, since this one has been invalidated before, we should
            //see a new ac created (with ac of cust2 destroyed and removed)
            message = MessageBuilder.withPayload(file)
                    .setHeader("customer", "cust1").build();
            try {
                channel.send(message);
            }
            catch (MessagingException e) {
                assertThat(e.getCause().getCause().getCause(), 

    instanceOf(UnknownHostException.class));
                    assertEquals("host.for.cust1", 

e.getCause().getCause().getCause().getMessage());
                }

                ctx.close();
            }
        }
1

1 Answers

0
votes

You have an error in your xmlns configuration...

        xmlns:int-sftp="http://www.springframework.org/schema/integration   /sftp"

there should be no spaces in it.

EDIT:

You have another error:

http://www.springframework.org/schema/integration/ftp/spring-integration-sftp.xsd">

should be

http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd">

I generally use an IDE to manage the namespace stuff; doing it manually is error-prone, as you have found.

I just tested it locally with no problems. However, this

<property name="username" value="${user}"/>

should have failed because the property is user on the sftp factory; also why did you add a <poller/>?

The channel is not pollable.