I am trying to convert a file (csv.gz format) into Parquet using a streaming DataFrame. I have to use streaming DataFrames because the compressed files are ~700 MB in size. The job is run using a custom jar on AWS EMR. The source, destination and checkpoint locations are all on AWS S3. But as soon as the job tries to write to the checkpoint location it fails with the following error:
java.lang.IllegalArgumentException:
Wrong FS: s3://my-bucket-name/transformData/checkpoints/sourceName/fileType/metadata,
expected: hdfs://ip-<ip_address>.us-west-2.compute.internal:8020
There are other Spark jobs running on the EMR cluster that read from and write to S3 successfully (but they do not use Spark Streaming), so I do not think it is an issue with S3 file system access as suggested in this post. I also looked at this question, but the answers do not help in my case. I am using Scala 2.11.8 and Spark 2.1.0. Following is the code I have so far:
...
val spark = conf match {
  case null =>
    SparkSession
      .builder()
      .appName(this.getClass.toString)
      .getOrCreate()
  case _ =>
    SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
}
// Read CSV file into structured streaming dataframe
val streamingDF = spark.readStream
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", "|")
  .option("timestampFormat", "dd-MMM-yyyy HH:mm:ss")
  .option("treatEmptyValuesAsNulls", "true")
  .option("nullValue", "")
  .schema(schema)
  .load(s"s3://my-bucket-name/rawData/sourceName/fileType/*/*/fileNamePrefix*")
  .withColumn("event_date", $"event_datetime".cast("date"))
  .withColumn("event_year", year($"event_date"))
  .withColumn("event_month", month($"event_date"))
// Write the results to Parquet
streamingDF.writeStream
  .format("parquet")
  .option("path", "s3://my-bucket-name/transformedData/sourceName/fileType/")
  .option("compression", "gzip")
  .option("checkpointLocation", "s3://my-bucket-name/transformedData/checkpoints/sourceName/fileType/")
  .partitionBy("event_year", "event_month")
  .trigger(ProcessingTime("900 seconds"))
  .start()
I have also tried using s3n:// instead of s3:// in the URIs, but that does not seem to have any effect.
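For reference, the s3n attempt was just the same sink block as above with the URI scheme swapped, roughly:

// Same sink as above, only the scheme changed from s3:// to s3n://
streamingDF.writeStream
  .format("parquet")
  .option("path", "s3n://my-bucket-name/transformedData/sourceName/fileType/")
  .option("compression", "gzip")
  .option("checkpointLocation", "s3n://my-bucket-name/transformedData/checkpoints/sourceName/fileType/")
  .partitionBy("event_year", "event_month")
  .trigger(ProcessingTime("900 seconds"))
  .start()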