I need to read and process a specific file within a zip archive in Apache Flink.

In the documentation, I found the following:

Flink currently supports transparent decompression of input files if these are marked with an appropriate file extension.

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/batch/#read-compressed-files

Is it possible to process such a file in Apache Flink by decompressing it on the fly?

2 Answers

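For files with a recognized extension such as .gz or .gzip, FileInputFormat delegates the reading of the compressed file to java.util.zip.GZIPInputStream, which returns decompressed data incrementally while it decompresses. The file is therefore processed on the fly rather than being fully decompressed up front. This only applies to the compression formats Flink recognizes by extension; a .zip archive, which can contain several files, is not handled this way.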
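To make "partial decompressed data" concrete, here is a minimal plain-JDK sketch (no Flink involved; the class GzipStreaming and its methods are hypothetical, and the input is an in-memory payload rather than a real file). It wraps a GZIPInputStream in a BufferedReader and consumes lines as they are inflated, which is roughly the kind of stream wrapping FileInputFormat performs for .gz inputs.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipStreaming {

    // Compresses `lines` into an in-memory gzip payload so the example is self-contained.
    static byte[] gzip(final String... lines) throws IOException {
        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            for (final String line : lines) {
                gz.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            }
        }
        return bos.toByteArray();
    }

    // Reads the compressed payload line by line; GZIPInputStream inflates only as much
    // data as each read needs, so the decompressed content is never held in memory at once.
    static int countLines(final byte[] compressed) throws IOException {
        int count = 0;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(compressed)), StandardCharsets.UTF_8))) {
            while (reader.readLine() != null) {
                count++; // each line is available as soon as its bytes have been inflated
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(countLines(gzip("a", "b", "c"))); // prints 3
    }
}
```

In a Flink job you would normally not write this wrapping yourself: per the documentation quoted in the question, an input path ending in .gz is decompressed transparently by the input format.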

I want to share the solution I implemented in the meantime.

After creating my own InputFormat, I used the following code in its open() method:

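@Override
public void open(final FileInputSplit ignored) throws IOException {
    ...
    final XMLInputFactory xmlif = XMLInputFactory.newInstance();
    try {
        // StAX pull parser over the XML entry extracted from the zip archive
        final XMLStreamReader xmlr = xmlif.createXMLStreamReader(filePath.toString(),
                InputFormatUtil.readFileWithinZipArchive(filePath, nestedXmlFileName));
        while (xmlr.hasNext()) {
            ...
        }
    } catch (final XMLStreamException e) {
        // open() may only throw IOException, so wrap StAX parsing errors
        throw new IOException(e);
    }
}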
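For readers unfamiliar with StAX, the following self-contained sketch shows the same pull-parsing pattern used in open() above: createXMLStreamReader(systemId, stream) over an InputStream, followed by a hasNext()/next() loop. An in-memory XML string stands in for the zip entry, and the class name StaxPullExample, the method countElements, and the element name "record" are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

public class StaxPullExample {

    // Counts start tags named `elementName`, pulling one event at a time so the
    // document is never loaded into memory as a whole.
    static int countElements(final String systemId, final InputStream in, final String elementName)
            throws XMLStreamException {
        final XMLInputFactory xmlif = XMLInputFactory.newInstance();
        final XMLStreamReader xmlr = xmlif.createXMLStreamReader(systemId, in);
        int count = 0;
        try {
            while (xmlr.hasNext()) {
                if (xmlr.next() == XMLStreamConstants.START_ELEMENT
                        && xmlr.getLocalName().equals(elementName)) {
                    count++;
                }
            }
        } finally {
            xmlr.close(); // frees parser resources; does not close the underlying InputStream
        }
        return count;
    }

    public static void main(String[] args) throws XMLStreamException {
        final String xml = "<records><record id=\"1\"/><record id=\"2\"/><other/></records>";
        final InputStream in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
        System.out.println(countElements("in-memory.xml", in, "record")); // prints 2
    }
}
```

Because XMLStreamReader.close() leaves the input source open, the InputStream obtained from the zip archive still has to be closed separately, for example in the InputFormat's close() method.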

where the implementation of readFileWithinZipArchive(...) is:

public static InputStream readFileWithinZipArchive(final Path zipPath, final String filename) throws IOException {
    final File tmpFile;
    // using org.apache.flink.core.fs.Path to open the (remote) zip archive and make a temporary local copy of it
    try (InputStream zipInputStream = zipPath.getFileSystem().open(zipPath)) {
        tmpFile = stream2file(zipInputStream);
    }
    // then using java.util.zip.ZipFile for extracting the InputStream for the specific file within the zip archive
    final ZipFile zipFile = new ZipFile(tmpFile);
    final ZipEntry entry = zipFile.getEntry(filename);
    if (entry == null) {
        zipFile.close();
        throw new FileNotFoundException(filename + " not found in " + zipPath);
    }
    // closing the returned stream also closes the ZipFile
    return new FilterInputStream(zipFile.getInputStream(entry)) {
        @Override public void close() throws IOException { try { super.close(); } finally { zipFile.close(); } }
    };
}
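If the temporary local copy is undesirable, java.util.zip.ZipInputStream may be an alternative worth trying: it reads the archive sequentially, so the wanted entry can be decompressed on the fly straight from the stream returned by FileSystem.open(). This is a different technique from the ZipFile approach above, not what this answer implements. A minimal sketch, assuming the entry can be located by scanning the archive front to back (the class ZipEntryStreaming and its methods are hypothetical, an in-memory archive replaces the Flink input stream, and readAllBytes requires Java 9 or later):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class ZipEntryStreaming {

    // Advances a ZipInputStream to the entry named `filename` and returns it; reads from the
    // returned stream decompress only that entry, on the fly, and end at the entry's last byte.
    static InputStream openEntry(final InputStream archive, final String filename) throws IOException {
        final ZipInputStream zis = new ZipInputStream(archive);
        ZipEntry entry;
        while ((entry = zis.getNextEntry()) != null) {
            if (entry.getName().equals(filename)) {
                return zis; // positioned at the start of the requested entry's data
            }
        }
        zis.close();
        throw new FileNotFoundException(filename + " not found in archive");
    }

    // Builds a small in-memory zip archive so the example is self-contained.
    static byte[] buildZip() throws IOException {
        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bos)) {
            zos.putNextEntry(new ZipEntry("other.txt"));
            zos.write("ignore me".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
            zos.putNextEntry(new ZipEntry("data.xml"));
            zos.write("<root><item/></root>".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        try (InputStream in = openEntry(new ByteArrayInputStream(buildZip()), "data.xml")) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8)); // prints <root><item/></root>
        }
    }
}
```

Inside the InputFormat, the archive argument would be the stream from zipPath.getFileSystem().open(zipPath). The trade-off is that ZipInputStream must read through every entry before the requested one, whereas ZipFile jumps to it via the central directory; ZipInputStream also rejects STORED entries that were written with a data descriptor, so the ZipFile approach is the safer choice for arbitrary archives.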