
My team is stepping into the realm of Structured Streaming, and I'm relatively new to it.

I have a requirement with

Source - CSV
Sink - JSON

Env Details:

Cluster: Spark 2.2.1
Programming Language: Scala
Build Tool: Gradle

Scope:

I have implemented this simple code

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(
    Array(StructField("customer_id", StringType),
        StructField("name", StringType),
        StructField("pid", StringType),
        StructField("product_name", StringType)))

val fileData = spark.readStream
    .option("header", "true")
    .schema(schema)
    .csv(args(0))

Then I apply a simple aggregation as

// The actual business logic is more complex than this
val customerCount = fileData.groupBy("customer_id").count()

Finally, write to JSON

import org.apache.spark.sql.streaming.Trigger

val query = customerCount
    .writeStream
    .format("json")
    .option("path", "src/main/resources/output/myjson_dir")
    .option("checkpointLocation", "src/main/resources/chkpoint_dir")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start()

Questions:

  1. This works as expected when I use .format("console"), but it throws an exception when I use .format("json"):

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Aggregate [customer_id#0], [customer_id#0, count(1) AS count#18L]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@4b56b031,csv,List(),Some(StructType(StructField(customer_id,StringType,true), StructField(name,StringType,true), StructField(product_id,StringType,true), StructField(product_name,StringType,true))),List(),None,Map(header -> true, path -> /Users/Underwood/Documents/workspace/Spark_Streaming_Examples/src/main/resources/input),None), FileSource[/Users/Underwood/Documents/workspace/Spark_Streaming_Examples/src/main/resources/input], [customer_id#0, name#1, product_id#2, product_name#3, date#4]
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)

I have tried other combinations, outputMode = "update" and outputMode = "complete", but these throw errors as well. Why is this so? Is this expected behavior? How do I write the output to a JSON sink?

  2. The above exception talks about using watermarks. AFAIK, watermarks are used with a timestamp field, but I do not have a timestamp or date field in my input data. Please let me know if I'm wrong here. How would adding a watermark make a difference here?

  3. My next attempt was to write a custom foreach sink (a ForeachWriter). I referred to this post, but it did not help me either. The problem here was that I ended up with 200 directories, each containing a 0-byte file. (A rough sketch of the kind of writer I tried is shown below, after question 4.)

  4. How do I select non-group-by columns in the final output? In simple batch processing, I usually achieve this by joining the aggregated DF back to the original DF and selecting the required columns. But Structured Streaming does not seem to like this approach. Here's my sample code snippet:

    val customerCount = fileData.groupBy("customer_id").count()
    val finalDF = fileData.join(customerCount, Seq("customer_id"))
        .select("customer_id", "count", "product_name" )
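
As mentioned in question 3, here is roughly the shape of the foreach sink I attempted (a sketch reconstructed for this post, not my exact code; the file naming and the JSON formatting are only illustrative):

import java.io.FileWriter
import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.streaming.Trigger

val jsonWriter = new ForeachWriter[Row] {
    var out: FileWriter = _

    // Called once per partition per trigger; return true to process the partition
    override def open(partitionId: Long, version: Long): Boolean = {
        // Assumes the output directory already exists
        out = new FileWriter(s"src/main/resources/output/myjson_dir/part-$partitionId-$version.json")
        true
    }

    // Write each aggregated row as one JSON line
    override def process(row: Row): Unit = {
        val customerId = row.getAs[String]("customer_id")
        val count = row.getAs[Long]("count")
        val line = s"""{"customer_id":"$customerId","count":$count}"""
        out.write(line + "\n")
    }

    override def close(errorOrNull: Throwable): Unit = {
        if (out != null) out.close()
    }
}

customerCount.writeStream
    .outputMode("complete")
    .foreach(jsonWriter)
    .option("checkpointLocation", "src/main/resources/chkpoint_dir")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start()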
    

Please let me know if I have missed out any details.

I am facing the same issue. Can you please provide your findings? – BigD
@BigD: This blog addresses a similar issue. – underwood

1 Answer


Read the official Spark Structured Streaming documentation related to watermarks.

Basically, when you aggregate a stream without a watermark, you must use outputMode = "complete" (or "update"), because appending rows makes no sense while the aggregates can still change; Spark has to keep all the previous state in memory (think of the classic word count). The JSON (file) sink, however, only supports append mode, hence the AnalysisException.

Because of this, to write aggregated results to a file sink you must tell Spark, using a watermark (typically together with a window on an event-time column), when a given aggregation window is considered final and how late data may still arrive.

If you don't have a timestamp column in your data, you could create one with current_timestamp() and define the watermark on that; effectively you would be working with processing time.
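
Something along these lines should work for your case (an untested sketch; the event_time column name and the 1-minute watermark and window are arbitrary values you should tune):

import org.apache.spark.sql.functions.{col, current_timestamp, window}
import org.apache.spark.sql.streaming.Trigger

// Attach a processing-time timestamp, since the CSV rows carry no event time
val withEventTime = fileData.withColumn("event_time", current_timestamp())

// The watermark + window let Spark decide when a group is final,
// which is what append mode (the only mode file sinks support) needs
val customerCount = withEventTime
    .withWatermark("event_time", "1 minute")
    .groupBy(window(col("event_time"), "1 minute"), col("customer_id"))
    .count()

val query = customerCount
    .writeStream
    .outputMode("append")
    .format("json")
    .option("path", "src/main/resources/output/myjson_dir")
    .option("checkpointLocation", "src/main/resources/chkpoint_dir")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start()

query.awaitTermination()

Note that in append mode the counts for a window are only written out once the watermark has passed the end of that window, so expect the JSON files to appear with a delay of roughly the window size plus the watermark delay.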

If anything is unclear or you have questions, comment and I'll update my answer.