
I'm currently building an application with Apache Spark (pyspark), and I have the following use case:

  • Run pyspark in local mode (via spark-submit with --master local[*]).
  • Write the results of my spark job to S3 in the form of partitioned Parquet files.
  • Ensure that each job overwrites only the particular partition it is writing to, so that jobs are idempotent.
  • Ensure that spark-staging files are written to local disk before being committed to S3, as staging in S3 and then committing via a rename operation is very expensive.

For various internal reasons, all four of the above bullet points are non-negotiable.

I have everything but the last bullet point working. I'm running a pyspark application, and writing to S3 (actually an on-prem Ceph instance), ensuring that spark.sql.sources.partitionOverwriteMode is set to dynamic.
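
For illustration, the write itself looks roughly like this (a minimal sketch; the bucket path, the event_date partition column, and the sample data are placeholders rather than my real schema):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Placeholder data; the real job builds `df` from upstream processing.
    df = spark.createDataFrame(
        [("2021-09-01", 1), ("2021-09-01", 2)],
        ["event_date", "value"],
    )

    # With partitionOverwriteMode=dynamic, only the event_date partitions
    # present in `df` should be replaced; the rest of the table is untouched.
    (
        df.write
        .mode("overwrite")
        .partitionBy("event_date")
        .parquet("s3a://my-bucket/my-table/")
    )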

However, this means that my spark-staging files are being staged in S3, and then committed by using a delete-and-rename operation, which is very expensive.

I've tried using the S3A directory committer in order to stage files on my local disk. This works great unless spark.sql.sources.partitionOverwriteMode is set to dynamic.
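
Roughly, that attempt looked like the following (a sketch, with the option set reconstructed from memory rather than copied from my actual job):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Stage task output with the S3A "directory" staging committer, which buffers
    # files on the local filesystem and only uploads them at commit time.
    spark.conf.set("spark.hadoop.fs.s3a.committer.name", "directory")

    # Route s3a:// output through the S3A committer factory and Spark's cloud
    # commit protocol classes.
    spark.conf.set("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
                   "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
    spark.conf.set("spark.sql.sources.commitProtocolClass",
                   "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    spark.conf.set("spark.sql.parquet.output.committer.class",
                   "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")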

After digging through the source code, it looks like PathOutputCommitProtocol does not support dynamic partition overwrite.

At this point, I'm stuck. I want to be able to write my staging files to local disk, and then commit the results to S3. However, I also need to be able to dynamically overwrite a single partition without overwriting the entire Parquet table.
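
To make the requirement concrete (again with placeholder paths and columns), this is the behavior I need to keep:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Placeholder: re-running the job for a single day.
    single_day = spark.createDataFrame([("2021-09-01", 42)], ["event_date", "value"])

    # Expected outcome with dynamic partition overwrite:
    #   s3a://my-bucket/my-table/event_date=2021-09-01/  -> replaced by this run
    #   s3a://my-bucket/my-table/event_date=2021-08-31/  -> left untouched
    single_day.write.mode("overwrite").partitionBy("event_date").parquet(
        "s3a://my-bucket/my-table/"
    )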

For reference, I'm running pyspark=3.1.2, and using the following spark-submit command:

spark-submit --repositories https://repository.cloudera.com/artifactory/cloudera-repos/ --packages com.amazonaws:aws-java-sdk:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0,org.apache.spark:spark-hadoop-cloud_2.12:3.1.1.3.1.7270.0-253

I get the following error when spark.sql.sources.partitionOverwriteMode is set to dynamic:

java.io.IOException: PathOutputCommitProtocol does not support dynamicPartitionOverwrite

My spark config is as follows:


        self.spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
        self.spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

        self.spark.conf.set("spark.hadoop.fs.s3a.committer.name", "magic")

        self.spark.conf.set("spark.sql.sources.commitProtocolClass",
                            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")

        self.spark.conf.set("spark.sql.parquet.output.committer.class",
                            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")

        self.spark.conf.set(
            "spark.sql.sources.partitionOverwriteMode", "dynamic"
        )