
Basically, I am consuming data from multiple Kafka topics using a single Spark Streaming consumer (direct approach).

val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)

The batch interval is 30 seconds.

I have a couple of questions here.

  1. Will the DStream contain multiple RDDs instead of a single RDD when I call foreachRDD on the DStream? Will each topic create a separate RDD?
  2. If yes, I want to union all the RDDs into a single RDD and then process the data. How do I do that?
  3. If my processing time is longer than the batch interval, will the DStream contain more than one RDD?

I tried to union the DStream's RDDs into a single RDD using the code below. First of all, is my understanding correct? If the DStream always returns a single RDD, then the code below is unnecessary.

Sample Code:

import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.RDD

var dStreamRDDList = new ListBuffer[RDD[String]]
dStream.foreachRDD { rdd =>
  dStreamRDDList += rdd
}
val joinedRDD = ssc.sparkContext.union(dStreamRDDList).cache()

// THEN PROCESS USING joinedRDD
// Convert joinedRDD to a DataFrame, then apply aggregate operations using the DataFrame API.

1 Answer


Will the DStream contain multiple RDDs instead of a single RDD when I call foreachRDD on the DStream? Will each topic create a separate RDD?

No. Even though you have multiple topics, you'll have a single RDD at any given batch interval.
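You can verify this yourself by inspecting the offset ranges of each batch's RDD. A sketch, assuming the Spark 1.x `spark-streaming-kafka` API used in the question; note that the `HasOffsetRanges` cast only works on the untransformed stream, before `.map(_._2)`:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val rawStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

rawStream.foreachRDD { rdd =>
  // One RDD per batch; each of its partitions maps to exactly one Kafka topic-partition,
  // so a single RDD spans all subscribed topics.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"topic=${r.topic} partition=${r.partition} offsets=[${r.fromOffset}, ${r.untilOffset})")
  }
}
```

If you see offset ranges for several topics printed for the same batch, that confirms they all arrived in one RDD.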

If my processing time is longer than the batch interval, will the DStream contain more than one RDD?

No. If your processing time is longer than the batch interval, all that happens in the meantime is that the offset ranges for the next batches are recorded; batches queue up, and processing of the next batch only begins once the previous job has finished.
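If batches regularly take longer than 30 seconds, you can cap how much each batch reads so work does not pile up indefinitely. A sketch using standard Spark Streaming configuration properties (the application name is a placeholder):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("multi-topic-consumer") // hypothetical app name
  // Let Spark adapt ingestion rates to processing speed (Spark 1.5+):
  .set("spark.streaming.backpressure.enabled", "true")
  // Hard cap for the direct Kafka approach, in records/sec per partition:
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
```

Backpressure adjusts rates dynamically; the per-partition cap bounds the very first batches, which backpressure cannot yet tune.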

As a side note, make sure you actually need to use foreachRDD, or whether you're perhaps misusing the DStream API (disclaimer: I am the author of that post).
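Since each batch already arrives as a single RDD, there is no need to collect RDDs into a ListBuffer and union them; you can convert each batch to a DataFrame and aggregate it directly inside foreachRDD. A sketch, assuming Spark 1.x; the column name and the groupBy aggregation are illustrative assumptions, not from the question:

```scala
import org.apache.spark.sql.SQLContext

dStream.foreachRDD { rdd =>
  // Reuse one SQLContext across batches rather than creating a new one each time.
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.toDF("value")          // one column holding the raw message strings
  df.groupBy("value").count().show()  // example aggregation using the DataFrame API
}
```

This also avoids a subtle problem with the ListBuffer approach: foreachRDD only registers per-batch actions, so a union performed once at setup time would run before any batch has been added to the list.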