1 vote

I'm here again. I'm trying to read data from a Kafka 0.9.0.0 topic with a Spark Streaming 1.6.1 application written in Scala 2.10.5. It's a simple program that I built with sbt 0.13.12. When I run the program I get this exception:

(run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at org.kafka.receiver.AvroCons$$anonfun$1.apply(AvroConsumer.scala:54)
    at org.kafka.receiver.AvroCons$$anonfun$1.apply(AvroConsumer.scala:54)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: [B cannot be cast to java.lang.String

Here is the Scala program:

package org.kafka.receiver

case class mobileData(action: String, tenantid: Int, lat: Float, lon: Float, memberid: Int,
  event_name: String, productUpccd: Int, device_type: String, device_os_ver: Float, item_name: String)

import java.util.HashMap
import org.apache.avro.SchemaBuilder
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.DefaultDecoder
import org.apache.spark.sql.SQLContext
import com.sun.istack.internal.logging.Logger

object AvroCons {
  val eventSchema = SchemaBuilder.record("eventRecord").fields
    .name("action").`type`().stringType().noDefault()
    .name("tenantid").`type`().intType().noDefault()
    .name("lat").`type`().doubleType().noDefault()
    .name("lon").`type`().doubleType().noDefault()
    .name("memberid").`type`().intType().noDefault()
    .name("event_name").`type`().stringType().noDefault()
    .name("productUpccd").`type`().intType().noDefault()
    .name("device_type").`type`().stringType().noDefault()
    .name("device_os_ver").`type`().stringType().noDefault()
    .name("item_name").`type`().stringType().noDefault().endRecord

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("Avro Consumer")
      .set("spark.driver.allowMultipleContexts", "true")
      .setMaster("local[2]")
    sparkConf.set("spark.cores.max", "2")
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "KafkaConsumer")
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.streaming.backpressure.enabled", "true")
    sparkConf.set("spark.sql.avro.compression.codec", "snappy")
    sparkConf.set("spark.sql.avro.mergeSchema", "true")
    sparkConf.set("spark.sql.avro.binaryAsString", "true")

    val sc = new SparkContext(sparkConf)
    sc.hadoopConfiguration.set("avro.enable.summary-metadata", "false")
    val ssc = new StreamingContext(sc, Seconds(2))

    val kafkaConf = Map[String, String](
      "metadata.broker.list" -> "############:9092",
      "zookeeper.connect" -> "#############",
      "group.id" -> "KafkaConsumer",
      "zookeeper.connection.timeout.ms" -> "1000000")
    val topicMaps = Map("fishbowl" -> 1)

    val messages = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](
      ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

    messages.print()
    val lines = messages.map(x => x._2)
    lines.foreachRDD((rdd, time) => {
      val count = rdd.count()
      if (count > 0)
        rdd.foreach(record => { println(record) })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

And here is my build.sbt

name := "AvroConsumer" 
version := "1.0" 
scalaVersion := "2.10.6"
jarName in assembly := "AvroConsumer.jar" 

libraryDependencies  += "org.apache.spark" % "spark-core_2.10" % "1.6.1" % "provided"    

libraryDependencies  += "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided" 

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.1"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-assembly_2.10" % "1.6.1"

libraryDependencies  += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1" 

libraryDependencies  += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"

libraryDependencies += "org.openrdf.sesame" % "sesame-rio-api" % "2.7.2" 

libraryDependencies += "com.databricks" % "spark-csv_2.10" %  "0.1"

libraryDependencies += "org.apache.avro" % "avro" % "1.8.1"

libraryDependencies += "log4j" % "log4j" % "1.2.17"

libraryDependencies += "org.apache.avro" % "avro-tools" % "1.7.4"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

I'm preparing this code to create a DataFrame from the Kafka topic, which is why I set all those properties on SparkConf. Here is the schema of my incoming data:

{
  "action": "AppEvent",
  "tenantid": 299,
  "lat": 0.0,
  "lon": 0.0,
  "memberid": 16445,
  "event_name": "CATEGORY_CLICK",
  "productUpccd": 0,
  "device_type": "iPhone",
  "device_os_ver": "10.1",
  "item_name": "CHICKEN"
}
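Once the values come through as plain JSON strings, this is roughly how I plan to build the DataFrame per micro-batch. This is only a sketch of the intended step, and it assumes lines is already a DStream[String]:

// Sketch only: build a DataFrame from each micro-batch of JSON strings.
// Assumes the values have been decoded to String (not the byte arrays I get today).
import org.apache.spark.sql.SQLContext

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    val df = sqlContext.read.json(rdd)   // Spark infers the schema shown above
    df.show()
  }
}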

And here is my Kafka producer class:

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class KafkaAvroProducer {

    /* case class
     TopicData("action":"AppEvent","tenantid":1173,"lat":0.0,"lon":0.0,"memberid":55,
     "event_name":"CATEGORY_CLICK",
     "productUpccd":0,"device_type":"iPhone","device_os_ver":"10.1","item_name":"CHICKEN",*/

    public static final String EVENT_SCHEMA = "{" + "\"type\":\"record\","
            + "\"name\":\"eventrecord\"," + "\"fields\":["
            + "  { \"name\":\"action\", \"type\":\"string\" },"
            + "  { \"name\":\"tenantid\", \"type\":\"int\" },"
            + "  { \"name\":\"lat\", \"type\":\"double\" },"
            + "  { \"name\":\"lon\", \"type\":\"double\" },"
            + "  { \"name\":\"memberid\", \"type\":\"int\" },"
            + "  { \"name\":\"event_name\", \"type\":\"string\" },"
            + "  { \"name\":\"productUpccd\", \"type\":\"int\" },"
            + "  { \"name\":\"device_type\", \"type\":\"string\" },"
            + "  { \"name\":\"device_os_ver\", \"type\":\"string\" },"
            + "{ \"name\":\"item_name\", \"type\":\"string\" }" + "]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "##########:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("producer.type", "async");
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(EVENT_SCHEMA);
        Injection<GenericRecord, String> avroRecords = GenericAvroCodecs.toJson(schema);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i<300;i++){
            GenericData.Record avroRecord = new GenericData.Record(schema);
            setEventValues(i, avroRecord);
            String messages = avroRecords.apply(avroRecord);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("fishbowl",String.valueOf(i),messages);
            System.out.println(producerRecord);
            producer.send(producerRecord);

        }
        producer.close();
    }

    private static void setEventValues(int i, Record avroRecord) {

        avroRecord.put("action", "AppEvent");
        avroRecord.put("tenantid", i);
        avroRecord.put("lat", i*0.0);
        avroRecord.put("lon", 0.0);
        avroRecord.put("memberid", i*55);
        avroRecord.put("event_name", "CATEGORY_CLICK");
        avroRecord.put("productUpccd", 0);
        avroRecord.put("device_type", "iPhone");
        avroRecord.put("device_os_ver", "10.1");
        avroRecord.put("item_name", "CHICKEN");
    }

}
What's the type of the data inserted in Kafka? – maasg
I will provide the producer class as well; see it for the changes. – jack AKA karthik
Are you sure that Kafka is empty of other messages when this program starts? The exception says that we're trying to cast a Byte[] as a String, but the Kafka deserializers look well configured, as does the producer. Try purging Kafka of all messages before testing. – maasg
Maybe you need to check consistency in the system. From your code, I read: producer = KafkaProducer<String, String> and consumer = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder], which do not match each other. – maasg
([B@2c15dae5,[B@4ed606ff) ([B@456577c8,[B@89b0f10) ([B@7bf9e7d9,[B@53e49725) ([B@7867a787,[B@5d27cdd2) ([B@6039c5c5,[B@70e739dd) ([B@5c881613,[B@6a6168da) ([B@baf522d,[B@233c0e59) ([B@3bd20721,[B@59d22635) ([B@73ca8fa4,[B@779c935f) ([B@488f7f52,[B@44f005b5) – jack AKA karthik

1 Answer

4 votes

The Kafka consumer should be configured to use the right decoder. DefaultDecoder hands each key and value through as raw bytes (Array[Byte]), which is why messages.print() shows values like [B@2c15dae5 and why mapping them to String fails with the ClassCastException.

Instead of:

KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder]

for String keys and values it should be:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]
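A minimal sketch of the corrected wiring, reusing the names from the question (ssc, kafkaConf, topicMaps) and assuming the producer keeps sending JSON-encoded strings:

import kafka.serializer.StringDecoder

// With StringDecoder the stream is a DStream[(String, String)],
// so the value side can be used directly as a String.
val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

val lines = messages.map(_._2)   // JSON payloads as plain strings
lines.print()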