I tried loading data from Kafka, which succeeds, but I'm unable to convert the stream into a Spark RDD:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "IP:6667,IP:6667")
val offsetRanges = Array(
  OffsetRange("first_topic", 0, 1, 1000)
)
val ssc = new StreamingContext(new SparkConf, Seconds(60))
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
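The data itself does arrive; to confirm that, I just count the records in each batch before doing anything else, roughly like this (only a sketch; topics here is just the set containing "first_topic"):

// sanity check: print how many records arrive in each 60-second batch
stream.foreachRDD { rdd =>
  println(s"Received ${rdd.count()} records from Kafka")
}
ssc.start()
ssc.awaitTermination()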
Now, how can I read this stream object? I mean, how do I convert it into a Spark DataFrame and perform some computations on it?

I tried converting it to a DataFrame:
stream.foreachRDD { rdd =>
  println("Hello")
  import sqlContext.implicits._
  val dataFrame = rdd.map { case (key, value) => Row(key, value) }.toDf()
}
but toDf is not working. It fails with:

error: value toDf is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
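Is something along these lines the right direction? This is only a sketch of what I'm hoping will work, assuming I should map each record to a (key, value) tuple rather than a Row so the implicit toDF conversion from sqlContext.implicits._ applies, and that SQLContext.getOrCreate is an acceptable way to get an SQLContext inside foreachRDD:

import org.apache.spark.sql.SQLContext

stream.foreachRDD { rdd =>
  // reuse (or create) an SQLContext from the RDD's SparkContext
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // map to tuples (not Row) so the implicit toDF conversion on RDDs applies
  val dataFrame = rdd.map { case (key, value) => (key, value) }.toDF("key", "value")
  dataFrame.show()
}

If tuples are the wrong approach and I really do need Row, I assume I'd have to pass an explicit StructType schema to sqlContext.createDataFrame instead.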