1
votes

I am new to Spring Integration and have an application that accepts files as input and does some specific processing on those files depending on the contents of each file.

When the file comes in, I have a router that decides whether is type "A", "B" or a "ZIP" file and routes it to the appropriate channel. In the case of a ZIP file I have a splitter which unzips the zip file and sends the results back to the router to determine which type of file from the zip is - A, B, or a nested ZIP file.

There is also an error-channel specified that handles file parsing errors.

The relevant part of the XML looks something like this:

<?xml version="1.0" encoding="UTF-8"?>

...
...

<int:channel id="checkFile" />
<int:channel id="A" />
<int:channel id="B" />
<int:channel id="ZIP" />

<!-- Route the file to the appropriate parser - A, B, or ZIP -->
<int:router input-channel="checkFile" ref="fileTypeIdentifier" method="identifyType" />

<!-- If we have a ZIP file, extract the contents and process each one individually -->
<int:splitter input-channel="ZIP" output-channel="checkFile" ref="extractor" />

<!-- Parse file type A -->
<int:chain input-channel="A" output-channel="Done">
    ...
    ...
</int:chain>

<!-- Parse file type B -->
<int:chain input-channel="B" output-channel="Done">
    ...
    ...
</int:chain>

...
...

If my input file is myfile.zip which contains file1.txt and file2.txt, the router correctly routes myfile.zip to the splitter which successfully extracts file1.txt and file2.txt (and returns those 2 files).

If I have an error parsing file1.txt I would still like to try to parse file2.txt. So after reading Spring Integration: Splitter exception causes subsequent messages to abort I tried making my "checkFile" channel a queue channel, now what happens is I pass in myfile.zip which gets routed to the "ZIP" channel, which successfully returns "file1.txt" and "file2.txt", but then the router keeps getting called with "myfile.zip" in an endless loop.

A similar problem results from using an executor channel instead of a queue channel.

How can I get my flow to unzip files and process their contents individually, while allowing one or more of the files inside the zip to fail without failing the entire job?

1
Your design is correct. There is something else in your application which makes that weird behavior. If your extractor doesn't return the original file it can't go to the checkFile. For example you don't show a <poller> configuration.Artem Bilan

1 Answers

1
votes

It might take a lot of place here, but I'll try to post it anyway:

<bean id="fileTypeIdentifier" class="org.springframework.integration.splitter.So43527224Tests$FileTypeIdentifier"/>

<bean id="fileParser" class="org.springframework.integration.splitter.So43527224Tests$FileParser"/>

<int:channel id="checkFile">
    <int:queue/>
</int:channel>

<!-- Route the file to the appropriate parser - A, B, or ZIP -->
<int:router input-channel="checkFile" ref="fileTypeIdentifier" method="identifyType">
    <int:poller fixed-delay="1000"/>
</int:router>

<!-- If we have a ZIP file, extract the contents and process each one individually -->
<int:splitter input-channel="ZIP" output-channel="checkFile"/>

<!-- Parse file type A -->
<int:service-activator input-channel="A" output-channel="nullChannel"
                       ref="fileParser" method="parseFileA"/>

<!-- Parse file type B -->
<int:service-activator input-channel="B" output-channel="nullChannel"
                       ref="fileParser" method="parseFileB"/>

And test class:

@RunWith(SpringRunner.class)
@DirtiesContext
public class So43527224Tests {

    @Autowired
    private MessageChannel checkFile;

    @Test
    public void testSo43527224() throws InterruptedException {
        this.checkFile.send(new GenericMessage<>(2));
        this.checkFile.send(new GenericMessage<>(3));
        this.checkFile.send(new GenericMessage<>(Collections.singletonList(4)));
        this.checkFile.send(new GenericMessage<>(Collections.singletonList(5)));

        List<Object> zip = new ArrayList<>();

        zip.add(6);
        zip.add(7);

        List<Integer> nestedZip = new ArrayList<>();
        nestedZip.add(9);
        nestedZip.add(8);
        zip.add(nestedZip);

        this.checkFile.send(new GenericMessage<>(zip));

        Thread.sleep(10000);
    }

    private static class FileTypeIdentifier {

        public String identifyType(Object payload) {
            if (payload instanceof List) {
                System.out.println("ZIP: " + payload);
                return "ZIP";
            }
            else if (Integer.class.cast(payload) % 2 == 0) {
                return "A";
            }
            else {
                return "B";
            }
        }

    }

    private static class FileParser {

        public String parseFileA(Object payload) {
            throw new RuntimeException("intentional: A is always bad: " + payload);
        }

        public void parseFileB(Object payload) {
            System.out.println("Good file B: " + payload);
        }

    }

}

The logs looks like this (without stack traces):

Caused by: java.lang.RuntimeException: intentional: A is always bad: 2
Good file B: 3
ZIP: [4]
ZIP: [5]
ZIP: [6, 7, [9, 8]]
Caused by: java.lang.RuntimeException: intentional: A is always bad: 4
Good file B: 5
Caused by: java.lang.RuntimeException: intentional: A is always bad: 6
Good file B: 7
ZIP: [9, 8]
Good file B: 9
Caused by: java.lang.RuntimeException: intentional: A is always bad: 8

So, as I said before: there is something else not show here to determine the reason of the loop.

You always can switch on DEBUG for the org.springframework.integration category to analyze how your messages are travel in the flow.