0
votes

My Spark job reads a folder with parquet data partitioned by the column partition:

    val spark = SparkSession
      .builder()
      .appName("Prepare Id Mapping")
      .getOrCreate()
    import spark.implicits._

    spark.read
      .parquet(sourceDir)
      .filter($"field" === "ss_id" and $"int_value".isNotNull)
      .select($"int_value".as("ss_id"), $"partition".as("date"), $"ct_id")
      .coalesce(1)
      .write
      .partitionBy("date")
      .parquet(idMappingDir)

I've noticed that only one task is created so it's very slow. There is a lot of subfolders like partition=2019-01-07 inside the source folder, and each subfolder contains a lot of files with the extension snappy.parquet. I submit the job --num-executors 2 --executor-cores 4, and RAM is not an issue. I tried reading from both S3 and the local filesystem. I tried adding .repartition(nPartitions), removing .coalesce(1) and .partitionBy("date") but the same.

Could you suggest how I can get Spark read these parquet files in parallel?

1
nothing looks odd. try reading sourceDir/2019-01-07 if you see a difference - Salim
@Salim It would helped in my case because I needed the partition column in the dataframe. - BJ_
good find on 'merge schema'. Please change your question to add this concern that each partition may have a different schema to help others find answer. - Salim

1 Answers

0
votes

Well, I've figured out the correct code:

    val spark = SparkSession
      .builder()
      .appName("Prepare Id Mapping")
      .getOrCreate()
    import spark.implicits._

    spark.read
      .option("mergeSchema", "true")
      .parquet(sourceDir)
      .filter($"field" === "ss_id" and $"int_value".isNotNull)
      .select($"int_value".as("ss_id"), $"partition".as("date"), $"ct_id")
      .write
      .partitionBy("date")
      .parquet(idMappingDir)

Hope this will save someone time in future.