I am trying to implement the Dynamic Sftp Inboud Adapter. Following are the source files and configuration.
The problem is, it works when the thread hangs for some time, but if I run Junit even if the Junit test is successful the file is not copied to local directory. But if the same junit test is run in debug mode, ie the thread is given some time, then the files are copied.
Is there any other way to start the adapter effectively or write this logic more effectively?
The resolver file
package org.springframework.integration.samples.ftp;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
public class DynamicInboundSftpChannelResolver {
public static final int MAX_CACHE_SIZE = 2;
private final LinkedHashMap<String, MessageChannel > channels =
new LinkedHashMap<String, MessageChannel >() {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(
Entry<String, MessageChannel > eldest) {
//This returning true means the least recently used
//channel and its application context will be closed and removed
boolean remove = size() > MAX_CACHE_SIZE;
if(remove) {
MessageChannel channel = eldest.getValue();
ConfigurableApplicationContext ctx = contexts.get(channel);
if(ctx != null) { //shouldn't be null ideally
ctx.close();
contexts.remove(channel);
}
}
return remove;
}
};
private final Map<MessageChannel , ConfigurableApplicationContext> contexts =
new HashMap<MessageChannel , ConfigurableApplicationContext>();
/**
* Resolve a customer to a channel, where each customer gets a private
* application context and the channel is the inbound channel to that
* application context.
*
* @param customer
* @return a channel
*/
public MessageChannel resolve(String customer) {
MessageChannel channel = this.channels.get(customer);
if (channel == null) {
try {
channel = createNewCustomerChannel(customer, "172.37.447.34", 22, "upuser", "upuser@123", "/tmp"+"/"+customer, "D://test");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return channel;
}
private synchronized MessageChannel createNewCustomerChannel(String customer,
String host, int port, String user, String password, String remoteDir, String localDir) throws InterruptedException {
MessageChannel channel = this.channels.get(customer);
if (channel == null) {
ConfigurableApplicationContext ctx = new ClassPathXmlApplicationContext(
new String[] { "/META-INF/spring/integration/dynamic-sftp-inbound-adapter-parent.xml" },
false);
this.setEnvironmentForCustomer(ctx, customer,host, port, user, password, remoteDir, localDir);
ctx.refresh();
channel = ctx.getBean("sftpInboundChannel", MessageChannel.class);
this.channels.put(customer, channel);
//Will works as the same reference is presented always
this.contexts.put(channel, ctx);
SourcePollingChannelAdapter sftpInboundChannelAdapter = ctx.getBean("dynInboundAdapter", SourcePollingChannelAdapter.class);
sftpInboundChannelAdapter.start();
}
return channel;
}
/**
* Use Spring 3.1. environment support to set properties for the
* customer-specific application context.
*
* @param ctx
* @param customer
*/
private void setEnvironmentForCustomer(ConfigurableApplicationContext ctx, String customer,
String host, int port, String user, String password, String remoteDir,String localDir) {
StandardEnvironment env = new StandardEnvironment();
Properties props = new Properties();
// populate properties for customer
props.setProperty("host", host);
props.setProperty("port", port+"");
props.setProperty("user", user);
props.setProperty("password", password);
props.setProperty("remoteDirectory", remoteDir);
props.setProperty("localDirectory",localDir);
PropertiesPropertySource pps = new PropertiesPropertySource("ftpprops", props);
env.getPropertySources().addLast(pps);
ctx.setEnvironment(env);
}
}
The parent xml file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/sftp http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
<context:property-placeholder />
<bean id="sftpSessionFactory"
class="org.springframework.integration.file.remote.session.CachingSessionFactory">
<constructor-arg ref="defaultSftpSessionFactory" />
</bean>
<bean id="defaultSftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<property name="host" value="${host}" />
<property name="port" value="${port}"/>
<property name="user" value="${user}" />
<property name="password" value="${password}" />
</bean>
<int-sftp:inbound-channel-adapter id="dynInboundAdapter"
channel="sftpInboundChannel"
session-factory="sftpSessionFactory"
remote-directory="${remoteDirectory}" charset="UTF-8"
local-directory="${localDirectory}"
auto-create-local-directory="true"
delete-remote-files="false" filename-regex=".*\.txt$|.*\.csv$|.*\.xml$"
local-filter="acceptAllFileListFilter" >
<int:poller fixed-rate="1" time-unit="MILLISECONDS" max-messages-per-poll="100"/>
</int-sftp:inbound-channel-adapter>
<bean id="acceptAllFileListFilter"
class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>
<int:channel id="sftpInboundChannel">
<int:queue />
</int:channel>
</beans>
The child xml file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/sftp http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
<bean id="inboundChannelResolver" class="org.springframework.integration.samples.ftp.DynamicInboundSftpChannelResolver" />
<int:channel id="toDynRouterInbound" />
<int:router input-channel="toDynRouterInbound"
expression="@inboundChannelResolver.resolve(headers['customer'])"/>
</beans>
The test file
package org.springframework.integration.samples.ftp;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.net.UnknownHostException;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
public class SftpInboundAdapter {
private ApplicationContext appCtx;
@Test
public void runDemo() throws Exception{
ConfigurableApplicationContext ctx =
new ClassPathXmlApplicationContext("META-INF/spring/integration/dynamic-sftp-inbound-adapter.xml");
MessageChannel channel = ctx.getBean("toDynRouterInbound", MessageChannel.class);
Message<String> message = MessageBuilder.withPayload("inboundMessage")
.setHeader("customer", "cust1")
.setHeader("host", "172.37.447.34")
.setHeader("port", "22")
.setHeader("user", "upuser")
.setHeader("password", "upuser@123")
.setHeader("localDirectory", "D://test")
.setHeader("remoteDirectory", "/tmp/cust1/")
.build();
try {
channel.send(message);
}
catch (MessagingException e) {
assertThat(e.getCause().getCause().getCause(), instanceOf(UnknownHostException.class));
assertTrue(e.getCause().getCause().getCause().getMessage().startsWith("host.for.cust1"));
}
// send another so we can see in the log we don't create the ac again.
ctx.close();
}
}