0
votes

I have an FTP inbound channel adapter that will be deployed on multiple Tomcat instances. The problem is that every instance pulls the same files from the remote directory, but I need each file to be pulled only once, by a single instance, rather than by all of them.

Is there a way to achieve this?

This is the adapter definition:

<!--
  SFTP inbound adapter: copies remote files matching *.txt from
  ${ftp_measurement_remote_dir} into the local directory built from the dir_* properties
  and emits them on inboundMeasuremntSftpReceiveChannel.
  Only a filename pattern is configured here; there is no filter backed by a metadata store,
  so nothing on this adapter coordinates with other application instances.
  NOTE(review): max-messages-per-poll="-1" presumably means "no limit per poll"; confirm.
-->
<int-sftp:inbound-channel-adapter id="inboundMeasuremntFtpReceiveAdapter" session-factory="inboundMeasurementSftpSession"
  auto-startup="${ftp_measurement_autostart}"
  local-directory="#{'${dir_interface_home}' + '${dir_interface}' + '${dir_inbound_measurement}' + '${dir_data}'}"
  channel="inboundMeasuremntSftpReceiveChannel"
  remote-directory="${ftp_measurement_remote_dir}" filename-pattern="*.txt" >
     <!-- Poll schedule comes from the inboundMeasurementFtpTrigger bean; poll errors go to the error-enrich channel. -->
     <int:poller id="inboundMeasurementSftpPoller" trigger="inboundMeasurementFtpTrigger" max-messages-per-poll="-1"
                 error-channel="inboundMeasurementSftpErrorEnrichChannel">
     </int:poller>
</int-sftp:inbound-channel-adapter>



 Metadata store class   
      package com.deere.componentdatafiles.nondeerefile;
        import java.io.Closeable;
        import java.io.Flushable;
        import java.io.IOException;
        import net.spy.memcached.AddrUtil;
        import net.spy.memcached.MemcachedClient;
        import org.springframework.beans.factory.DisposableBean;
        import org.springframework.beans.factory.InitializingBean;
        import org.springframework.integration.metadata.ConcurrentMetadataStore;
        public class MemcacheMetadatastore implements ConcurrentMetadataStore,InitializingBean, DisposableBean, Closeable, Flushable {
        MemcachedClient cache = null;
        public MemcacheMetadatastore() {
                super();try {
    cache = new MemcachedClient(AddrUtil.getAddresses("URL"));
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();}   }
        @Override
            public String get(String key) {
                // TODO Auto-generated method stub
                return (String) cache.get(key);

            }

            @Override
            public void put(String key, String value) {

                putIfAbsent(key, value);
            }
        @Override
            public String remove(String key) {
                // TODO Auto-generated method stub
                return cache.delete(key).toString();
            }
        @Override
            public void flush() throws IOException {
                // TODO Auto-generated method stub
        }
        @Override
            public void close() throws IOException {
                flush();
            }
        @Override
            public void destroy() throws Exception {
                flush();
            }
        @Override
            public void afterPropertiesSet() throws Exception {
                // TODO Auto-generated method stub
        }
        @Override
            public String putIfAbsent(String key, String value) {

                String fileValue = get(key);
                if (fileValue == null && "".equals(fileValue)) {
                    cache.set(key, 0, value);
                    return null;
                }
                return fileValue;
            }
            @Override
            public boolean replace(String arg0, String arg1, String arg2) {
                // TODO Auto-generated method stub
                return false;
            }
        }

Configuration

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
    xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-4.0.xsd
        http://www.springframework.org/schema/integration/sftp http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/ftp http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd">

<!--
  FTP session factory used by the ftpInbound adapter.
  NOTE(review): clientMode and fileType are raw integers; they presumably correspond to the
  Apache Commons Net constants FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE (0) and
  FTP.BINARY_FILE_TYPE (2). Confirm against the Commons Net version in use.
-->
<bean id="ftpClientFactory"
        class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
        <property name="host" value="****"/>
        <property name="username" value="****"/>
        <property name="password" value="****"/>
        <property name="clientMode" value="0"/>
        <property name="fileType" value="2"/>
        <property name="bufferSize" value="100000"/>
    </bean>


    <!--
      FTP inbound adapter: fetches files from /OUT/SDI402_CARATT_JD into the current working
      directory (local-directory=".") and emits them on receiveChannel.
      Remote files are not deleted after transfer (delete-remote-files="false"), so the remote
      filter (compositeFilter) is what decides whether a file is fetched.
      The local filter accepts every file (acceptAll).
      The poller runs at a fixed rate of 5000 (presumably milliseconds) and emits at most one
      message per poll.
    -->
    <int-ftp:inbound-channel-adapter id="ftpInbound" 
        channel="receiveChannel" session-factory="ftpClientFactory"
        auto-create-local-directory="true" delete-remote-files="false"
        remote-directory="/OUT/SDI402_CARATT_JD" remote-file-separator="/"
        filter="compositeFilter"
        local-directory="."
        local-filter="acceptAll">
        <int:poller fixed-rate="5000" max-messages-per-poll="1">
                  <!--   <int:transactional synchronization-factory="syncFactory"/> -->
                </int:poller>
    </int-ftp:inbound-channel-adapter>

    <!--
      Channels and consumers for fetched files.
      NOTE(review): two service activators below both use input-channel="receiveChannel".
      With a plain int:channel each message is presumably delivered to only one of them
      (alternating between subscribers), not to both. Confirm this is intended.
    -->
    <int:channel id="receiveChannel" />
    <int:channel id="afterSuccessDeleteChannel" />
       <!-- Hands the received file to controllerListener.listen1. -->
       <int:service-activator
        id="nonDeereXmlServiceActivator" input-channel="receiveChannel"
        ref="controllerListener" method="listen1" />

    <!--
      Prints the payload to standard output. The advice then deletes the local file on success
      and sends the outcome to afterSuccessDeleteChannel; on failure it renames the file into /tmp/bad/.
    -->
    <int:service-activator input-channel="receiveChannel" expression="T(java.lang.System).out.println(payload.toString())">
        <int:request-handler-advice-chain>
            <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                <property name="onSuccessExpression" value="payload.delete()" />
                <property name="successChannel" ref="afterSuccessDeleteChannel" />
                <property name="onFailureExpression" value="payload.rename('/tmp/bad/' + payload.name)" />
            </bean>
        </int:request-handler-advice-chain>
    </int:service-activator>

<!--    <int:transformer input-channel="afterSuccessDeleteChannel" output-channel="stdout"
        expression="'Removal of ' + inputMessage.payload.absolutePath + ' after transfer ' + (payload ? 'succeeded' : 'failed')" />

    <int-stream:stdout-channel-adapter id="stdout" append-newline="true"/>  -->

    <!-- Local filter used by ftpInbound: accepts every file in the local directory. -->
    <bean id="acceptAll" class="org.springframework.integration.file.filters.AcceptAllFileListFilter" />

    <!--
      Remote filter for ftpInbound. A file is fetched only if it passes both filters:
      1. the name matches the pattern, and
      2. the persistent accept-once filter has not already recorded it in metadataStore
         (keys are prefixed with "foo/bar/").
      For the accept-once check to work across application instances, metadataStore must be
      a store shared by all of them.
    -->
    <bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
        <constructor-arg>
            <list>
                <bean class="org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter">
                    <!-- The pattern is mandatory: this filter has no no-argument constructor. -->
                    <constructor-arg value="*.xml" />
                </bean>
                <bean class="org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter">
                    <constructor-arg name="store" ref="metadataStore"/>
                    <constructor-arg value="foo/bar/"/>
                </bean>
            </list>
        </constructor-arg>
    </bean>

    <!-- Metadata store referenced by the persistent accept-once filter in compositeFilter. -->
    <bean name="metadataStore" class="com.deere.componentdatafiles.nondeerefile.MemcacheMetadatastore">
        <!-- <property name="baseDirectory" value="/tmp/"/> -->
    </bean>

</beans>
1

1 Answers

1
votes

You need to use an FtpPersistentAcceptOnceFileListFilter backed by a metadata store that is shared by all instances, such as Redis, MongoDB, or ZooKeeper. See the documentation.