When writing software to read files of different types from an HDFS filesystem (some are tar archives, some plain text, some binary, and some gzip or other compression), I find that although the CompressorStreamFactory.detect method correctly identifies gzipped files as expected, I get an error when I try to create a BufferedReader to read the compressed data line by line.
I have tried the following:
import java.io.{BufferedInputStream, BufferedReader, InputStreamReader}
import java.net.URI

import org.apache.commons.compress.compressors.CompressorStreamFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, LocalFileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.hadoop.hdfs.DistributedFileSystem

val hdfsConf: Configuration = new Configuration()
hdfsConf.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName)
hdfsConf.set("fs.file.impl", classOf[LocalFileSystem].getName)
hdfsConf.set(FileSystem.DEFAULT_FS, "hdfs://a-namenode-that-exists:8020")
val fs: FileSystem = FileSystem.get(new URI("hdfs://a-namenode-that-exists:8020"), hdfsConf)

def getFiles(directory: Path): Unit = {
  val iter: RemoteIterator[LocatedFileStatus] = fs.listFiles(directory, true)
  var c: Int = 0
  while (iter.hasNext && c < 3) {
    val fileStatus: LocatedFileStatus = iter.next()
    val path: Path = fileStatus.getPath
    val hfs: FileSystem = path.getFileSystem(hdfsConf)
    val is: FSDataInputStream = hfs.open(path)
    // Detection works: this prints "gz" for the gzipped files
    val t: String = CompressorStreamFactory.detect(new BufferedInputStream(is))
    System.out.println(s"|||||||||| $t |=| ${path.toString}")
    // Creating the compressor stream to actually read the data is what fails
    val reader: BufferedReader = new BufferedReader(
      new InputStreamReader(
        new CompressorStreamFactory().createCompressorInputStream(new BufferedInputStream(is))))
    var cc: Int = 0
    while (cc < 10) {
      val line: String = reader.readLine()
      System.out.println(s"|||||||||| $line")
      cc += 1
    }
    c += 1
  }
}

getFiles(new Path("/some/directory/that/is/definitely/there/"))
I expected that since CompressorStreamFactory.detect correctly identifies my files as gzip, reading them should work too. The class returned by the HDFS open method is FSDataInputStream, which derives from InputStream (via FilterInputStream). I have used the Apache Commons Compress library extensively for reading archived and compressed files from a regular Linux filesystem, and since I can identify the compression in use here, I expected it to work just as well with HDFS...but alas, I get an error:
Exception in thread "main" org.apache.commons.compress.compressors.CompressorException: No Compressor found for the stream signature.
at org.apache.commons.compress.compressors.CompressorStreamFactory.detect(CompressorStreamFactory.java:525)
at org.apache.commons.compress.compressors.CompressorStreamFactory.createCompressorInputStream(CompressorStreamFactory.java:542)
at myorg.myproject.scratch.HdfsTest$.getFiles$1(HdfsTest.scala:34)
at myorg.myproject.scratch.HdfsTest$.main(HdfsTest.scala:47)
at myorg.myproject.scratch.HdfsTest.main(HdfsTest.scala)
I am quite attached to the Apache Commons Compress library because its factory methods reduce the complexity of my code for reading archived and compressed files (not just tar or gzip). I am hoping there is a simple explanation for why detection works but reading does not, but I have run out of ideas for how to figure this out. The only thought I have is that the FilterInputStream ancestry of FSDataInputStream might be mucking things up...but I have no idea how that could be fixed if it were the actual problem.
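For what it is worth, one variant I have been thinking about but have not yet tested is reusing a single BufferedInputStream for both the detect call and createCompressorInputStream, in case the first wrapper reads ahead and keeps the gzip signature bytes in its own buffer where the second wrapper never sees them. Roughly this (the readDetected helper name is just for illustration; hfs and path are the same values as in the loop above):

import java.io.{BufferedInputStream, BufferedReader, InputStreamReader}
import org.apache.commons.compress.compressors.CompressorStreamFactory
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}

// Untested sketch: one BufferedInputStream shared by detection and decompression,
// so any bytes it buffers while detect() peeks at the signature are still
// available when the compressor stream is created.
def readDetected(hfs: FileSystem, path: Path): BufferedReader = {
  val is: FSDataInputStream = hfs.open(path)
  val buffered = new BufferedInputStream(is)
  val t: String = CompressorStreamFactory.detect(buffered)
  System.out.println(s"detected: $t")
  new BufferedReader(
    new InputStreamReader(
      new CompressorStreamFactory().createCompressorInputStream(buffered)))
}

Is that the right direction, or is something else about FSDataInputStream the problem?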