Feeling slightly lost at this point.
I have a streaming application based on Spark 2.4.2 and Kafka that writes an aggregated (time-windowed) stream back to Kafka:
- [DF1] A streaming, pre-partitioned DataFrame (on the key theKey), i.e. a stream that guarantees that a given key always arrives in the same partition.
- [DF2] A lookup table (~1000 rows) that I join with DF1.
- A groupBy on the key with a continuously moving window of 1 day.
DF1.join(DF2, DF1("a") === DF2("b"), "left")
.withWatermark("timestamp", "24 hours")
.groupBy(window('timestamp, "24 hours"), 'theKey)
.agg(collect_list('payload) as "payload")
Problem: the shuffle. By pre-partitioning the dataset beforehand (in Kafka), I was hoping to achieve partition-local groupBys. Unfortunately, that didn't work.
The question is: what is the right way to achieve this without shuffles? Is there one?
Solutions I've explored so far:
- "agg over window...": Not supported in streaming (Spark throws:
Non-time-based windows are not supported on streaming DataFrames/Datasets) - mapPartitions: Not sure how to factor in State (mapWithState).
mapGroupsWithStaterequires aKeyValueGroupedDataset[K, V]that's only given by GroupByKey.
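For context, this is the shape I'd end up with if I went the mapGroupsWithState route; a minimal sketch, assuming a simplified Event/Acc schema and a joinedDF placeholder for the joined stream (none of these names are from the real job). The groupByKey call is exactly what brings the shuffle back:

import org.apache.spark.sql.streaming.GroupState
import spark.implicits._

// Hypothetical, simplified schema of the joined stream.
case class Event(theKey: String, payload: String)
case class Acc(payloads: Seq[String])

// groupByKey produces the KeyValueGroupedDataset[K, V] that
// mapGroupsWithState needs - and it also introduces the shuffle.
val aggregated = joinedDF.as[Event]
  .groupByKey(_.theKey)
  .mapGroupsWithState[Acc, (String, Seq[String])] { (key, events, state) =>
    // Merge newly arrived payloads into the per-key state.
    val merged = Acc(state.getOption.map(_.payloads).getOrElse(Seq.empty) ++
      events.map(_.payload).toSeq)
    state.update(merged)
    (key, merged.payloads)
  }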
Solutions I'm considering (reluctantly):
- mapPartitions on the DataFrame, with custom state management. However, that renders Spark's stateful streaming useless.
- Somehow plug the original hash partitioning (from the Kafka DataFrame) into Spark, so that it takes care of shuffles for good (and doesn't use the default 200 shuffle partitions), but I haven't found a definitive source yet (see the config sketch below).
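For reference, the only related knob I've found so far is spark.sql.shuffle.partitions. A sketch, assuming a hypothetical 12-partition topic and made-up broker/topic names; note this only aligns the shuffle partition count with Kafka, it does not make Spark reuse Kafka's partitioning:

// Hypothetical values: 12 Kafka partitions, broker "broker:9092", topic "events".
// This only matches the shuffle partition count to Kafka; Spark still
// repartitions by its own hash of the grouping key.
spark.conf.set("spark.sql.shuffle.partitions", "12")

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()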
Any help is greatly appreciated!
DF1.join(broadcast(DF2), ...). This should prevent shuffling and keep your keys in their original partitions. - Hristo Iliev
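A sketch of that suggestion, assuming the same join columns a and b as above; broadcasting the small lookup table keeps the join from repartitioning DF1, though the windowed groupBy may still shuffle:

import org.apache.spark.sql.functions.broadcast

// Broadcast the ~1000-row lookup table so the join itself does not
// repartition the streaming side (DF1).
val joined = DF1.join(broadcast(DF2), DF1("a") === DF2("b"), "left")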