I set up a small test on my laptop that does the following:
I create a Kafka topic with a few thousand messages, where each message contains a few rows and each row has about 100 columns.
I create 300 fairly complex Spark columns in a List[Column]. There are no aggregations.
When setting up the stream from Kafka, I set .option("maxOffsetsPerTrigger", 1) so that only one message is processed in each micro-batch.
I then apply the columns to each micro-batch, which consists of just a single message:
// Apply the 300 derived columns and write each micro-batch to the console.
val testStream = myStream
  .select(manyTestCols: _*)
  .writeStream
  .format("console")
  .outputMode("append")
  .start()

testStream.awaitTermination()
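For completeness, myStream and manyTestCols are built roughly along these lines. This is only a sketch, not the exact code: the topic name, bootstrap servers, JSON layout and the 300 column expressions below are stand-ins for the real ones.

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("maxOffsetsPerTriggerTest")
  .master("local[*]")
  .getOrCreate()

// Placeholder schema: in the real test each row has about 100 columns.
val rowSchema = StructType((0 until 100).map(i => StructField(s"c$i", StringType)))

// Read one Kafka message per micro-batch (later changed to 1000).
val myStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-topic")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 1)
  .load()
  // Each message value holds a few rows, assumed here to be a JSON array.
  .select(from_json(col("value").cast("string"), ArrayType(rowSchema)).as("rows"))
  .select(explode(col("rows")).as("row"))
  .select(col("row.*"))

// Illustrative stand-in for the 300 fairly complex column expressions (no aggregations).
val manyTestCols: List[Column] = (1 to 300).toList.map { i =>
  when(col(s"c${i % 100}").isNotNull,
       upper(concat_ws("-", col(s"c${i % 100}"), lit(i.toString))))
    .otherwise(lit("missing"))
    .alias(s"derived_$i")
}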
Spark takes about 10 seconds to process each one-message micro-batch.
I then change the option to .option("maxOffsetsPerTrigger", 1000) so that 1000 messages are processed in each micro-batch.
Spark takes about 11 seconds to process each 1000-message micro-batch.
So it seems that Spark incurs some fixed per-micro-batch "setup work" of roughly 10 seconds, and then processes the rows themselves quite quickly once it gets going.
Is this "setup work" going through the query planning through to the physical plan, for each mini-batch?
If so, does it make sense for Spark to do this each mini-batch?
Or is something else entirely going on? Am looking at Spark source code, but would appreciate feedback from someone that has gone through this exercise already.
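In case it helps frame the question, this is roughly how I plan to check where the per-micro-batch time goes: Structured Streaming reports a timing breakdown in StreamingQueryProgress.durationMs (with entries such as "queryPlanning", "addBatch" and "walCommit"), which a listener can print. Here spark is the SparkSession from the sketch above.

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Log the per-micro-batch timing breakdown, to see how much of the ~10 s
// is planning/overhead versus actually processing the rows.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // durationMs typically has keys like "queryPlanning", "addBatch",
    // "walCommit" and "triggerExecution".
    println(s"batch=${p.batchId} rows=${p.numInputRows} durations=${p.durationMs}")
  }
})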
Thanks for any insights.