
I have written code to combine multiple files into a single master file. The problem is with the <int:transformer>: it receives one file at a time, even though the composite filter on the file inbound-channel-adapter returns a list of files. The size of the list inside the composite filter is correct, but in the transformer bean the List<File> size is always one instead of the full list collected by the filter.

Here is my config:

<!-- Auto Wiring -->
<context:component-scan base-package="com.nt.na21.nam.integration.*" />
<!-- intercept and log every message -->
<int:logging-channel-adapter id="logger"
    level="DEBUG" />
<int:wire-tap channel="logger" />

<!-- Aggregating the processed Output for OSS processing -->

<int:channel id="networkData" />
<int:channel id="requests" />

<int-file:inbound-channel-adapter id="pollProcessedNetworkData"
    directory="file:${processing.files.directory}" filter="compositeProcessedFileFilter"
    channel="networkData">
    <int:poller default="true" cron="*/20 * * * * *" />

</int-file:inbound-channel-adapter>

<bean id="compositeProcessedFileFilter"
    class="com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine" />

<int:transformer id="aggregateNetworkData"
    input-channel="networkData" output-channel="requests">
    <bean id="networkData" class="com.nt.na21.nam.integration.helper.CSVFileAggregator">
    </bean>
</int:transformer>

CompositeFileListFilterForBaseLine:

public class CompositeFileListFilterForBaseLine implements FileListFilter<File> {

    private final static Logger LOG = Logger
            .getLogger(CompositeFileListFilterForBaseLine.class);

    @Override
    public List<File> filterFiles(File[] files) {
        List<File> filteredFile = new ArrayList<File>();
        int index;
        String fetchedFileName = null;
        String fileCreatedDate = null;
        String todayDate = DateHelper.toddMM(new Date());
        LOG.debug("Date - dd-MM: " + todayDate);

        for (File f : files) {
            fetchedFileName = StringUtils.removeEnd(f.getName(), ".csv");
            index = fetchedFileName.indexOf("_");

            // Add plus one to index to skip underscore
            fileCreatedDate = fetchedFileName.substring(index + 1);
            // Format the created file date
            fileCreatedDate = DateHelper.formatFileNameDateForAggregation(fileCreatedDate);
            LOG.debug("file created date: " + fileCreatedDate + " today Date: "
                    + todayDate);
            if (fileCreatedDate.equalsIgnoreCase(todayDate)) {
                filteredFile.add(f);
                LOG.debug("File added to List of File: " + f.getAbsolutePath());
            }
        }
        LOG.debug("SIZE: " + filteredFile.size());
        LOG.debug("filterFiles method end.");
        return filteredFile;
    }

}

The CSVFileAggregator class:

public class CSVFileAggregator {

    private final static Logger LOG = Logger.getLogger(CSVFileAggregator.class);

    private int snePostion;

    protected String masterFileSourcePath=null;

    public File handleAggregateFiles(List<File> files) throws IOException {
        LOG.debug("materFileSourcePath: " + masterFileSourcePath);
        LinkedHashSet<String> allAttributes = null;
        Map<String, LinkedHashSet<String>> allAttrBase = null;
        Map<String, LinkedHashSet<String>> allAttrDelta = null;
        LOG.info("Aggregator releasing [" + files.size() + "] files");
    }
}

Log Output:

INFO : com.nt.na21.nam.integration.aggregator.NetFileAggregatorClient - NetFileAggregator context initialized. Polling input folder...
INFO : com.nt.na21.nam.integration.aggregator.NetFileAggregatorClient - Input directory is: D:\Projects\csv\processing
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - Date - dd-MM: 0103
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - file created date: 0103 today Date: 0103
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - File added to List of File: D:\Projects\NA21\NAMworkspace\na21_nam_integration\csv\processing\file1_base_0103.csv
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - file created date: 0103 today Date: 0103
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - File added to List of File: D:\Projects\NA21\NAMworkspace\na21_nam_integration\csv\processing\file2_base_0103.csv
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - **SIZE: 2**
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - filterFiles method end.
DEBUG: org.springframework.integration.file.FileReadingMessageSource - Added to queue: [csv\processing\file1_base_0103.csv, csv\processing\file2_base_0103.csv]
INFO : org.springframework.integration.file.FileReadingMessageSource - Created message: [GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]]
DEBUG: org.springframework.integration.endpoint.SourcePollingChannelAdapter - Poll resulted in Message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: org.springframework.integration.channel.DirectChannel - preSend on channel 'networkData', message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: org.springframework.integration.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: org.springframework.integration.handler.LoggingHandler - csv\processing\file2_base_0103.csv
DEBUG: org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'logger', message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: org.springframework.integration.transformer.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler@606f8b2b received message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: com.nt.na21.nam.integration.helper.CSVFileAggregator - materFileSourcePath: null
INFO : com.nt.na21.nam.integration.helper.CSVFileAggregator - **Aggregator releasing [1] files**

Can someone help me identify why the files collected by the filter are not passed to the transformer as a single list?

Thanks in advance.

The issue is with int:aggregator, as I am not sure how to invoke it. I used it earlier in my design, but it did not get executed at all. Thanks for the quick response.

For this problem I have written a FileScaner utility that scans all the files inside the folder, and the aggregation is working perfectly.

Please find below the config with the aggregator, which did not work. Because of that, I split the design into two pollers: the first produces all the CSV file(s), and the second collects and aggregates them.

<!-- Auto Wiring -->
<context:component-scan base-package="com.bt.na21.nam.integration.*" />
<!-- intercept and log every message -->
<int:logging-channel-adapter id="logger" level="DEBUG" />
<int:wire-tap channel = "logger" />

<int:channel id="fileInputChannel" datatype="java.io.File" />
<int:channel id="error" />
<int:channel id="requestsCSVInput" />

<int-file:inbound-channel-adapter id="pollNetworkFile"
    directory="file:${input.files.directory}" channel="fileInputChannel"
    filter="compositeFileFilter" prevent-duplicates="true">
    <int:poller default="true" cron="*/20 * * * * *"
        error-channel="error" />
</int-file:inbound-channel-adapter>

<bean id="compositeFileFilter"
    class="com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForTodayFiles" />

<int:transformer id="transformInputZipCSVFileIntoCSV"
    input-channel="fileInputChannel" output-channel="requestsCSVInput">
    <bean id="transformZipFile"
        class="com.nt.na21.nam.integration.file.net.NetRecordFileTransformation" />
</int:transformer>

<int:router ref="docTypeRouter" input-channel="requestsCSVInput"
    method="resolveObjectTypeChannel">
</int:router>

<int:channel id="Vlan" />
<int:channel id="VlanShaper" />
<int:channel id="TdmPwe" />

<bean id="docTypeRouter"
    class="com.nt.na21.nam.integration.file.net.DocumentTypeMessageRouter" />

<int:service-activator ref="vLanMessageHandler" output-channel="newContentItemNotification" input-channel="Vlan" method="handleFile" />

<bean id="vLanMessageHandler" class="com.nt.na21.nam.integration.file.handler.VLanRecordsHandler" />

<int:service-activator ref="VlanShaperMessageHandler" output-channel="newContentItemNotification" input-channel="VlanShaper" method="handleFile" />

<bean id="VlanShaperMessageHandler" class="com.nt.na21.nam.integration.file.handler.VlanShaperRecordsHandler" />

<int:service-activator ref="PweMessageHandler" output-channel="newContentItemNotification" input-channel="TdmPwe" method="handleFile" />

<bean id="PweMessageHandler" class="com.nt.na21.nam.integration.file.handler.PseudoWireRecordsHandler" />


<int:channel id="newContentItemNotification" />

<!-- Adding for aggregating the records in one place for OSS output -->

 <int:aggregator input-channel="newContentItemNotification" method="aggregate"
    ref="netRecordsResultAggregator" output-channel="net-records-aggregated-reply"
    message-store="netRecordsResultMessageStore"
    send-partial-result-on-expiry="true">
 </int:aggregator>

 <int:channel id="net-records-aggregated-reply" />
 <bean id="netRecordsResultAggregator" class="com.nt.na21.nam.integration.aggregator.NetRecordsResultAggregator" />

 <!-- Define a store for our network records results and set up a reaper that will
    periodically expire those results. -->
<bean id="netRecordsResultMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />


 <int-file:outbound-channel-adapter id="filesOut"
                               directory="file:${output.files.directory}" 
                               delete-source-files="true">

 </int-file:outbound-channel-adapter>

The code works fine up to the point where the messages are routed to the channels below:

<int:channel id="Vlan" />
<int:channel id="VlanShaper" />
<int:channel id="TdmPwe" />

I am trying to return a LinkedHashSet containing the CSV data from the processing on each of the above channels, and I need to aggregate and merge all of them into the LinkedHashSet vAllAttributes to get the master output CSV file:
List<String> masterList = new ArrayList<String>(vAllAttributes);
Collections.sort(masterList);

1 Answer


Well, it looks like you have slightly misunderstood the behaviour of <int-file:inbound-channel-adapter>. By design it produces one message per file on the channel, regardless of the logic in the FileListFilter. The flow is:

  1. The FileReadingMessageSource uses DirectoryScanner to retrieve files from the provided directory to an internal toBeReceived Queue

  2. Since we scan the directory for the files the design for the DirectoryScanner looks like List<File> listFiles(File directory). I guess this has led you astray.

  3. After that the filter is applied to the original file list and returns only appropriate files.

  4. They are stored to the toBeReceived Queue.

  5. And only after that the FileReadingMessageSource polls an item from the queue to build message for the output channel.
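
The steps above can be pictured with a small plain-Java sketch. This is not Spring's actual code, only a simplified stand-in: the class OneFilePerMessageSketch, its methods, and the .csv filter are made up for illustration, and the real FileReadingMessageSource may order and rescan files differently.

```java
import java.io.File;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified, hypothetical stand-in for the adapter's message source (NOT Spring's code).
final class OneFilePerMessageSketch {

    // Plays the role of the internal toBeReceived queue.
    private final Queue<File> toBeReceived = new ArrayDeque<>();

    // Steps 1-4: take the directory listing, filter it, enqueue every accepted file.
    void scan(File[] directoryListing) {
        List<File> accepted = filterCsv(directoryListing);
        toBeReceived.addAll(accepted);
    }

    // Step 5: each call hands out exactly one file (one message payload), or null when empty.
    File receive() {
        return toBeReceived.poll();
    }

    // Hypothetical filter: keeps only .csv files and returns them as a list.
    static List<File> filterCsv(File[] files) {
        List<File> result = new ArrayList<>();
        for (File f : files) {
            if (f.getName().endsWith(".csv")) {
                result.add(f);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        OneFilePerMessageSketch source = new OneFilePerMessageSketch();
        source.scan(new File[] {
            new File("file1_base_0103.csv"),
            new File("file2_base_0103.csv"),
            new File("notes.txt")
        });
        File payload;
        // The filter accepted a list, but downstream still sees one file per message.
        while ((payload = source.receive()) != null) {
            System.out.println("message payload: " + payload.getName());
        }
    }
}
```

So the list returned by the filter only decides what goes into the queue; it never becomes a message payload by itself.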

To achieve your aggregation requirements you really should use an <aggregator> between <int-file:inbound-channel-adapter> and your <int:transformer>.

You can set max-messages-per-poll="-1" on the <poller> of the <int-file:inbound-channel-adapter> to poll all your files within a single scheduled task. Even so, there will be as many messages as your filter returns files.
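
To illustrate what the max-messages-per-poll limit changes, here is a rough plain-Java sketch of one poll cycle. It is an assumption-laden simplification, not the framework's polling endpoint: PollLoopSketch and pollOnce are hypothetical names, and payloads are plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical model of a single scheduled poll (NOT Spring's polling endpoint).
final class PollLoopSketch {

    // Calls receive until it returns null or the limit is reached.
    // A negative limit is treated as "no limit", like max-messages-per-poll="-1".
    static <T> List<T> pollOnce(Supplier<T> receive, int maxMessagesPerPoll) {
        List<T> sent = new ArrayList<>();
        while (maxMessagesPerPoll < 0 || sent.size() < maxMessagesPerPoll) {
            T payload = receive.get();
            if (payload == null) {
                break; // nothing left in the source for this cycle
            }
            sent.add(payload); // each payload would be sent as its own message
        }
        return sent;
    }

    public static void main(String[] args) {
        Queue<String> files = new ArrayDeque<>(List.of("file1.csv", "file2.csv", "file3.csv"));
        System.out.println("limit 1:  " + pollOnce(files::poll, 1));
        System.out.println("limit -1: " + pollOnce(files::poll, -1));
    }
}
```

Either way every element of the returned list is a separate message; the limit only controls how many of them are emitted per scheduled run.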

After that you need a few tricks for the <aggregator>:

  1. correlationKey - lets your file messages be combined into a single MessageGroup, so that a single message is released to the downstream <transformer>. We don't have any context from <int-file:inbound-channel-adapter>, but we know that all messages come from a single polling task running on the scheduler thread (you don't use a task-executor on the <poller>), so we can simply use the thread id as the correlation key:

    correlation-strategy-expression="T(Thread).currentThread().id"
    
  2. But that is not enough, because we still have to produce a single message in the end. Unfortunately we don't know the number of files (although you could pass it via a ThreadLocal from your custom FileListFilter), so the ReleaseStrategy has nothing that would let it return true for the aggregation phase, and the group never completes normally. Instead, we can force the release of incomplete groups from the aggregator by using the MessageGroupStoreReaper or group-timeout on the <aggregator>.

  3. In addition to the previous point, you should supply these options on the <aggregator>:

    send-partial-result-on-expiry="true"
    expire-groups-upon-completion="true"
    

And that's all. There is no reason to provide a custom aggregation function (ref/method or expression), because the default one just builds a single message with the List of payloads from all messages in the group, and that is what your CSVFileAggregator expects. Alternatively, you can drop that <transformer> and use the CSVFileAggregator as the aggregation function.
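
For the ThreadLocal idea mentioned in point 2, here is a minimal plain-Java sketch of how a count recorded by the filter could drive the release decision. Everything in it is hypothetical (the class ThreadCorrelatedGroupSketch, recordAcceptedCount, string payloads), and it only models the idea on a single polling thread; it is not an implementation of Spring's ReleaseStrategy or aggregator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: group by polling thread id, release when the expected count is reached.
final class ThreadCorrelatedGroupSketch {

    // Hand-off from the (hypothetical) filter: how many files it accepted on this thread.
    static final ThreadLocal<Integer> EXPECTED_COUNT = ThreadLocal.withInitial(() -> 0);

    // One pending group per correlation key. Assumes a single polling thread,
    // so no synchronization is attempted here.
    private final Map<Long, List<String>> groups = new HashMap<>();

    // Would be called at the end of filterFiles(...) with the size of the returned list.
    static void recordAcceptedCount(int count) {
        EXPECTED_COUNT.set(count);
    }

    // Adds one payload to the current thread's group. Returns the whole list once the
    // group size reaches the recorded count, otherwise null (group still incomplete).
    List<String> add(String payload) {
        long key = Thread.currentThread().getId();
        List<String> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() >= EXPECTED_COUNT.get()) {
            groups.remove(key);
            return group; // the "aggregated" payload: a list of all payloads in the group
        }
        return null;
    }

    public static void main(String[] args) {
        ThreadCorrelatedGroupSketch aggregator = new ThreadCorrelatedGroupSketch();
        recordAcceptedCount(2);
        System.out.println(aggregator.add("file1_base_0103.csv")); // group not complete yet
        System.out.println(aggregator.add("file2_base_0103.csv")); // complete list released
    }
}
```

Without such a count the group can only be released by expiry, which is why the reaper or group-timeout options above matter.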

Hope I am clear.