In Spark Streaming I create a stream from Kafka with a batch interval of 5 seconds. Many messages can arrive during that interval, and I want to process each of them individually, but with my current logic it seems that only the first message of each batch is processed.

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topics)

val messages = stream.map(_._2)

messages.foreachRDD { rdd =>
    if(!rdd.isEmpty) {
        val message = rdd.map(parse)
        println(message.collect())
    }
}

The parse function simply extracts the relevant fields from the JSON message into a tuple.

I can drill down into the partitions and process each message individually that way:

messages.foreachRDD { rdd =>
    if(!rdd.isEmpty) {
        rdd.foreachPartition { partition =>
            partition.foreach{msg =>
                val message = parse(msg)
                println(message)
            }
        }
    }
}

But I'm certain there is a way to stay at the RDD level. What am I doing wrong in the first example?

I'm using Spark 2.0.0, Scala 2.11.8 and spark-streaming-kafka 0.8.

1 Answer

Here is a sample streaming app that converts every message in a batch to upper case inside foreachRDD and prints each one. Try this sample app and then compare it with your application. Hope this helps.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkKafkaStreaming {

  def main(args: Array[String]): Unit = {

    // Broker and topic
    val brokers = "localhost:9092"
    val topic = "myTopic"

    // Create context with a 5 second batch interval
    val sparkConf = new SparkConf().setAppName("SparkKafkaStreaming").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Create a direct Kafka stream with brokers and topics
    val topicsSet = Set[String](topic)
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val msgStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    // Keep only the message value (drop the key)
    val msg = msgStream.map(_._2)
    msg.print()

    // Process every message of each batch
    msg.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        println("-----Convert Message to UpperCase-----")
        // collect() returns an Array; iterate over it to print each element
        rdd.map { x => x.toUpperCase() }.collect().foreach(println)
      } else {
        println("No Message Received")
      }
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
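
One likely reason the first example in the question looks as if it handles a single message per batch: collect() returns an Array, and calling println on an Array prints one line with the JVM's default object reference (something like [Lscala.Tuple2;@...), not the elements. The sketch below shows the difference without Spark. The parse function here is a hypothetical stand-in that splits "id,name" strings, not the asker's JSON parser, and the sample input is made up for illustration.

```scala
object PrintArrayDemo {
  // Hypothetical stand-in for the asker's parse function:
  // splits "id,name" into a tuple (an assumption, the real one parses JSON).
  def parse(msg: String): (String, String) = {
    val parts = msg.split(",", 2)
    (parts(0), if (parts.length > 1) parts(1) else "")
  }

  def main(args: Array[String]): Unit = {
    // Stands in for the result of rdd.map(parse).collect()
    val collected: Array[(String, String)] = Array("1,a", "2,b", "3,c").map(parse)

    // Prints a single line with the array's type and hash code, not its elements
    println(collected)

    // Prints one line per element
    collected.foreach(println)

    // Or prints the whole batch on one line
    println(collected.mkString("[", ", ", "]"))
  }
}
```

If that is the cause, all messages were already being parsed in the first example and only the printing was misleading; replacing println(message.collect()) with message.collect().foreach(println) keeps the logic at the RDD level.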