0
votes

Feeling slightly lost at this point.

I have a streaming application based on Spark 2.4.2 & Kafka that writes an aggregated (time-windowed) stream back to Kafka:

  • [DF1] a streaming, pre-partitioned DataFrame (on the key theKey), i.e. a stream that guarantees that a given theKey always arrives in the same partition.
  • [DF2] a lookup table (~1000 rows) that I join with DF1.
  • a groupBy on the key and a continuously moving window of 1 day:
    DF1.join(DF2, DF1("a") === DF2("b"), "left")
       .withWatermark("timestamp", "24 hours")
       .groupBy(window('timestamp, "24 hours"), 'theKey)
       .agg(collect_list('payload) as "payload")

Problem: the shuffle. By pre-partitioning the dataset beforehand (in Kafka), I was hoping to achieve partition-local groupBys. Unfortunately, that didn't work.

The question is: what is the right way of achieving this without shuffles? Is there any?

Solutions I've explored so far:

  1. "agg over window...": not supported in streaming (Spark throws: Non-time-based windows are not supported on streaming DataFrames/Datasets).
  2. mapPartitions: not sure how to factor in state (mapWithState). mapGroupsWithState requires a KeyValueGroupedDataset[K, V], which you only get from groupByKey (roughly the shape sketched below).
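For clarity, this is roughly the shape I mean; the types, eventsDs and the aggregation logic are placeholders rather than my actual code, and groupByKey itself already implies a shuffle:

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

    // Placeholder types, for illustration only
    case class Event(theKey: String, timestamp: java.sql.Timestamp, payload: String)
    case class Acc(payloads: Seq[String])

    // mapGroupsWithState is only reachable via groupByKey on a typed Dataset
    // (assumes spark.implicits._ is in scope for the encoders)
    val updated = eventsDs                              // Dataset[Event], placeholder
      .groupByKey(_.theKey)                             // KeyValueGroupedDataset[String, Event]
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout) {
        (key: String, events: Iterator[Event], state: GroupState[Acc]) =>
          val prev = state.getOption.getOrElse(Acc(Seq.empty))
          val next = Acc(prev.payloads ++ events.map(_.payload))
          state.update(next)
          (key, next.payloads)
      }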

Solutions I'm considering (reluctantly):

  1. mapPartitions on the DataFrame, with custom state management. However, that renders Spark's stateful streaming useless.
  2. Somehow plug the original hash partitioning (from the Kafka DataFrame) into Spark, so that it handles the shuffle sensibly (and doesn't fall back to the default 200 shuffle partitions), but I haven't found a definitive source yet; the closest knob I know of is sketched below.
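For reference, the only setting I'm aware of here is spark.sql.shuffle.partitions, and it only changes how many partitions the shuffle produces; it doesn't eliminate the shuffle. A minimal sketch, assuming the source topic has 10 partitions:

    // Assumption: the Kafka topic has 10 partitions; match the shuffle
    // partition count to it. This does NOT remove the shuffle, it only
    // resizes it.
    spark.conf.set("spark.sql.shuffle.partitions", "10")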

Any help is greatly appreciated!

Try forcing broadcast joins with DF1.join(broadcast(DF2), ...). This should prevent shuffling and keep your keys in their original partitions. - Hristo Iliev
@HristoIliev I did just that. It came down a bit, but still a sizeable shuffle. - TroubleShooter

1 Answer

0
votes

Actually, the lookup tables were causing all the shuffling. I was hoping that Spark would prefer the partitioning of the larger dataset over that of the smaller lookup tables, but that wasn't the case. It took the streaming dataset, disregarded its partitioning, and shuffled it to wherever the lookup table partitions were.

As soon as I repartitioned the lookup tables to match the streaming DataFrame, Spark was happy. Still, it's counter-intuitive that Spark doesn't prioritize the partitioning of the bigger dataset over that of the smaller ones.
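A minimal sketch of what that looked like; lookupDF, numStreamPartitions and the key column "b" are placeholders for my actual values:

    import org.apache.spark.sql.functions.col

    // Repartition the small lookup table on the join key so its layout matches
    // the pre-partitioned streaming DataFrame (placeholder names throughout).
    val repartitionedLookup = lookupDF.repartition(numStreamPartitions, col("b"))

    val joined = DF1.join(repartitionedLookup, DF1("a") === repartitionedLookup("b"), "left")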