I'd like to share the solution I have implemented in the meantime.
After creating my own InputFormat, I used the following code in the open() method:
@Override
public void open(final FileInputSplit ignored) throws IOException {
    ...
    final XMLInputFactory xmlif = XMLInputFactory.newInstance();
    final XMLStreamReader xmlr = xmlif.createXMLStreamReader(filePath.toString(),
            InputFormatUtil.readFileWithinZipArchive(filePath, nestedXmlFileName));
    while (xmlr.hasNext()) {
        ...
    }
where readFileWithinZipArchive(...) is implemented as follows:
public static InputStream readFileWithinZipArchive(final Path zipPath, final String filename) throws IOException {
    final File tmpFile;
    // use org.apache.flink.core.fs.Path to open an InputStream on the (possibly remote) zip archive
    try (InputStream zipInputStream = zipPath.getFileSystem().open(zipPath)) {
        // write a temporary local copy of the zip file
        tmpFile = stream2file(zipInputStream);
    }
    // then use java.util.zip.ZipFile to get the InputStream of the specific file within the archive
    final ZipFile zipFile = new ZipFile(tmpFile);
    final ZipEntry entry = zipFile.getEntry(filename);
    if (entry == null) {
        // getEntry() returns null when the archive contains no such entry
        zipFile.close();
        throw new IOException(filename + " not found in " + zipPath);
    }
    // the ZipFile must stay open for as long as the returned stream is being read
    return zipFile.getInputStream(entry);
}
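The stream2file(...) helper is not shown here. A minimal sketch of what it could look like, assuming it only needs to copy the stream into a temporary local file (the class name, the temp-file prefix and the deleteOnExit() cleanup are illustrative assumptions, not necessarily the original implementation):

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class Stream2FileSketch {

    // Hypothetical stand-in for stream2file(...): copies the given stream
    // into a temporary local file and returns that file.
    public static File stream2file(final InputStream in) throws IOException {
        final File tmpFile = File.createTempFile("archive", ".zip");
        // ask the JVM to delete the local copy on normal exit
        tmpFile.deleteOnExit();
        // createTempFile has already created an empty file, so overwrite it
        Files.copy(in, tmpFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        return tmpFile;
    }
}
```

A local copy is needed because java.util.zip.ZipFile requires random access to the archive, which a plain InputStream from a remote file system does not offer. In a long-running job it is probably better to delete the temporary file explicitly once the XML stream has been fully read, rather than relying on deleteOnExit().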