1 vote

I want to read multiple parquet files from a folder that also contains some other file types (csv, avro) into a DataFrame. I want to read a file only if it is parquet, and skip to the next one otherwise. The problem is that a parquet file might not have an extension, and the codec might also vary from file to file. Is there a way to do this in Spark-Scala?

1
What have you done so far? Have you tried spark.read.parquet(path to file)? What did it do? Please see stackoverflow.com/help/how-to-ask and stackoverflow.com/help/mcve – GMc
Even if you use spark.read.parquet(path), it will throw an exception when it encounters any other file type. The best way is to modify the flow so that every file has an extension, then iterate through the files and filter so that only parquet files are read. If you can't do that, do the same thing but iterate through the files and use try-catch to skip the ones that throw an exception, as in my answer to this question: stackoverflow.com/a/51042091/3000244 – Shikkou (a sketch of this approach follows the comments below)
Not sure what exactly you are looking for, but you can try spark.read.parquet("/foldername/*/*/*.parquet"). Here each * stands for a nested folder; if you have more than two nested folders, add another *, like this: /*/*/*/*.parquet – Yogesh
Giving an extension is not possible in my case. Even after handling the exception, the program fails at the line where it tries to read the parquet file when any other type is encountered: java.io.IOException: Could not read footer for file: – Prachi
So in this case you should iterate over all paths, keep the files that end with "parquet" (.endsWith("parquet")), add those paths to a list/array/set, and then use spark.read. For example, if the list holds all the parquet file paths, use this code: val path = List("path1", "path2", ...., "pathn"); spark.read.parquet(path: _*) – Yogesh
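
Since some parquet files may have no extension, the try-and-skip approach suggested in the comments can be sketched roughly as follows. This is a minimal sketch, not tested code: filePaths is a hypothetical list assumed to already contain every file path in the folder (parquet or not), and spark is assumed to be the SparkSession. Each path is read individually with scala.util.Try, failed reads are dropped, and the successful DataFrames are unioned.

import org.apache.spark.sql.DataFrame
import scala.util.Try

// Hypothetical list of every file path in the mixed folder.
val filePaths: Seq[String] = Seq("/data/mixed/file1", "/data/mixed/file2")

// Try to read each path as parquet; non-parquet files fail
// (e.g. "Could not read footer") and are simply skipped.
val parquetDfs: Seq[DataFrame] = filePaths.flatMap { p =>
  Try(spark.read.parquet(p)).toOption
}

// Union the successfully read DataFrames into one (assumes compatible schemas).
val combined: Option[DataFrame] = parquetDfs.reduceOption(_ union _)

Depending on the Spark version, setting spark.sql.files.ignoreCorruptFiles to true may achieve something similar, but note that it also silently skips genuinely corrupt parquet files.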

1 Answer

0 votes

You can get the filenames beforehand in the following way:

import org.apache.spark.sql.DataFrame
import scala.sys.process._

// List the directory on HDFS and keep only the lines ending in ".parquet".
// Each output line of `hdfs dfs -ls` ends with the file path, so the last
// whitespace-separated token of every matching line is the path we want.
val fileNames: List[String] = "hdfs dfs -ls /path/to/files/on/hdfs".!!
                             .split("\n")
                             .filter(_.endsWith(".parquet"))
                             .map(_.split("\\s").last).toList

// parquet() takes a vararg of paths, so the whole list is read into one DataFrame.
val df: DataFrame = spark.read.parquet(fileNames:_*)

spark in the above code is the SparkSession object. The same approach should work for Spark 1.x as well, since the signature of parquet() is the same there; note that SparkSession was introduced in 2.0, so on 1.x you would call read.parquet on a SQLContext instead.
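
If shelling out to hdfs dfs -ls is undesirable, the same file listing can be obtained through the Hadoop FileSystem API that Spark already ships with. A rough sketch, assuming the same folder path as above and spark as the SparkSession:

import org.apache.hadoop.fs.{FileSystem, Path}

// Reuse Spark's Hadoop configuration to talk to the same HDFS cluster.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// Keep only the files whose names end in ".parquet".
val parquetPaths: Seq[String] = fs
  .listStatus(new Path("/path/to/files/on/hdfs"))
  .filter(status => status.isFile && status.getPath.getName.endsWith(".parquet"))
  .map(_.getPath.toString)
  .toSeq

val df = spark.read.parquet(parquetPaths: _*)

Like the answer above, this still relies on the .parquet extension; for extensionless parquet files the try-and-skip sketch after the comments is the fallback.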