
I am trying to convert a file (csv.gz format) into Parquet using a streaming DataFrame. I have to use streaming DataFrames because the compressed files are ~700 MB in size. The job is run using a custom JAR on AWS EMR. The source, destination and checkpoint locations are all on AWS S3, but as soon as I try to write to the checkpoint location the job fails with the following error:

    java.lang.IllegalArgumentException:
    Wrong FS: s3://my-bucket-name/transformData/checkpoints/sourceName/fileType/metadata,
    expected: hdfs://ip-<ip_address>.us-west-2.compute.internal:8020

There are other Spark jobs running on the EMR cluster that read from and write to S3 successfully (but they are not using Spark Streaming), so I do not think it is an issue with S3 filesystem access as suggested in this post. I also looked at this question, but the answers do not help in my case. I am using Scala 2.11.8 and Spark 2.1.0. Following is the code I have so far:

...

    val spark = conf match {
      case null =>
        SparkSession
          .builder()
          .appName(this.getClass.toString)
          .getOrCreate()
      case _ =>
        SparkSession
          .builder()
          .config(conf)
          .getOrCreate()
    }

    // Read CSV file into structured streaming dataframe
    val streamingDF = spark.readStream
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("delimiter","|")
      .option("timestampFormat", "dd-MMM-yyyy HH:mm:ss")
      .option("treatEmptyValuesAsNulls", "true")
      .option("nullValue","")
      .schema(schema)
      .load(s"s3://my-bucket-name/rawData/sourceName/fileType/*/*/fileNamePrefix*")
      .withColumn("event_date", "event_datetime".cast("date"))
      .withColumn("event_year", year($"event_date"))
      .withColumn("event_month", month($"event_date"))

    // Write the results to Parquet
    streamingDF.writeStream
      .format("parquet")
      .option("path", "s3://my-bucket-name/transformedData/sourceName/fileType/")
      .option("compression", "gzip")
      .option("checkpointLocation", "s3://my-bucket-name/transformedData/checkpoints/sourceName/fileType/")
      .partitionBy("event_year", "event_month")
      .trigger(ProcessingTime("900 seconds"))
      .start()

I have also tried to use s3n:// instead of s3:// in the URI but that does not seem to have any effect.

1 Answer


TL;DR: Upgrade Spark or avoid using S3 as the checkpoint location.

Apache Spark (Structured Streaming) : S3 Checkpoint support
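
If upgrading is not an option, one workaround is to keep the checkpoint on the cluster's HDFS and write only the output to S3. A minimal sketch of that, assuming a hypothetical HDFS directory on your cluster (adjust the path to your own layout):

    // Minimal sketch: checkpoint on the cluster's HDFS, output on S3.
    // The hdfs:// path below is hypothetical; point it at a directory on your cluster.
    streamingDF.writeStream
      .format("parquet")
      .option("path", "s3a://my-bucket-name/transformedData/sourceName/fileType/")
      .option("compression", "gzip")
      .option("checkpointLocation", "hdfs:///checkpoints/sourceName/fileType/")
      .partitionBy("event_year", "event_month")
      .trigger(ProcessingTime("900 seconds"))
      .start()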

Also, you should probably specify the write path with s3a://.

A successor to the S3 Native, s3n:// filesystem, the S3a: system uses Amazon's libraries to interact with S3. This allows S3a to support larger files (no more 5GB limit), higher performance operations and more. The filesystem is intended to be a replacement for/successor to S3 Native: all objects accessible from s3n:// URLs should also be accessible from s3a simply by replacing the URL schema.

https://wiki.apache.org/hadoop/AmazonS3
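
If you want to confirm that the s3a connector is actually available on your cluster before switching the URIs, a purely diagnostic sketch like the following shows which FileSystem implementation a given scheme resolves to (the bucket name is just the one from the question):

    import java.net.URI
    import org.apache.hadoop.fs.FileSystem

    // Diagnostic sketch: resolve the filesystem implementation behind a URI
    // using the cluster's Hadoop configuration.
    val fs = FileSystem.get(new URI("s3a://my-bucket-name/"), spark.sparkContext.hadoopConfiguration)
    println(fs.getClass.getName)  // expect something like org.apache.hadoop.fs.s3a.S3AFileSystem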