I am running Flink on a single node with parallelism = 1 in order to compare its performance with a single-threaded application. I'm wondering whether Flink still uses a shuffle even though it does not run in parallel. For example, if the following program is executed:

import org.apache.flink.api.scala._

// text: DataSet[String] holding the input lines
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)

Will a shuffle be used before the groupBy? And is there a way to check this?

(In the output of the interactive Scala shell, I can observe a FlatMap, a Map, a Combine, and finally a Reduce. The same operators appear when running with parallelism > 1.)

1 Answer

For the operation ds.groupBy(0).sum(1), Flink generates the job graph ... -> Combiner -> Reducer, independent of the actual parallelism. Between the Combiner and the Reducer, a hash partitioner (the shuffle step) is inserted. This step is required for any parallelism > 1.
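To make the Combiner -> hash partition -> Reducer structure concrete, here is a minimal plain-Scala model using only standard collections. It is a sketch, not Flink's implementation: the object and function names are hypothetical, each inner Seq stands for the input of one parallel combiner instance, and the partitioner simply takes the key's hashCode modulo the parallelism (Flink's actual partitioner may compute the hash differently).

```scala
// Hypothetical model of the groupBy(0).sum(1) job graph; not Flink code.
object ShuffleModel {
  type Record = (String, Int)

  // Combiner: pre-aggregates the (word, 1) pairs inside one parallel instance,
  // emitting one partial sum per distinct word.
  def combine(words: Seq[String]): Seq[Record] =
    words.groupBy(identity).map { case (w, ws) => (w, ws.size) }.toSeq

  // Hash partitioner (the shuffle step): routes every record to the reducer
  // instance chosen by its key, so equal keys always meet in the same reducer.
  def hashPartition(records: Seq[Record], parallelism: Int): Vector[Seq[Record]] = {
    val targets = records.groupBy { case (w, _) => Math.floorMod(w.hashCode, parallelism) }
    Vector.tabulate(parallelism)(i => targets.getOrElse(i, Seq.empty))
  }

  // Reducer: sums the partial counts it received, per key.
  def reduce(records: Seq[Record]): Map[String, Int] =
    records.groupBy(_._1).map { case (w, rs) => (w, rs.map(_._2).sum) }

  // Runs one combiner per input split, shuffles their output, then reduces.
  def wordCount(splits: Seq[Seq[String]], parallelism: Int): Map[String, Int] = {
    val combined = splits.flatMap(combine)
    hashPartition(combined, parallelism).map(reduce).foldLeft(Map.empty[String, Int])(_ ++ _)
  }
}
```

Because equal words always hash to the same reducer, the final counts are the same for any parallelism; only the routing of the combined records changes.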

For parallelism = 1, the optimizer could theoretically remove the shuffle step, because it is not necessary. However, this should not noticeably affect the performance of the program.

The reason is that for parallelism = 1, all work is done in the local combiner: it calculates the resulting sum for each key and then sends only a single element per key to the reducer. Furthermore, since the combiner and the reducer run on the same machine, no network communication is involved; the data is simply transferred by handing over a memory segment. Since Flink also supports streamed shuffling, the combiner does not even have to finish before the first results can be sent to the reducer. Both the combiner and the reducer can run at the same time, thus avoiding the materialization of intermediate results.
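The argument for parallelism = 1 can be illustrated with a small standalone count. This is again a hypothetical plain-Scala model rather than Flink internals, and it assumes the combiner can hold all groups in memory (a real combiner with a limited memory budget may emit several partial sums for the same key). With a single reducer, every combined record is routed to target 0, so the shuffle only hands over one record per distinct word instead of one per input word.

```scala
// Hypothetical cost model of the shuffle at a given parallelism; not Flink code.
object ShuffleCost {
  final case class Stats(inputRecords: Int, shippedRecords: Int, targets: Set[Int])

  def shuffleStats(words: Seq[String], parallelism: Int): Stats = {
    // The combiner emits one partial (word, count) pair per distinct word,
    // assuming all groups fit in its memory.
    val combined = words.groupBy(identity).map { case (w, ws) => (w, ws.size) }
    // Target reducer for each combined record, chosen by hashing its key.
    val targets = combined.keys.map(w => Math.floorMod(w.hashCode, parallelism)).toSet
    Stats(words.size, combined.size, targets)
  }
}
```

For example, `ShuffleCost.shuffleStats("to be or not to be that is the question".split(" ").toSeq, 1)` reports 10 input records, 8 shipped records, and the single target set `Set(0)`.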