
I'm working on some code with Kafka and Spark Streaming. When I run it on YARN in cluster mode, it throws a NullPointerException.

But it works fine on my computer (standalone mode).

So what's wrong with it?

// Here is the code

import java.util.Properties

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DealLog extends App {

  val spark=SparkSession.builder().appName(" DealLog").getOrCreate()
  val sc = spark.sparkContext
  val ssc: StreamingContext= new StreamingContext(sc, Seconds(3))

  val log = Logger.getLogger(this.getClass)
  val pro = new Properties()
  val in = Thread.currentThread().getContextClassLoader.getResourceAsStream("config.properties")
  pro.load(in)
  //  ssc.checkpoint("hdfs://192.168.0.240:8022/bigdata/checkpoint2")
  val bootstrap=pro.getProperty("kafka.brokers")
  val kafkaParams = Map[String, Object]("bootstrap.servers" -> bootstrap,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "userlabel",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )
  val topicsSet = Array(pro.getProperty("kafkaconsume.topic"))
  val ds = KafkaUtils.createDirectStream[String,String](
    ssc,
    PreferConsistent,
    Subscribe[String,String](topicsSet,kafkaParams)
  ).map(s=>{(s.value())})

  ds.foreachRDD(p=>{
    log.info("ds.foreachRdd p=="+ p)
    p.foreachPartition(per=>{
      log.info("per-------"+per)
      per.foreach(rdd=> {
        log.info("rdd---------"+ rdd)
        if(rdd.isEmpty){
          log.info("null ")
        }
        else{
          log.info("not null..")
        }
        log.info("complete")

      })
    })
  })
  ssc.start()
  ssc.awaitTermination()
}

------------------------Exception here--------------------------------

19/07/26 18:21:56 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, cdh102, executor 2): java.lang.NullPointerException
    at Recommend.DealLog$$anonfun$2$$anonfun$apply$1.apply(DealLog.scala:42)
    at Recommend.DealLog$$anonfun$2$$anonfun$apply$1.apply(DealLog.scala:41)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

19/07/26 18:21:56 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, cdh102, executor 2, partition 0, PROCESS_LOCAL, 4706 bytes)
19/07/26 18:21:56 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on cdh102, executor 2: java.lang.NullPointerException (null) [duplicate 1]
19/07/26 18:21:56 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, cdh102, executor 2, partition 0, PROCESS_LOCAL, 4706 bytes)
19/07/26 18:21:56 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on cdh102, executor 2: java.lang.NullPointerException (null) [duplicate 2]
19/07/26 18:21:56 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, cdh102, executor 2, partition 0, PROCESS_LOCAL, 4706 bytes)
19/07/26 18:21:56 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on cdh102, executor 2: java.lang.NullPointerException (null) [duplicate 3]
19/07/26 18:21:56 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
19/07/26 18:21:56 INFO cluster.YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/07/26 18:21:56 INFO cluster.YarnClusterScheduler: Cancelling stage 0
19/07/26 18:21:56 INFO scheduler.DAGScheduler: ResultStage 0 (foreachPartition at DealLog.scala:41) failed in 1.092 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, cdh102, executor 2): java.lang.NullPointerException
    at Recommend.DealLog$$anonfun$2$$anonfun$apply$1.apply(DealLog.scala:42)
    at Recommend.DealLog$$anonfun$2$$anonfun$apply$1.apply(DealLog.scala:41)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Comments:

likooo: Line 41 is p.foreachPartition(per=>{ (there is no line 41 in the snippet above).
Jacek Laskowski: What's config.properties? Is this part of the jar file, on the CLASSPATH, or somewhere else?
alex: If my answer helped, could you accept it?

1 Answer


I think your issue might be coming from this line:

if(rdd.isEmpty)

because of the way the code is written, that isn't actually an RDD. Once you call foreachPartition you get the iterator for that partition, and when you call foreach on that iterator you are accessing the individual records of that partition. So what you're dealing with on that line is a single record coming from the DStream, which means you may be calling .isEmpty on a null string/value, and that is what throws the exception.
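To make that concrete, here is a minimal sketch of what each parameter actually is at every nesting level, using the names from the question's code (illustration only, assuming the same ds as above):

ds.foreachRDD(p => {            // p is an RDD[String]: one micro-batch
  p.foreachPartition(per => {   // per is an Iterator[String]: the records of one partition
    per.foreach(rdd => {        // rdd is a single String record value (despite the name), and it can be null
      // record-level logic goes here
    })
  })
})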

You could replace .isEmpty with

if(record == null)
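
In the question's code, that record-level check would look roughly like this (just a sketch reusing the original ds and log, not the full job):

ds.foreachRDD(p => {
  p.foreachPartition(per => {
    per.foreach(record => {
      if (record == null) log.info("null record")   // guard against null values from Kafka
      else log.info("record: " + record)
    })
  })
})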

but you don't have to do that. You can just check whether the RDD itself is empty. Can you try the code below instead?

import java.util.Properties

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DealLog extends App {

  val spark = SparkSession.builder().appName(" DealLog").getOrCreate()
  val sc = spark.sparkContext
  val ssc: StreamingContext = new StreamingContext(sc, Seconds(3))

  val log = Logger.getLogger(this.getClass)
  val pro = new Properties()
  val in = Thread.currentThread().getContextClassLoader.getResourceAsStream("config.properties")
  pro.load(in)
  //  ssc.checkpoint("hdfs://192.168.0.240:8022/bigdata/checkpoint2")
  val bootstrap = pro.getProperty("kafka.brokers")
  val kafkaParams = Map[String, Object]("bootstrap.servers" -> bootstrap,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "userlabel",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )
  val topicsSet = Array(pro.getProperty("kafkaconsume.topic"))
  val ds = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topicsSet, kafkaParams)
  ).map(s => {
    (s.value())
  })

  ds.foreachRDD(rdd => {
    log.info("ds.foreachRdd p==" + rdd)
    if (!rdd.isEmpty) {
      rdd.foreachPartition(partition => {
        log.info("per-------" + partition)
        partition.foreach(record => {
          log.info("record---------" + record)
        })
      })
    } else log.info("rdd was empty")

    log.info("complete")
  })
  ssc.start()
  ssc.awaitTermination()
  ssc.stop()
}