I have five executors running for this Spark job, but the RDD that comes from the DStream only ever ends up with 2 partitions. If I call repartition(5), it still shows 2 partitions. However, when I create a new RDD from it with val newrdd = sparksession.sparkContext.parallelize(rdd.take(rdd.count().toInt)), that new RDD is distributed across 5 partitions as expected. But collecting an existing RDD to the driver and re-parallelizing it is not a good idea, so I want to avoid that.
Am I missing something here?
Code:
val ssc = new StreamingContext(sparksession.sparkContext, Seconds(batchDuration.toLong))
val inputDirectStream = EventHubsUtils.createDirectStreams(
  ssc,
  eventHubNamespace,
  progressDir,
  Map(eventHubName -> eventhubParameters))
inputDirectStream.foreachRDD { rdd =>
  println(rdd.partitions.size)  // prints 2
  rdd.repartition(5)
  println(rdd.partitions.size)  // still prints 2
  var newrdd = sparksession.sparkContext.parallelize(rdd.take(rdd.count().toInt))
  println(newrdd.partitions.size)  // prints 5
}
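For clarity, since repartition returns a new RDD rather than modifying rdd in place, the variant I would want to rely on looks roughly like this (a minimal sketch; the name repartitioned is just illustrative):

inputDirectStream.foreachRDD { rdd =>
  // repartition returns a new RDD; it does not change rdd itself
  val repartitioned = rdd.repartition(5)
  println(repartitioned.partitions.size)  // I would expect this to print 5
  // ... process `repartitioned` instead of `rdd` here
}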
I'm running my Spark Streaming job as follows:
spark-submit --class "com.mycomp.Main" --executor-memory 1g --executor-cores 1 --num-executors 5 --conf "spark.streaming.stopGracefullyOnShutdown=true" --master yarn --jars /tmp/jobs/supporting.jar /tmp/jobs/cdc.jar false > /tmp/jobs/output 2>&1
Any suggestions on how to get the RDD distributed across 5 partitions (i.e. based on the number of executors and cores)?
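Ideally I would not hard-code the 5 but derive the target partition count from the cluster, something along these lines (a minimal sketch, assuming defaultParallelism reflects the executors and cores given to the job):

// defaultParallelism is typically num-executors * executor-cores on YARN,
// unless spark.default.parallelism is set explicitly
val targetPartitions = sparksession.sparkContext.defaultParallelism

inputDirectStream.foreachRDD { rdd =>
  val balanced = rdd.repartition(targetPartitions)
  println(balanced.partitions.size)  // should match targetPartitions
  // ... process `balanced` here
}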