My question is regarding the StatefulNetworkWordCount example:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

Q1) Is the stateDstream RDD maintained on the driver or on the worker nodes, or does each worker node have its own local copy of the complete state RDD?

Q2) Why do we need a HashPartitioner in the following line:

val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)

What is happening behind the scenes here?


1 Answer


To answer both of your questions:

1) The RDDs produced by the DStream are distributed across the workers, just as in non-streaming Spark. This means that the records of each RDD produced by the DStream are spread out across the cluster, which is why partitioning matters here.
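As a quick illustration (a minimal sketch, reusing wordDstream from the linked example), you can see from the driver that each micro-batch RDD is split into partitions that live on the workers; the driver holds only the RDD's lineage and metadata:

    wordDstream.foreachRDD { rdd =>
      // The records themselves sit in partitions spread across the
      // worker nodes; the driver only sees the RDD handle.
      println(s"partitions in this batch: ${rdd.partitions.length}")
    }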

2) Partitioning is important in this case because it determines how the records of each RDD in the stream are split up. With a transformation like updateStateByKey(), the same keys tend to reappear across RDDs from successive batch intervals. So if the keys of each interval's RDD land in the same partitions, the function can update the state for a key locally within a partition, which is much more efficient.
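For reference, here is roughly how the linked example wires this up (a sketch based on the example's code; the summing logic is the word-count update): a simple (values, state) update function is lifted into the iterator-based form that the partitioner-taking overload of updateStateByKey() expects:

    // Sum the new counts for a key into its previous running total.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      Some(values.sum + state.getOrElse(0))
    }

    // The overload that takes a partitioner works on an iterator of
    // (key, new values, previous state) and returns (key, new state).
    val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
      iterator.flatMap { case (word, counts, state) =>
        updateFunc(counts, state).map(sum => (word, sum))
      }
    }

    val stateDstream = wordDstream.updateStateByKey[Int](
      newUpdateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism),
      true, // rememberPartitioner: keep the partitioner on the state RDDs
      initialRDD)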

As an example, let us look at the word count program you linked. Consider the RDDs at two one-second intervals (rdd1 at t=1 and rdd2 at t=2). Say rdd1 is generated from the text "hello world" and rdd2 from the text "hello I'm world". Without partitioning, the records of each RDD could be sent to any partition on any worker (the "hello" at t=1 and the "hello" at t=2 could end up in separate locations). This means that updating the count state would require reshuffling records on every iteration to bring each key's records together. With a partitioner defined (and remembered, as indicated by one of the parameters!), the keys "hello" and "world" land in the same partitions each time, avoiding the shuffle and making the update more efficient.
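The mechanism behind this is simply that HashPartitioner is deterministic: the same key hashes to the same partition id in every batch. A minimal sketch (the partition count of 4 is arbitrary):

    import org.apache.spark.HashPartitioner

    val p = new HashPartitioner(4)
    // "hello" gets the same partition id at t=1, t=2, ... so its state
    // can be updated in place instead of being shuffled between workers.
    println(p.getPartition("hello"))
    println(p.getPartition("world"))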

It is also important to note that because the set of keys can change between batches, there is a parameter to toggle whether or not to remember the partitioner.