I am encountering something that, at first glance to a Spark Streaming novice, seems counter-intuitive:
when Spark Structured Streaming starts processing more data, its batch duration decreases.
This is probably not the most accurate picture, but the pattern I saw was much clearer than this.
I probably need an explanation of what exactly the batch duration is - my understanding is that it represents the number of seconds it takes Spark to process the stream's mini-batch.
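For reference, the batch duration can be read off a running query: the handle returned by writeStream...start() exposes lastProgress, a dictionary whose durationMs section breaks the last micro-batch down into phases. A minimal sketch, assuming such a handle named streamingQuery:

# lastProgress is None until the first micro-batch completes
progress = streamingQuery.lastProgress
if progress:
    print(progress['numInputRows'])                    # rows ingested by the last micro-batch
    print(progress['durationMs']['triggerExecution'])  # total wall-clock duration of the batch
    print(progress['durationMs'])                      # per-phase breakdown (getBatch, addBatch, ...)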
Next, I need clarification on how Spark triggers the processing of a mini-batch - is it based on the amount of data in the batch, or on time intervals?
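As context for that question, two knobs interact: the trigger decides when a new micro-batch may start, while source-side rate limits such as eventhubs.maxEventsPerTrigger (set to 300 in the config below) cap how much data a single batch may contain. A sketch, where df stands for any streaming DataFrame:

# Time-based trigger: Spark tries to start a micro-batch every 2 seconds;
# if the previous batch ran longer than that, the next one starts right after it.
(df.writeStream
 .format("console")
 .trigger(processingTime='2 seconds')
 .start())

# With no trigger specified, micro-batches run back to back:
# each batch starts as soon as the previous one finishes.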
EDIT
The code is as follows. There are quite a lot of "heavy" operations (joins, dropDuplicates, filtering with a higher-order function, UDFs, ...). Source and sink are both Azure Event Hubs (df_batch, df_lookup and the _config schema are defined elsewhere).
from datetime import datetime
from pyspark.sql.functions import (broadcast, col, current_timestamp, expr,
                                   from_json, struct, to_json, udf)
from pyspark.sql.types import TimestampType

# [CONFIGS]
ehConfig = {
'eventhubs.startingPosition': '{"offset": "@latest", "enqueuedTime": null, "isInclusive": true, "seqNo": -1}',
'eventhubs.maxEventsPerTrigger': 300,
'eventhubs.connectionString': 'XXX'}
ehOutputConfig = {
'eventhubs.connectionString': 'YYY',
"checkpointLocation": "azure_blob_storage/ABCABC"
}
spark.conf.set("spark.sql.shuffle.partitions", 3)
# [FUNCS]
@udf(TimestampType())
def udf_current_timestamp():
return datetime.now()
#-----------#
# STREAMING #
#-----------#
# [STREAM INPUT]
df_stream_input = spark.readStream.format("eventhubs").options(**ehConfig).load()
# [ASSEMBLY THE DATAFRAME]
df_joined = (df_stream_input
.withColumn("InputProcessingStarted", current_timestamp().cast("long"))
# Decode body
.withColumn("body_decoded", from_json(col("body").cast("string"), schema=_config))
# Join customer
.join(df_batch, ['CUSTOMER_ID'], 'inner')
# Filtering
.filter(expr('body_decoded.status NOT IN (0, 4, 32)'))
.filter(expr('EXISTS(body_decoded.items, item -> item.ID IN (1, 2, 7))'))
# Deduplication
.withWatermark('enqueuedTime', '1 day')
.dropDuplicates(['CUSTOMER_ID', 'ItemID'])
# Join with lookup table
.join(broadcast(df_lookup), ['OrderType'], 'left')
# UDF
.withColumn('AssembleTimestamp', udf_current_timestamp())
# Assemble struct
.withColumn('body_struct', struct('OrderType', 'OrderID', 'Price', 'StockPile')))
# [STREAM OUTPUT]
streamingQuery = (df_joined
.select(to_json('body_struct').alias('body'))
.writeStream
.format("eventhubs")
.options(**ehOutputConfig)
.trigger(processingTime='2 seconds')
.start())
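To check whether batch duration really tracks input volume, a monitoring loop over the progress API can be bolted on - a sketch, not part of the original job, reusing the streamingQuery handle assigned above:

import time

while streamingQuery.isActive:
    p = streamingQuery.lastProgress
    if p:
        print(p['batchId'], p['numInputRows'], p['durationMs']['triggerExecution'])
    time.sleep(5)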