
I am trying to read a JSON string from Kafka using the Spark Streaming library. The code connects to the Kafka broker but fails while decoding the message. The code is inspired by:

https://github.com/killrweather/killrweather/blob/master/killrweather-examples/src/main/scala/com/datastax/killrweather/KafkaStreamingJson.scala

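```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// ssc, sc, kParams, kTopic, topicStr, selectStr, fileName, firstTime and
// mergeFiles are defined elsewhere in the original program.
val kStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kParams, kTopic).map(_._2) // keep only the message value (the JSON string)
println("Starting to read from kafka topic:" + topicStr)

kStream.foreachRDD { rdd =>
  if (rdd.toLocalIterator.nonEmpty) {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Infer a schema from the JSON strings and expose them to SQL
    sqlContext.read.json(rdd).registerTempTable("mytable")
    if (firstTime) {
      sqlContext.sql("SELECT * FROM mytable").printSchema()
    }
    val df = sqlContext.sql(selectStr)
    df.collect.foreach(println)
    df.rdd.saveAsTextFile(fileName)
    mergeFiles(fileName, firstTime)
    firstTime = false
    println(rdd.name)
  }
}
```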

The job fails with:

```
java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:222)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
```

How did you run the job? It seems like Kafka isn't available at runtime. - Yuval Itzchakov

Kafka is available and the job does connect to it; I tested this negatively by pointing it at a random Kafka broker instead. The exception comes from the line `if (rdd.toLocalIterator.nonEmpty) {`. - Vaibhav Khanduja
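A `NoSuchMethodError` at runtime usually means the code was compiled against one version of a library while a different version was loaded. One way to answer "how did you run the job?" concretely is to check, with the same classpath the job uses (for example the same assembly jar passed to `spark-submit`), which jar actually supplies `kafka.message.MessageAndMetadata` and which constructors it has. The sketch below is a hypothetical helper (`KafkaClasspathCheck` is not part of Spark or Kafka) that relies only on standard JVM reflection. If the class is found but none of the printed constructors has the parameter types from the stack trace (`java.lang.String, int, kafka.message.Message, long, kafka.serializer.Decoder, kafka.serializer.Decoder`), the Kafka jar being loaded is older than the one spark-streaming-kafka was built against.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical diagnostic helper, not part of Spark or Kafka.
object KafkaClasspathCheck {
  // Fully qualified class name taken from the NoSuchMethodError.
  val ClassName = "kafka.message.MessageAndMetadata"

  // Reports where the class was loaded from and the parameter types
  // of each of its public constructors.
  def report(className: String): String =
    Try(Class.forName(className)) match {
      case Failure(_: ClassNotFoundException) =>
        s"$className is not on the classpath"
      case Failure(e) =>
        s"$className could not be loaded: $e"
      case Success(cls) =>
        // getCodeSource is null for bootstrap classes, hence the Option.
        val source = Option(cls.getProtectionDomain.getCodeSource)
          .map(_.getLocation.toString)
          .getOrElse("<unknown>")
        val ctors = cls.getConstructors.map { c =>
          c.getParameterTypes.map(_.getName).mkString("(", ", ", ")")
        }
        (s"$className loaded from $source" +: ctors.map("  constructor " + _))
          .mkString("\n")
    }

  def main(args: Array[String]): Unit =
    println(report(ClassName))
}
```

Running it as the main class of the application jar prints the jar location first, which makes a stray older `kafka_2.x` jar easy to spot.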

1 Answer


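The problem was the version of the Kafka jars on the classpath; switching to Kafka 0.9.0.0 fixed the issue. The `kafka.message.MessageAndMetadata` constructor that Spark's `KafkaRDD` calls (the one taking a topic, partition, message, offset and two decoders, as in the stack trace) was introduced in 0.8.2.0, so an older Kafka jar produces this `NoSuchMethodError`.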
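To keep that fix in the build, the Kafka jar can be pinned explicitly so an older `kafka_2.x` artifact is not picked up transitively. A minimal sbt sketch, assuming Spark 1.6.0 on Scala 2.10 or 2.11 (the Spark version is an assumption; use the one your cluster runs). spark-streaming-kafka for Spark 1.x typically depends on a Kafka 0.8.2.x client.

```scala
// build.sbt sketch; the Spark version 1.6.0 is an assumption.
libraryDependencies ++= Seq(
  // Provided by the cluster at runtime
  "org.apache.spark" %% "spark-streaming"       % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-sql"             % "1.6.0" % "provided",
  // Not part of the Spark assembly, so it must be bundled with the job
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0",
  // Pin Kafka explicitly; 0.9.0.0 is the version reported to work above
  "org.apache.kafka" %% "kafka"                 % "0.9.0.0"
)
```

sbt's default conflict resolution picks the latest revision, so declaring 0.9.0.0 directly evicts the transitive 0.8.2.x version. This only controls what goes into the application jar; an older Kafka jar supplied separately on the cluster classpath can still shadow it.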