I am trying to run the Dynamic FTP example in Spring Integration. But changed the mode to SFTP. While running the test it is showing the expression evaluation is failed due to below message
org.springframework.beans.factory.xml.XmlBeanDefinitionStoreException: Line 30 in XML document from class path resource [META-INF/spring/integration/dynamic-ftp-outbound-adapter-context.xml] is invalid; nested exception is org.xml.sax.SAXParseException; lineNumber: 30; columnNumber: 31; cvc-complex-type.2.4.c: The matching wildcard is strict, but no declaration can be found for element 'int-sftp:outbound-channel-adapter'.
My files are as follow
**pom.xml**
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.integration.samples</groupId>
<artifactId>DynamicFtp</artifactId>
<version>0.0.1</version>
<name>Dynamic FTP </name>
<description>Dynamic FTP</description>
<!-- <repositories>
<repository>
<id>repo.spring.io.milestone</id>
<name>Spring Framework Maven Milestone Repository</name>
<url>https://repo.spring.io/libs-milestone</url>
</repository>
</repositories> -->
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-sftp</artifactId>
<version>4.1.2.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
<version>4.1.0.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>4.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.1.1.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
**DynamicFtpChannelResolver**
public class DynamicFtpChannelResolver {
//In production environment this value will be significantly higher
//This is just to demonstrate the concept of limiting the max number of
//Dynamically created application contexts we'll hold in memory when we execute
//the code from a junit
public static final int MAX_CACHE_SIZE = 2;
private final LinkedHashMap<String, MessageChannel> channels =
new LinkedHashMap<String, MessageChannel>() {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(
Entry<String, MessageChannel> eldest) {
//This returning true means the least recently used
//channel and its application context will be closed and removed
boolean remove = size() > MAX_CACHE_SIZE;
if(remove) {
MessageChannel channel = eldest.getValue();
ConfigurableApplicationContext ctx = contexts.get(channel);
if(ctx != null) { //shouldn't be null ideally
ctx.close();
contexts.remove(channel);
}
}
return remove;
}
};
private final Map<MessageChannel, ConfigurableApplicationContext> contexts =
new HashMap<MessageChannel, ConfigurableApplicationContext>();
/**
* Resolve a customer to a channel, where each customer gets a private
* application context and the channel is the inbound channel to that
* application context.
*
* @param customer
* @return a channel
*/
public MessageChannel resolve(String customer) {
MessageChannel channel = this.channels.get(customer);
if (channel == null) {
channel = createNewCustomerChannel(customer);
}
return channel;
}
private synchronized MessageChannel createNewCustomerChannel(String customer) {
MessageChannel channel = this.channels.get(customer);
if (channel == null) {
ConfigurableApplicationContext ctx = new ClassPathXmlApplicationContext(
new String[] { "/META-INF/spring/integration/dynamic-ftp-outbound-adapter-context.xml" },
false);
this.setEnvironmentForCustomer(ctx, customer);
ctx.refresh();
channel = ctx.getBean("toFtpChannel", MessageChannel.class);
this.channels.put(customer, channel);
//Will works as the same reference is presented always
this.contexts.put(channel, ctx);
}
return channel;
}
/**
* Use Spring 3.1. environment support to set properties for the
* customer-specific application context.
*
* @param ctx
* @param customer
*/
private void setEnvironmentForCustomer(ConfigurableApplicationContext ctx,
String customer) {
StandardEnvironment env = new StandardEnvironment();
Properties props = new Properties();
// populate properties for customer
// props.setProperty("host", "host.for." + customer);
// props.setProperty("user", "user");
// props.setProperty("password", "password");
props.setProperty("host", "172.18.554.23");
props.setProperty("user", "iuser");
props.setProperty("port", "22");
props.setProperty("password", "ipwddd");
props.setProperty("remote.directory", "/u01/dpp/dppuser/ftp/");
PropertiesPropertySource pps = new PropertiesPropertySource("ftpprops", props);
env.getPropertySources().addLast(pps);
ctx.setEnvironment(env);
System.out.println("the properties are set" +pps);
}
}
**dynamic-ftp-outbound-context.xml**
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xmlns:int-sftp="http://www.springframework.org/schema/integration /sftp"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration/ftp http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd
http://www.springframework.org/schema/integration/sftp http://www.springframework.org/schema/integration/ftp/spring-integration-sftp.xsd">
<context:property-placeholder />
<bean id="ftpClientFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<property name="host" value="${host}"/>
<property name="port" value="${port}"/>
<property name="username" value="${user}"/>
<property name="password" value="${password}"/>
</bean>
<int:channel id="toFtpChannel"/>
<int-sftp:outbound-channel-adapter id="ftpOutbound"
channel="toFtpChannel"
remote-directory="${remote.directory}"
session-factory="sftpSessionFactory"
mode="REPLACE"
remote-file-separator="/">
<int:poller fixed-rate="100" time-unit="MILLISECONDS" max-messages-per-poll="100"/>
</int-sftp:outbound-channel-adapter>
<bean id="sftpSessionFactory"
class="org.springframework.integration.file.remote.session.CachingSessionFactory">
<constructor-arg ref="ftpClientFactory" />
</bean>
</beans>
**dyamic-ftp-ountbound-sample-context.xml**
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<bean id="channelResolver" class="com.tcs.DynamicFtp.DynamicFtpChannelResolver" />
<int:channel id="toDynRouter" />
<int:router input-channel="toDynRouter"
expression="@channelResolver.resolve(headers['customer'])"/>
</beans>
**The test file**
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.net.UnknownHostException;
import org.junit.Test;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
public class FtpOutboundChannelAdapterSample {
@Test
public void runDemo() throws Exception{
ConfigurableApplicationContext ctx =
new ClassPathXmlApplicationContext("META-INF/spring/integration /DynamicFtpOutboundChannelAdapterSample-context.xml");
MessageChannel channel = ctx.getBean("toDynRouter", MessageChannel.class);
File file = File.createTempFile("temp", "txt");
BufferedWriter bw = new BufferedWriter(new FileWriter(file));
bw.write("This is the temporary file content");
bw.close();
Message<File> message = MessageBuilder.withPayload(file)
.setHeader("customer", "cust1")
.build();
try {
channel.send(message);
}
catch (MessagingException e) {
assertThat(e.getCause().getCause().getCause(), instanceOf(UnknownHostException.class));
assertTrue(e.getCause().getCause().getCause().getMessage().startsWith("host.for.cust1"));
}
// send another so we can see in the log we don't create the ac again.
try {
channel.send(message);
}
catch (MessagingException e) {
assertThat(e.getCause().getCause().getCause(), instanceOf(UnknownHostException.class));
assertTrue(e.getCause().getCause().getCause().getMessage().startsWith("host.for.cust1"));
}
// send to a different customer; again, check the log to see a new ac is built
message = MessageBuilder.withPayload(file)
.setHeader("customer", "cust2").build();
try {
channel.send(message);
}
catch (MessagingException e) {
assertThat(e.getCause().getCause().getCause(), instanceOf(UnknownHostException.class));
assertTrue(e.getCause().getCause().getCause().getMessage().startsWith("host.for.cust2"));
}
// send to a different customer; again, check the log to see a new ac is built
//and the first one created (cust1) should be closed and removed as per the max cache size restriction
message = MessageBuilder.withPayload(file)
.setHeader("customer", "cust3").build();
try {
channel.send(message);
}
catch (MessagingException e) {
assertThat(e.getCause().getCause().getCause(), instanceOf(UnknownHostException.class));
assertTrue(e.getCause().getCause().getCause().getMessage().startsWith("host.for.cust3"));
}
//send to cust1 again, since this one has been invalidated before, we should
//see a new ac created (with ac of cust2 destroyed and removed)
message = MessageBuilder.withPayload(file)
.setHeader("customer", "cust1").build();
try {
channel.send(message);
}
catch (MessagingException e) {
assertThat(e.getCause().getCause().getCause(),
instanceOf(UnknownHostException.class));
assertEquals("host.for.cust1",
e.getCause().getCause().getCause().getMessage());
}
ctx.close();
}
}