Followup to this question

I have JSON streaming data in the same format as below:

| A   | B                                      |
|-----|----------------------------------------|
| ABC | [{C:1, D:1}, {C:2, D:4}]               |
| XYZ | [{C:3, D:6}, {C:9, D:11}, {C:5, D:12}] |

I need to transform it into the format below:

|   A   |  C  |  D   |
|-------|-----|------|
|  ABC  |  1  |  1   |
|  ABC  |  2  |  4   |
|  XYZ  |  3  |  6   |
|  XYZ  |  9  |  11  |
|  XYZ  |  5  |  12  | 

To achieve this, I performed the transformations suggested in the previous question.

val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")

val df2 = df1.withColumn("SeqNum", monotonically_increasing_id()).toDF("A", "Bn", "SeqNum") 

val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")

val df4 = df3.withColumn("dummy", concat( $"SeqNum", lit("||"), $"A"))

val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C")) 

val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")
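As an aside, if column `B` arrives as an `array<struct<C:int, D:int>>` (an assumption about the schema, since the question shows the data only as rendered text), the same result can be sketched much more directly: explode the array and project the struct fields, which avoids the `monotonically_increasing_id`/`pivot` round-trip and the synthetic `dummy` key entirely.

    // Sketch, assuming B is array<struct<C:int, D:int>>:
    // one explode plus a struct-field projection replaces df1..df6.
    val flat = df0
      .select($"A", explode($"B").as("b"))
      .select($"A", $"b.C".as("C"), $"b.D".as("D"))

Because this version contains no aggregation, it also sidesteps the watermark/output-mode question below.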

Now I am trying to save the result to Parquet files in HDFS:

df6.withWatermark("event_time", "0 seconds")
  .writeStream
  .trigger(Trigger.ProcessingTime("0 seconds"))
  .queryName("query_db")
  .format("parquet")
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("path", "/path/to/output")
  //      .outputMode("complete")
  .start()

Now I get the below error.

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;; EventTimeWatermark event_time#223: timestamp, interval

My doubt is that I am not performing any aggregation that would require Spark to store the aggregated value beyond the processing time for that row. Why do I get this error? Can I keep the watermark at 0 seconds?

Any help on this will be deeply appreciated.

1 Answer


As per my understanding, watermarking is required only when you are performing a window operation on event time. Spark uses watermarking to handle late data, and for that purpose it needs to keep older aggregation state around.
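For contrast, this is the kind of query the watermark mechanism is designed for: an event-time windowed aggregation, where Spark must decide how long to keep each window's state open for late rows (a sketch with illustrative column names, not code from the question).

    // Sketch: watermarking matters for event-time windowed aggregations
    // like this one. Rows arriving more than 10 minutes late are dropped,
    // which lets Spark finalize and emit old windows in append mode.
    val windowedCounts = events
      .withWatermark("event_time", "10 minutes")
      .groupBy(window($"event_time", "5 minutes"), $"A")
      .count()

Note that the watermark is applied *before* the `groupBy`, on the streaming input; a watermark added after the aggregation (as in the question's `df6`) does not satisfy the append-mode requirement.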

The following link explains this very well with an example: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking

I don't see any window operations in your transformations, and if that is the case then I think you can try running the streaming query without watermarking. Keep in mind, though, that the `groupBy(...).pivot(...)` step is still a streaming aggregation, and append mode will reject any streaming aggregation that has no watermark on its input.
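To illustrate, a purely row-wise streaming query (explode plus projection, no `groupBy`) can run in the default append mode without any watermark at all. This is a sketch under the assumption that `B` is an array of structs with fields `C` and `D`; `df0` stands in for the streaming source from the question.

    // Sketch: no aggregation, so no watermark is needed and the default
    // append output mode is accepted.
    val query = df0
      .select($"A", explode($"B").as("b"))
      .select($"A", $"b.C", $"b.D")
      .writeStream
      .format("parquet")
      .option("checkpointLocation", "/path/to/checkpoint")
      .option("path", "/path/to/output")
      .start()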