
I am encountering something that, at first glance, seems counter-intuitive to a Spark Streaming novice:

when Spark Structured Streaming starts processing more data, its batch duration decreases

This is probably not the most accurate picture, but I have seen a much clearer pattern. [chart showing the pattern]

I probably need an explanation of what exactly the batch duration is. My understanding is that it represents the number of seconds it takes Spark to process the stream's mini-batch.

Next, I need clarification on how Spark triggers the processing of a mini-batch: is it based on the amount of data in the batch or on time intervals?
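For reference, the per-batch timings I am looking at are the kind exposed in the streaming progress reports. A rough sketch of reading them (here `query` is just a placeholder for the handle returned by writeStream...start(), not a name from my actual code):

# Inspect recent micro-batch metrics from the StreamingQuery handle.
for p in query.recentProgress:                        # list of recent progress reports (dicts)
    print(p["batchId"],
          p["numInputRows"],                          # rows processed in this micro-batch
          p["durationMs"]["triggerExecution"])        # wall-clock milliseconds for the whole batch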

EDIT
The code follows. There are quite a lot of "heavy" operations (joins, dropDuplicates, filtering with a higher-order function, UDFs, ...). Source and sink are both Azure Event Hubs.

# [IMPORTS]
from datetime import datetime
from pyspark.sql.functions import (udf, col, expr, from_json, to_json,
                                   struct, broadcast, current_timestamp)
from pyspark.sql.types import TimestampType

# [CONFIGS]
ehConfig = {
  'eventhubs.startingPosition': '{"offset": "@latest", "enqueuedTime": null, "isInclusive": true, "seqNo": -1}',
  'eventhubs.maxEventsPerTrigger': 300,
  'eventhubs.connectionString': 'XXX'}

ehOutputConfig = {
  'eventhubs.connectionString': 'YYY',
  "checkpointLocation": "azure_blob_storage/ABCABC"
}

spark.conf.set("spark.sql.shuffle.partitions", 3)

# [FUNCS]
@udf(TimestampType())
def udf_current_timestamp():
  return datetime.now()

#-----------#
# STREAMING # 
#-----------#

# [STREAM INPUT]
df_stream_input = spark.readStream.format("eventhubs").options(**ehConfig).load()

# [ASSEMBLE THE DATAFRAME]
df_joined = (df_stream_input
             .withColumn("InputProcessingStarted", current_timestamp().cast("long"))

             # Decode body
             .withColumn("body_decoded", from_json(col("body").cast("string"), schema=_config))

             # Join customer 
             .join(df_batch, ['CUSTOMER_ID'], 'inner')

             # Filtering
             .filter(expr('body_decoded.status NOT IN (0, 4, 32)'))
             .filter(expr('EXISTS(body_decoded.items, item -> item.ID IN (1, 2, 7))'))

             # Deduplication
             .withWatermark('enqueuedTime', '1 day') 
             .dropDuplicates(['CUSTOMER_ID', 'ItemID']) 

             # Join with lookup table
             .join(broadcast(df_lookup), ['OrderType'], 'left') 

             # UDF
             .withColumn('AssembleTimestamp', udf_current_timestamp())

             # Assemble struct 
             .withColumn('body_struct', struct('OrderType', 'OrderID', 'Price', 'StockPile'))
             )

# [STREAM OUTPUT]
(df_joined
 .select(to_json('body_struct').alias('body'))
 .writeStream
 .format("eventhubs")
 .options(**ehOutputConfig)
 .trigger(processingTime='2 seconds')
 .start())
Can you describe the query's write path: the trigger, the sink, and the output mode? What is the streaming query doing? Is it stateless or stateful? Any aggregations or joins? – Jacek Laskowski
Hi @JacekLaskowski, I added the query code. There is quite a lot going on: the query contains two joins, dropDuplicates, parsing and assembling structs, filtering, a UDF, etc. But in the end the query is just row-by-row processing, so the output mode is simple append. – mLC
Addition: we run it on two 8 GB, 2-CPU nodes (one is the driver, one is the worker). – mLC
Can you include streamingQuery.explain in your question? – Jacek Laskowski
BTW, how did you get the metrics and the charts? – Jacek Laskowski
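For reference, the streamingQuery.explain mentioned above is a method on the running query handle; a minimal sketch, where `query` stands for whatever writeStream...start() returned:

query.explain(extended=True)   # with extended=True prints the logical and physical plans; default prints only the physical plan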

1 Answer


In Spark Structured Streaming, a new micro-batch is triggered as soon as the previous batch has finished processing, unless you specify a trigger option.
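As an illustration of the two modes, a minimal sketch (the DataFrame df and the console sink are placeholders, not taken from the question above):

# Default: no trigger() call, so the next micro-batch starts as soon as
# the previous one finishes.
q1 = df.writeStream.format("console").start()

# Fixed-interval micro-batches: aim to start a batch every 10 seconds; if one
# batch takes longer than that, the next one starts right after it finishes.
q2 = (df.writeStream
        .format("console")
        .trigger(processingTime="10 seconds")
        .start())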

In earlier versions of Spark with Spark Streaming (the DStream API), you could specify a batch duration of, say, 5 seconds. In that case it would trigger a micro-batch every 5 seconds and process the data that had arrived in the last 5 seconds. In the case of Kafka, it would get the data that hadn't been committed yet.
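For comparison, a rough sketch of that older DStream API, where the batch interval is fixed up front (the socket source is just a placeholder):

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=5)        # one micro-batch every 5 seconds
lines = ssc.socketTextStream("localhost", 9999)    # placeholder source
lines.count().pprint()                             # process whatever arrived in the last 5 seconds
ssc.start()
ssc.awaitTermination()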