I get the following error when running the code below:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;

This is my input schema:

val schema = new StructType()
.add("product",StringType)
.add("org",StringType)
.add("quantity", IntegerType)
.add("booked_at",TimestampType)

I create the streaming source dataset like this:

val payload_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test1")
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING) as payload")
.select(from_json(col("payload"), schema).as("data"))
.select("data.*")

I then create another streaming DataFrame that performs the aggregation, and join it back to the original source DataFrame to filter records:

payload_df.createOrReplaceTempView("orders")
val stage_df = spark.sql("select org, product, max(booked_at) as booked_at from orders group by 1,2")
stage_df.createOrReplaceTempView("stage")

val total_qty = spark.sql(
  "select o.* from orders o join stage s on o.org = s.org and o.product = s.product and o.booked_at > s.booked_at")

Finally, I try to display the results on the console with Append output mode. I cannot figure out where I need to add a watermark or how else to resolve this. My objective is to keep, in every trigger, only those events whose timestamp is higher than the maximum timestamp received in any earlier trigger.

total_qty
    .writeStream
    .format("console")
    .outputMode("append")
    .start()
    .awaitTermination()

1 Answer

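In Spark Structured Streaming, an aggregation on a streaming DataFrame can be written in Append output mode only if the stream has a watermark. If you have a column holding the event timestamp (shown here as event_time; in your schema that would be booked_at), you can define the watermark on it like this: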

val payload_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test1")
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING) as payload")
.select(from_json(col("payload"), schema).as("data"))
.select("data.*")
.withWatermark("event_time", "1 minutes")

For queries with aggregations, the three output modes behave as follows:

  • Append mode uses the watermark to drop old aggregation state. The output of a windowed aggregation is delayed by the late threshold specified in withWatermark() because, by the mode's semantics, rows can be added to the Result Table only once, after they are finalized (i.e. after the watermark is crossed). See the Late Data section of the Structured Streaming Programming Guide for more details.

  • Update mode uses the watermark to drop old aggregation state.

  • Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table.
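To make the difference between the modes concrete, here is a minimal sketch rather than a fix for the exact query in the question. It assumes a local SparkSession and uses Spark's built-in "rate" test source (which emits `timestamp` and `value` columns) in place of Kafka; the object name, the 10-second window and watermark, and the `max_value` alias are illustrative choices only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, window}

object OutputModeSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session, just for experimenting.
    val spark = SparkSession.builder()
      .appName("output-mode-sketch")
      .master("local[2]")
      .getOrCreate()

    // The "rate" source stands in for Kafka; it generates rows with
    // a `timestamp` (event time) column and a `value` (counter) column.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .withWatermark("timestamp", "10 seconds")

    // Windowed aggregation on the same column the watermark is defined on.
    val agg = events
      .groupBy(window(col("timestamp"), "10 seconds"))
      .agg(max(col("value")).as("max_value"))

    // "update": each trigger prints the windows whose aggregate changed.
    // "append": each window is printed once, after the watermark passes its end.
    // "complete": every trigger prints the whole result table.
    val query = agg.writeStream
      .format("console")
      .outputMode("update")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
```

Switching the `outputMode` string between the three values and watching the console is a quick way to see when each window is emitted. Removing the `withWatermark` call while keeping "append" should reproduce the AnalysisException from the question.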

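Edit: You also have to add a window to your groupBy, on the same column the watermark is defined on (event_time in the snippet above, i.e. booked_at in your schema), so that Append mode can finalize each group once the watermark passes the end of its window:

val aggFg = payload_df
.groupBy(window($"event_time", "1 minute"), $"org", $"product")
.agg(max($"booked_at").as("booked_at"))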