I am new to Spring Integration.

I've configured a Spring Integration file inbound-channel-adapter like this:

<file:inbound-channel-adapter channel="channel1" directory="${location}" prevent-duplicates="true" filename-pattern="*.csv">
        <si:poller>
            <si:interval-trigger interval="1000"/>
        </si:poller>
</file:inbound-channel-adapter>

<si:service-activator input-channel="channel1" output-channel="channel2" ref="filenameGenerator" method="generate"/>

This works fine. However, it needs to be deployed in a clustered environment, and I want to make sure that multiple instances in the cluster do not attempt to read the same file. Will this configuration work in such an environment?

If not, can I use the Quartz scheduler like this:

<file:inbound-channel-adapter channel="channel1" directory="${location}" prevent-duplicates="true" filename-pattern="*.csv">
        <si:poller task-executor="taskExecutor" fixed-rate="1000"/>
</file:inbound-channel-adapter>

<si:service-activator input-channel="channel1" output-channel="channel2" ref="filenameGenerator" method="generate"/>

<bean id="taskExecutor" class="org.springframework.scheduling.quartz.SimpleThreadPoolTaskExecutor">
        <property name="threadCount" value="20"/>
        <property name="threadNamePrefix" value="consumer"/>
</bean>

Will this work and solve my problem? Or do I have to use transactions?

I hope the question is clear.

Thanks, Adi

To restate the problem: the issue is with the file inbound-channel-adapter in a clustered environment. When a file is placed in the folder, it should be picked up, processed, and finally renamed. In the cluster, while one instance has picked up a particular file and is still processing it, another node's file adapter also picks it up and tries to process it. The second adapter fails with a file-not-found exception, because the first adapter has processed and renamed the file in the meantime. What can I do to prevent this? – adi

2 Answers

When multiple processes are reading from the same directory, it can be desirable to lock files to prevent them from being picked up concurrently. To do this, you can use a FileLocker.

Check out the section on file lockers in the Spring Integration reference documentation. It seems that you can do something like this:

<file:inbound-channel-adapter ... >
  <file:nio-locker/>
</file:inbound-channel-adapter>
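Applied to the configuration from the question, a minimal sketch might look like the following. It assumes Spring Integration 2.x or later with the same file and si namespace prefixes, so it uses the fixed-rate poller form from the question's second snippet rather than the older interval-trigger element. The NIO locker is built on java.nio file locks, and whether such a lock is honoured by a JVM on a different machine reading the same shared directory depends on the operating system and the network file system, so treat this as something to verify on your cluster's storage rather than a guarantee.

```xml
<!-- Sketch only: the question's adapter with a java.nio-based FileLocker added.
     Before a file is emitted, the adapter tries to lock it; a file that is
     already locked by another process should be skipped instead of read twice.
     Check your spring-integration-file XSD for the allowed child-element order. -->
<file:inbound-channel-adapter channel="channel1" directory="${location}"
        prevent-duplicates="true" filename-pattern="*.csv">
    <si:poller fixed-rate="1000"/>
    <file:nio-locker/>
</file:inbound-channel-adapter>

<si:service-activator input-channel="channel1" output-channel="channel2"
        ref="filenameGenerator" method="generate"/>
```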

To ensure that a Quartz-scheduled job executes once and only once within a cluster, configure a persistent, clustered Quartz job schedule. Here's a sample config for Quartz 1.6.6:

<bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    <!-- Set whether any jobs defined on this SchedulerFactoryBean should
         overwrite existing job definitions. -->
    <property name="overwriteExistingJobs" value="true" />
    <property name="dataSource" ref="myTransactionalDataSource" />

    <!-- nonTransactionalDataSource is only necessary with clustered Quartz
         with an XA DataSource. -->
    <property name="nonTransactionalDataSource" ref="myNonTransactionalDataSource" />

    <property name="quartzProperties">
        <props>
            <!-- This lock query uses SQL Server (T-SQL) table hints;
                 adjust or omit it for other databases. -->
            <prop key="org.quartz.jobStore.selectWithLockSQL">SELECT * FROM {0}LOCKS WITH(UPDLOCK,HOLDLOCK) WHERE LOCK_NAME = ?</prop>
            <!-- Run in cluster. Quartz ensures persisted jobs are executed
                 once within the cluster. -->
            <prop key="org.quartz.jobStore.isClustered">true</prop>
            <!-- Each node in the cluster must have a unique instance id;
                 AUTO generates one per node. -->
            <prop key="org.quartz.scheduler.instanceId">AUTO</prop>
            <!-- Default clusterCheckinInterval is 15000 ms. -->
            <!-- <prop key="org.quartz.jobStore.clusterCheckinInterval">20000</prop> -->
        </props>
    </property>
    <property name="transactionManager" ref="transactionManager" />
    <!-- In Quartz 1.6.6, Quartz's ThreadPool interface is used when firing job
         triggers, in org.quartz.core.QuartzSchedulerThread.
         Quartz 1.x still starts some unmanaged threads, notably
         org.quartz.impl.jdbcjobstore.JobStoreSupport's ClusterManager, which is
         used when clustered=true. Quartz 2.0 should correct this problem. -->
    <property name="taskExecutor" ref="myTaskExecutor" />
</bean>
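The scheduler bean above only configures the clustered, JDBC-backed job store; a job and a trigger still have to be registered with it. A minimal sketch, assuming Spring's Quartz 1.x support classes (JobDetailBean and SimpleTriggerBean) and a hypothetical job class com.example.PollDirectoryJob, which stands for your own QuartzJobBean subclass that scans the directory and processes the files:

```xml
<!-- Sketch only. com.example.PollDirectoryJob is HYPOTHETICAL: it stands for your
     own QuartzJobBean subclass that scans ${location} and processes the files. -->
<bean id="pollDirectoryJob" class="org.springframework.scheduling.quartz.JobDetailBean">
    <property name="jobClass" value="com.example.PollDirectoryJob"/>
</bean>

<!-- Fires every 1000 ms (repeatInterval is in milliseconds). Spring's
     SimpleTriggerBean repeats indefinitely unless repeatCount is set. -->
<bean id="pollDirectoryTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerBean">
    <property name="jobDetail" ref="pollDirectoryJob"/>
    <property name="repeatInterval" value="1000"/>
</bean>

<!-- Register the trigger by adding this property to the "scheduler" bean:
     <property name="triggers">
         <list><ref bean="pollDirectoryTrigger"/></list>
     </property>
-->
```

With org.quartz.jobStore.isClustered set to true, Quartz fires each trigger occurrence on only one node, but its documentation also warns that clustered nodes need synchronized clocks. Note that this coordinates when the Quartz job runs; it does not by itself stop a Spring Integration poller on every node from reading the directory, so in this setup the file reading would need to happen inside the job.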