I'm currently running PySpark in local mode. I want to be able to efficiently write Parquet files to S3 using the S3A directory committer. This PySpark instance uses the local disk, not HDFS, since it is submitted with spark-submit --master local[*].
I can successfully write to S3 without enabling the directory committer, but that path writes temporary files to S3 and then renames them, which is slow and unreliable. I would like Spark to use my local filesystem as the temporary store and then copy the completed files to S3.
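For context, this is roughly the local-staging setup I expect to need once the committer classes are on the classpath. I am assuming, from my reading of the S3A committer documentation, that fs.s3a.buffer.dir and fs.s3a.committer.staging.tmp.path are the options that control where data is buffered locally; the directory paths below are placeholders:
# Assumed options for buffering task output on local disk before the final upload (placeholder paths)
self.spark.conf.set("spark.hadoop.fs.s3a.buffer.dir", "/tmp/s3a-buffer")
self.spark.conf.set("spark.hadoop.fs.s3a.committer.staging.tmp.path", "/tmp/s3a-staging")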
I have the following configuration in my PySpark conf:
self.spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
self.spark.conf.set("spark.hadoop.fs.s3a.committer.name", "directory")
self.spark.conf.set("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
self.spark.conf.set("spark.sql.parquet.output.committer.class", "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
My spark-submit command looks like this:
spark-submit --master local[*] --py-files files.zip --packages com.amazonaws:aws-java-sdk:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0,org.apache.spark.internal.io.cloud.PathOutputCommitProtocol --driver-memory 4G --name clean-raw-recording_data main.py
spark-submit gives me the following error, due to the requisite JAR not being in place:
java.lang.ClassNotFoundException: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
My questions are:
- Which JAR (specifically, the Maven coordinates) do I need to include in spark-submit --packages in order to be able to reference PathOutputCommitProtocol?
- Once I have (1) working, will I be able to use PySpark's local mode to stage temporary files on the local filesystem, or is HDFS a strict requirement?
I need this to run in local mode, not cluster mode.