2 votes

I tried loading data from Kafka, which succeeds, but I'm unable to convert the stream into a Spark RDD:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "IP:6667,IP:6667")
val topics = Set("first_topic")
val offsetRanges = Array(
  OffsetRange("first_topic", 0, 1, 1000)
)
val ssc = new StreamingContext(new SparkConf, Seconds(60))
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

Now, how can I read this stream object? I mean, how do I convert it into a Spark DataFrame and perform some computations on it?

I tried converting it to a DataFrame:

    stream.foreachRDD { rdd =>
      println("Hello")
      import sqlContext.implicits._
      val dataFrame = rdd.map { case (key, value) => Row(key, value) }.toDf()
    }

but toDf is not working; I get the error: value toDf is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]


2 Answers

2 votes

It is an old question, but I think you forgot to add the schema when creating a DataFrame from rows:

val df =  sc.parallelize(List(1,2,3)).toDF("a")
val someRDD = df.rdd
val newDF = spark.createDataFrame(someRDD, df.schema)

(tested in spark-shell 2.2.0)
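Applied to the question's stream, where each record from the StringDecoder-based direct stream is a (key, value) pair of strings, the same idea looks like this (a sketch; sqlContext and the column names are assumptions):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Explicit schema for the (key, value) pairs in each batch RDD.
val schema = StructType(Seq(
  StructField("key", StringType, nullable = true),
  StructField("value", StringType, nullable = true)
))

stream.foreachRDD { rdd =>
  val rowRDD = rdd.map { case (key, value) => Row(key, value) }
  // createDataFrame(RDD[Row], schema) works where RDD[Row].toDF() does not,
  // because Spark cannot infer a schema from generic Rows.
  val df = sqlContext.createDataFrame(rowRDD, schema)
  df.show()
}

That is also why the asker's error appears: toDF() needs an encoder or a Product type (e.g. a case class or tuple), while an RDD[Row] must go through createDataFrame with an explicit schema.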

1 vote
val kafkaParams = Map("metadata.broker.list" -> "IP:6667,IP:6667")
val topics = Set("first_topic")
val offsetRanges = Array(
  OffsetRange("first_topic", 0, 1, 1000)
)
val ssc = new StreamingContext(new SparkConf, Seconds(60))
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

// With the StringDecoder-based API each record is a (key, value) tuple,
// so take the value with ._2 rather than .value.
val lines = stream.map(_._2)
val words = lines.flatMap(_.split(" "))
words.print()   // def createDataFrame(words: RDD[Row], schema: StructType)
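// To actually get a DataFrame, as the question asks, register an output
// operation before ssc.start(). A sketch; the column name "word" and the
// use of sqlContext here are illustrative assumptions.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val wordSchema = StructType(Seq(StructField("word", StringType, nullable = true)))
lines.flatMap(_.split(" ")).foreachRDD { rdd =>
  val wordsDF = sqlContext.createDataFrame(rdd.map(w => Row(w)), wordSchema)
  wordsDF.groupBy("word").count().show()   // per-batch word counts
}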

// Start your computation then
ssc.start()
ssc.awaitTermination()