I am consuming data from multiple Kafka topics with a single Spark Streaming consumer (direct approach).
val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)
The batch interval is 30 seconds.
I have a couple of questions:
- Will the DStream contain multiple RDDs instead of a single RDD when I call foreachRDD on it? Will each topic create a separate RDD?
- If yes, I want to union all the RDDs into a single RDD and then process the data. How do I do that?
- If my processing time exceeds the batch interval, will the DStream contain more than one RDD?
I tried to union the DStream's RDDs into a single RDD as shown below. First of all, is my understanding correct? If the DStream always returns a single RDD, then the code below is unnecessary.
Sample Code:
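import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.RDD

// Collect every RDD handed to foreachRDD into a driver-side list.
val dStreamRDDList = new ListBuffer[RDD[String]]
dStream.foreachRDD { rdd =>
  dStreamRDDList += rdd
}
// Union everything collected so far into one RDD.
val joinedRDD = ssc.sparkContext.union(dStreamRDDList).cache()
// Then process using joinedRDD:
// convert joinedRDD to a DataFrame, then apply aggregate operations using the DataFrame API.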
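For comparison, here is a minimal sketch of the alternative I would fall back to if no union is needed. As far as I understand the DStream documentation, foreachRDD invokes its function once per batch interval with that batch's RDD, so the DataFrame work could live inside the callback. The broker address, the topic names, the object name and the column name "value" are placeholders I made up for illustration, and the sketch assumes Spark 1.5+ with the Kafka 0.8 direct-stream integration used above.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object PerBatchProcessing {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PerBatchProcessing")
    val ssc = new StreamingContext(conf, Seconds(30)) // 30-second batch interval

    // Placeholder values, not taken from my real job.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topicsSet = Set("topicA", "topicB")

    val dStream = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      .map(_._2) // keep only the message value

    // The function passed to foreachRDD runs on the driver once per batch.
    dStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._

        // Convert this batch's RDD to a DataFrame and aggregate it.
        val df = rdd.toDF("value")
        df.groupBy("value").count().show()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

In this version nothing is accumulated on the driver across batches, and all processing is registered before ssc.start() is called.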