I have an FTP Inbound Adapter which will be deployed on multiple instances of Tomcat. The issue is that every instance pulls files from the same remote directory, but I need each file to be pulled only once, by a single instance, rather than by all of them.
Is there a way to achieve this?
This is the adapter definition:
<!--
  SFTP inbound adapter: synchronizes remote *.txt files into the local directory
  and publishes them on inboundMeasuremntSftpReceiveChannel.
  NOTE(review): filename-pattern only matches on the file name; nothing here is
  shared between application instances, so each instance polling the same remote
  directory will see the same files.
-->
<int-sftp:inbound-channel-adapter
    id="inboundMeasuremntFtpReceiveAdapter"
    channel="inboundMeasuremntSftpReceiveChannel"
    session-factory="inboundMeasurementSftpSession"
    remote-directory="${ftp_measurement_remote_dir}"
    filename-pattern="*.txt"
    local-directory="#{'${dir_interface_home}' + '${dir_interface}' + '${dir_inbound_measurement}' + '${dir_data}'}"
    auto-startup="${ftp_measurement_autostart}">
    <!-- Poll schedule comes from the inboundMeasurementFtpTrigger bean; a negative
         max-messages-per-poll places no limit on the messages emitted per poll. -->
    <int:poller
        id="inboundMeasurementSftpPoller"
        trigger="inboundMeasurementFtpTrigger"
        max-messages-per-poll="-1"
        error-channel="inboundMeasurementSftpErrorEnrichChannel"/>
</int-sftp:inbound-channel-adapter>
Metadata store class
package com.deere.componentdatafiles.nondeerefile;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClient;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
/**
 * {@link ConcurrentMetadataStore} that keeps its entries in memcached so that
 * several application instances can share one view of "already seen" keys.
 *
 * <p>The atomic operations ({@link #putIfAbsent} and {@link #replace}) are
 * delegated to memcached's own {@code add} and {@code cas} commands, so the
 * decision is made by the server rather than by a local check-then-set.
 */
public class MemcacheMetadatastore implements ConcurrentMetadataStore, InitializingBean, DisposableBean, Closeable, Flushable {

    /** Expiration handed to memcached; 0 means the entry does not expire. */
    private static final int NO_EXPIRY = 0;

    /** Upper bound on add/get rounds in {@link #putIfAbsent} before giving up. */
    private static final int MAX_PUT_IF_ABSENT_ATTEMPTS = 3;

    MemcachedClient cache;

    /**
     * Creates a store using the address list {@code "URL"}.
     *
     * @throws IllegalStateException if the memcached client cannot be created
     */
    public MemcacheMetadatastore() {
        this("URL");
    }

    /**
     * Creates a store connected to the given memcached servers.
     *
     * @param addresses server list in the format accepted by {@link AddrUtil#getAddresses(String)}
     * @throws IllegalStateException if the memcached client cannot be created
     */
    public MemcacheMetadatastore(String addresses) {
        try {
            cache = new MemcachedClient(AddrUtil.getAddresses(addresses));
        } catch (IOException e) {
            // Fail fast: a store without a client would only produce NullPointerExceptions later.
            throw new IllegalStateException("Unable to create memcached client for " + addresses, e);
        }
    }

    /**
     * @param key the key to look up
     * @return the stored value, or {@code null} if the key is not present
     */
    @Override
    public String get(String key) {
        return (String) cache.get(key);
    }

    /**
     * Unconditionally stores {@code value} under {@code key}, overwriting any existing entry.
     */
    @Override
    public void put(String key, String value) {
        await(cache.set(key, NO_EXPIRY, value));
    }

    /**
     * Removes the entry for {@code key}.
     *
     * @return the value that was stored before removal, or {@code null} if there was none
     */
    @Override
    public String remove(String key) {
        String previous = get(key);
        if (previous != null) {
            await(cache.delete(key));
        }
        return previous;
    }

    /** Nothing is buffered locally, so there is nothing to flush. */
    @Override
    public void flush() throws IOException {
        // no-op
    }

    /** Flushes and then shuts down the underlying memcached client. */
    @Override
    public void close() throws IOException {
        flush();
        cache.shutdown();
    }

    /** Releases the memcached client when the bean is destroyed. */
    @Override
    public void destroy() throws Exception {
        close();
    }

    /** The client is created in the constructor, so no further initialization is required. */
    @Override
    public void afterPropertiesSet() throws Exception {
        // no-op
    }

    /**
     * Atomically stores {@code value} under {@code key} if the key is not yet present.
     *
     * @return {@code null} if the value was stored, otherwise the value already present
     * @throws IllegalStateException if memcached neither accepts the value nor reports an existing one
     */
    @Override
    public String putIfAbsent(String key, String value) {
        for (int attempt = 0; attempt < MAX_PUT_IF_ABSENT_ATTEMPTS; attempt++) {
            // "add" only succeeds when the key does not exist; the server arbitrates between clients.
            if (await(cache.add(key, NO_EXPIRY, value))) {
                return null;
            }
            String existing = get(key);
            if (existing != null) {
                return existing;
            }
            // The key disappeared between add and get; try again.
        }
        throw new IllegalStateException("Unable to store or read metadata key " + key);
    }

    /**
     * Atomically replaces the value for {@code key} if it currently equals {@code oldValue}.
     *
     * @return {@code true} if the value was replaced, {@code false} otherwise
     */
    @Override
    public boolean replace(String key, String oldValue, String newValue) {
        CASValue<Object> current = cache.gets(key);
        if (current == null || current.getValue() == null || !current.getValue().equals(oldValue)) {
            return false;
        }
        // The CAS id ensures the write is rejected if another client changed the entry in between.
        return cache.cas(key, current.getCas(), newValue) == CASResponse.OK;
    }

    /** Waits for an asynchronous memcached operation and returns whether it reported success. */
    private static boolean await(Future<Boolean> operation) {
        try {
            return Boolean.TRUE.equals(operation.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for memcached", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Memcached operation failed", e.getCause());
        }
    }
}
Configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-4.0.xsd
http://www.springframework.org/schema/integration/sftp http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/ftp http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd">
<!-- Session factory used by the FTP inbound adapter to open connections. -->
<bean id="ftpClientFactory"
      class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
    <!-- Connection credentials (masked). -->
    <property name="host" value="****"/>
    <property name="username" value="****"/>
    <property name="password" value="****"/>
    <!-- NOTE(review): in Apache Commons Net, client mode 0 is the active local data
         connection mode and file type 2 is the binary file type; confirm these
         are the intended settings. -->
    <property name="clientMode" value="0"/>
    <property name="fileType" value="2"/>
    <property name="bufferSize" value="100000"/>
</bean>
<!--
  FTP inbound adapter: copies files from /OUT/SDI402_CARATT_JD into the working
  directory and sends each local file to receiveChannel. Remote files are
  selected by compositeFilter and are left on the server; the local filter
  accepts every file.
-->
<int-ftp:inbound-channel-adapter
    id="ftpInbound"
    session-factory="ftpClientFactory"
    channel="receiveChannel"
    remote-directory="/OUT/SDI402_CARATT_JD"
    remote-file-separator="/"
    delete-remote-files="false"
    filter="compositeFilter"
    local-directory="."
    auto-create-local-directory="true"
    local-filter="acceptAll">
    <!-- Polls every 5 seconds and emits at most one file per poll. -->
    <int:poller fixed-rate="5000" max-messages-per-poll="1">
        <!-- <int:transactional synchronization-factory="syncFactory"/> -->
    </int:poller>
</int-ftp:inbound-channel-adapter>
<!-- Channel fed by the ftpInbound adapter. -->
<int:channel id="receiveChannel"/>
<!-- Receives the result of the success expression of the delete advice. -->
<int:channel id="afterSuccessDeleteChannel"/>

<!--
  Hands every file arriving on receiveChannel to controllerListener.listen1.
  NOTE(review): another service activator in this file also subscribes to
  receiveChannel. On a default (direct) channel, subscribers are load balanced
  rather than each receiving every message; confirm that is intended.
-->
<int:service-activator
    id="nonDeereXmlServiceActivator"
    input-channel="receiveChannel"
    ref="controllerListener"
    method="listen1"/>
<!--
  Prints the received payload to stdout. The advice then deletes the local file
  on success, or moves it to /tmp/bad/ on failure.
  Assumes the payload is the local java.io.File emitted by the inbound adapter.
-->
<int:service-activator input-channel="receiveChannel"
    expression="T(java.lang.System).out.println(payload.toString())">
    <int:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
            <property name="onSuccessExpression" value="payload.delete()" />
            <property name="successChannel" ref="afterSuccessDeleteChannel" />
            <!-- java.io.File has no rename(String) method; renameTo(File) performs the move. -->
            <property name="onFailureExpression" value="payload.renameTo(new java.io.File('/tmp/bad/' + payload.name))" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
<!-- <int:transformer input-channel="afterSuccessDeleteChannel" output-channel="stdout"
expression="'Removal of ' + inputMessage.payload.absolutePath + ' after transfer ' + (payload ? 'succeeded' : 'failed')" />
<int-stream:stdout-channel-adapter id="stdout" append-newline="true"/> -->
<!-- Local filter that lets every synchronized file through. -->
<bean id="acceptAll"
      class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>
<!--
  Remote file filter: a file is fetched only if it passes every filter in the list.
-->
<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <!-- The pattern filter has no default constructor, so a pattern must be
                 supplied. "*" accepts every file name; use "*.xml" to restrict
                 the adapter to XML files. -->
            <bean class="org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter">
                <constructor-arg value="*" />
            </bean>
            <!-- Accepts a remote file only once, tracking seen files in metadataStore
                 under keys prefixed with "foo/bar/". -->
            <bean class="org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter">
                <constructor-arg name="store" ref="metadataStore"/>
                <constructor-arg value="foo/bar/"/>
            </bean>
        </list>
    </constructor-arg>
</bean>
<!-- Metadata store referenced by the persistent accept-once filter. -->
<bean name="metadataStore"
      class="com.deere.componentdatafiles.nondeerefile.MemcacheMetadatastore">
    <!-- <property name="baseDirectory" value="/tmp/"/> -->
</bean>
</beans>