3 votes

We have a job that aggregates data over time windows. We're new to Spark, and we observe significantly different performance characteristics when running the logically identical query as a streaming job vs. a batch job. We want to understand what's going on and find possible ways to improve the speed of the Structured Streaming based approach.

For the sake of this post, suppose the schema is

root
 |-- objectId: long (nullable = true)
 |-- eventTime: long (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)

where

  • date and hour are (derived) partition keys, i.e. Parquet files are stored in folders like date=2020-07-26/hour=4 (a sketch of how such columns might be derived follows after this list)
  • the underlying format is Delta Lake
  • an hour of data has about 200 million events
  • objectId is widely spread (10 million distinct values observed in an hour, very uneven distribution)
  • we're trying to count the number of events per objectId, in 5 minute buckets
  • the underlying source is fed from a Kafka queue (ingestion runs every minute)
    • two new files appear on ADLS Gen2 every minute, about 25 MB each (the actual files contain some 10 additional columns that are not shown above)
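
For illustration, deriving those partition columns (and a proper timestamp column to window on) might look roughly like the sketch below; rawDf, eventTs and the epoch-milliseconds assumption for eventTime are ours, not taken from the actual job:

import org.apache.spark.sql.functions.{hour, to_date}
import spark.implicits._

// Hypothetical derivation of the partition columns.
// Assumes eventTime is epoch milliseconds; adjust the divisor if it is seconds.
val withPartitions = rawDf
  .withColumn("eventTs", ($"eventTime" / 1000).cast("timestamp"))
  .withColumn("date", to_date($"eventTs"))
  .withColumn("hour", hour($"eventTs"))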

We're running a Structured Streaming job that basically does:

df.read.format("delta")
  .withWatermark("7 minutes") // watermark only applied to streaming query
  .groupBy($"date", $"hour", $"objectId", window($"eventTime", "5 minutes"))
  .coalesce(1) // debatable; we like limited number of files
  .partitionBy("date", "hour")
  .writeStream
  .format("delta")
  .option("checkpointLocation", <...>)
  .partitionBy("date", "hour")
  .start(<destination url>)
  .awaitTermination

The associated batch job does basically the same thing, except for the withWatermark call and with the batch equivalents of writeStream etc. It reads from exactly the same source, so it reads exactly the same files, of the same size, etc.
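
For comparison, the batch counterpart might look roughly like this sketch (the paths, the write mode, and the assumption that eventTime is a timestamp column are ours):

import org.apache.spark.sql.functions.window
import spark.implicits._

// Batch version of the same aggregation: no watermark, a plain write instead of writeStream.
spark.read
  .format("delta")
  .load("<source url>")
  .groupBy($"date", $"hour", $"objectId", window($"eventTime", "5 minutes"))
  .count()
  .coalesce(1) // same "few files" preference as in the streaming job
  .write
  .format("delta")
  .partitionBy("date", "hour")
  .mode("append")
  .save("<destination url>")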

We are running these on:

  • Azure Databricks
  • Azure Data Lake Storage Gen2

Observations:

  • the batch job is able to aggregate one hour in about one minute, running on the smallest possible cluster (3x F4s)
  • the Structured Streaming job OOMs even with 3x DS3_v2, so we had to configure larger instances (3x L4s, 32 GB per node)
    • CPUs are practically idle (97.4% idle)
    • each micro batch takes 30-60s (almost exclusively spent in addBatch)
    • low network activity (maybe 2 MB/s)
  • generally, I have the feeling that the streaming job won't be able to keep up when data intake increases (we're planning for 10x as much traffic)

My understanding is that the streaming query, given the watermark (7 minutes) and the window size (5 minutes), only has to look back less than 15 minutes before it can write out a 5-minute window and discard all associated state.
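
One way to sanity-check this (a sketch; query stands for the handle returned by start(), which the snippet above does not keep) is to look at the streaming progress metrics, which report the current watermark, the per-phase durations and the amount of state kept:

// Inspect the most recent micro-batch of a running StreamingQuery.
val progress = query.lastProgress
if (progress != null) {
  println(progress.eventTime)  // includes the current "watermark"
  println(progress.durationMs) // includes "addBatch", "getBatch", ...
  // State kept by the stateful aggregation: row count and memory used.
  progress.stateOperators.foreach { op =>
    println(s"state rows: ${op.numRowsTotal}, state memory: ${op.memoryUsedBytes} bytes")
  }
}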

Questions:

  • why does the Structured Streaming based solution need so much more memory?
    • assuming we have to maintain state for some 10 million entries, I don't see how we could need that much
  • what could cause the high processing time for the streaming job, given that it sits idle?
  • what kind of metrics should I look at (Spark newbie here)?

1 Answer

1 vote

df.read.format("delta")

It looks like you are creating a static DataFrame and then converting this static DataFrame into a streaming one. The aggregations are applied to the static DataFrame, and the windowing might not work for this reason. Try creating a streaming DataFrame instead:

val DF = spark
  .readStream
  .format("delta")...

Some examples can be found here https://docs.databricks.com/delta/delta-streaming.html#delta-table-as-a-stream-source
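
Combining that suggestion with the query from the question, a streaming version might look roughly like the following sketch (paths are placeholders, and eventTime is assumed to be a timestamp column):

import org.apache.spark.sql.functions.window
import spark.implicits._

// Read the Delta table as a stream instead of a static DataFrame,
// then apply the same windowed count as in the question.
val events = spark.readStream
  .format("delta")
  .load("<source url>")

events
  .withWatermark("eventTime", "7 minutes")
  .groupBy($"date", $"hour", $"objectId", window($"eventTime", "5 minutes"))
  .count()
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "<checkpoint location>")
  .partitionBy("date", "hour")
  .start("<destination url>")
  .awaitTermination()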