5
votes

I have many parquet file directories on HDFS, each containing a few thousand small (mostly < 100 KB) parquet files. They slow down my Spark job, so I want to combine them.

With the following code I can repartition a local parquet file into a smaller number of parts:

val pqFile = sqlContext.read.parquet("file:/home/hadoop/data/file.parquet")
pqFile.coalesce(4).write.save("file:/home/hadoop/data/fileSmaller.parquet")

But I don't know how to get the size of a directory on HDFS programmatically from Scala, so I can't work out how many partitions to pass to the coalesce function for the real data set.

How can I do this? Or is there a convenient way within Spark to configure the writer to write parquet partitions of a fixed size?


1 Answer

6
votes

You could try

pqFile.inputFiles.size

which returns "a best-effort snapshot of the files that compose this DataFrame" according to the documentation.
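If the file count alone is enough to size the output, one rough sketch (the ratio of ~100 input files per output part and the output path are just illustrative assumptions, not anything from your setup):

val inputCount = pqFile.inputFiles.size
// assumed ratio: merge roughly 100 small input files into each output part
val numParts = math.max(1, inputCount / 100)
pqFile.coalesce(numParts).write.save("hdfs:///tmp/fileSmaller.parquet") // example output path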

As an alternative, directly on the HDFS level:

// Obtain the default FileSystem from the Hadoop configuration
val hdfs: org.apache.hadoop.fs.FileSystem =
  org.apache.hadoop.fs.FileSystem.get(
    new org.apache.hadoop.conf.Configuration())

val hadoopPath = new org.apache.hadoop.fs.Path("hdfs://localhost:9000/tmp")
val recursive = false
val ri = hdfs.listFiles(hadoopPath, recursive)

// Wrap Hadoop's RemoteIterator in a Scala Iterator
val it = new Iterator[org.apache.hadoop.fs.LocatedFileStatus]() {
  override def hasNext = ri.hasNext
  override def next() = ri.next()
}

// Materialize the iterator
val files = it.toList
println(files.size)              // number of files
println(files.map(_.getLen).sum) // total size in bytes

This way you get the file sizes as well.
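If you would rather target a fixed output file size, a minimal sketch (the 128 MB target and the output path are assumptions) is to divide the total byte count by the target size and coalesce to that many parts:

val totalBytes = files.map(_.getLen).sum
val targetPartBytes = 128L * 1024 * 1024 // assumed target of ~128 MB per output file
val numParts = math.max(1, math.ceil(totalBytes.toDouble / targetPartBytes).toInt)
sqlContext.read.parquet("hdfs://localhost:9000/tmp")
  .coalesce(numParts)
  .write.save("hdfs://localhost:9000/tmp-compacted") // example output path

Note that coalesce only controls the number of output files; the actual size of each parquet file will still vary with compression and row group layout.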