I have five executors running for this Spark job, but the RDD that comes from the DStream is only distributed across 2 partitions. If I call repartition(5), it still stays at 2 partitions. When I create a new RDD out of it with val newrdd = sparksession.sparkContext.parallelize(rdd.take(rdd.count().toInt)), the data gets distributed across 5 partitions properly. But creating a new RDD by parallelizing the contents of an existing RDD is not a good idea, so I don't want to do this.

Am I missing something here?

Code:

val ssc = new StreamingContext(sparksession.sparkContext, Seconds(batchDuration.toLong))
val inputDirectStream = EventHubsUtils.createDirectStreams(
  ssc,
  eventHubNamespace,
  progressDir,
  Map(eventHubName -> eventhubParameters))

inputDirectStream.foreachRDD { rdd =>
  println(rdd.partitions.size) // prints 2
  rdd.repartition(5)
  println(rdd.partitions.size) // also prints 2
  val newrdd = sparksession.sparkContext.parallelize(rdd.take(rdd.count().toInt))
  println(newrdd.partitions.size) // prints 5
}

I'm running my Spark Streaming job as follows:

spark-submit --class "com.mycomp.Main" --executor-memory 1g --executor-cores 1 --num-executors 5 --conf "spark.streaming.stopGracefullyOnShutdown=true" --master yarn --jars /tmp/jobs/supporting.jar /tmp/jobs/cdc.jar false > /tmp/jobs/output 2>&1

Any suggestions on how to get the RDD distributed across 5 partitions (depending on the number of executors and cores)?

1 Answer

Calling repartition returns a new RDD (with changed partitioning) which you need to use. In other words, you need to assign the return value from the repartitioning call to a new variable, otherwise you are just working with the old RDD with the old partitioning. Change to val rdd2 = rdd.repartition(5) and work with rdd2 after that.
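
To make the difference concrete, here is a minimal sketch that runs without the Event Hubs stream. It assumes a local master (local[2]) and uses an RDD built with parallelize as a stand-in for the one handed to foreachRDD; the object name RepartitionSketch and the helper partitionCounts are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RepartitionSketch {
  // Hypothetical helper: returns (partition count of the original RDD,
  // partition count of the RDD returned by repartition).
  def partitionCounts(sc: SparkContext): (Int, Int) = {
    // Stand-in for the RDD coming from the stream: 100 numbers in 2 partitions.
    val rdd = sc.parallelize(1 to 100, numSlices = 2)

    // The returned RDD is discarded here, so `rdd` itself is unchanged.
    rdd.repartition(5)

    // Keeping the returned RDD is what makes the new partitioning usable.
    val rdd2 = rdd.repartition(5)

    (rdd.partitions.length, rdd2.partitions.length)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, only for trying this out.
    val conf = new SparkConf().setAppName("repartition-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (before, after) = partitionCounts(sc)
      println(s"original: $before, repartitioned: $after") // original: 2, repartitioned: 5
    } finally {
      sc.stop()
    }
  }
}
```

Inside foreachRDD the same pattern applies: keep the value returned by rdd.repartition(5) and run the rest of the processing on that value instead of on rdd.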

Note: Since Spark transformations are lazy, no actual repartitioning will take place until an action is performed on the data. For example, you could run first or count on the data to make the repartitioning happen. However, checking partitions.size on the new RDD already reports the new partition count before any action runs.
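
One way to observe this laziness is to count how many records a transformation has touched before and after an action. The following is only a sketch under assumptions: it runs in local mode against a parallelized stand-in RDD, and the names LazyRepartitionSketch, observe and touched are invented for the example. Accumulator updates made inside transformations can be applied more than once if tasks are retried, so the final count is only exact in a failure-free run.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyRepartitionSketch {
  // Hypothetical helper: returns (partition count of the repartitioned RDD,
  // records touched before the action, records touched after the action).
  def observe(sc: SparkContext): (Int, Long, Long) = {
    val touched = sc.longAccumulator("touched")

    val rdd2 = sc.parallelize(1 to 100, numSlices = 2)
      .map { x => touched.add(1); x } // runs only when a job is triggered
      .repartition(5)

    // Partition metadata is available without running a job.
    val partitions = rdd2.partitions.length
    val beforeAction = touched.sum // still 0: nothing has been computed yet

    rdd2.count() // the action triggers the map and the shuffle
    val afterAction = touched.sum // expected to be 100 in a failure-free local run

    (partitions, beforeAction, afterAction)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, only for trying this out.
    val conf = new SparkConf().setAppName("lazy-repartition-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      println(observe(sc))
    } finally {
      sc.stop()
    }
  }
}
```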