I'm working on some code that uses Kafka and Spark Streaming. When I submit it in yarn-cluster mode it throws a NullPointerException, but it runs fine on my computer in standalone mode.
What is wrong with it?
Here is the code:
import java.util.Properties
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DealLog extends App {
  val spark = SparkSession.builder().appName("DealLog").getOrCreate()
  val sc = spark.sparkContext
  val ssc: StreamingContext = new StreamingContext(sc, Seconds(3))
  val log = Logger.getLogger(this.getClass)

  // Kafka connection settings are read from config.properties on the classpath.
  val pro = new Properties()
  val in = Thread.currentThread().getContextClassLoader.getResourceAsStream("config.properties")
  pro.load(in)
  // ssc.checkpoint("hdfs://192.168.0.240:8022/bigdata/checkpoint2")

  val bootstrap = pro.getProperty("kafka.brokers")
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> bootstrap,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "userlabel",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )
  val topicsSet = Array(pro.getProperty("kafkaconsume.topic"))

  val ds = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topicsSet, kafkaParams)
  ).map(s => s.value())

  ds.foreachRDD(rdd => {
    log.info("ds.foreachRDD rdd == " + rdd)
    rdd.foreachPartition(partition => {         // DealLog.scala:41 in the stack trace
      log.info("partition-------" + partition)  // DealLog.scala:42, where the NPE surfaces
      partition.foreach(record => {
        log.info("record---------" + record)
        if (record.isEmpty) {
          log.info("empty")
        } else {
          log.info("not empty..")
        }
        log.info("complete")
      })
    })
  })

  ssc.start()
  ssc.awaitTermination()
}
------------------------ Exception ------------------------
The stack trace points at DealLog.scala:41-42, i.e. the foreachPartition closure and the first log.info call inside it:
19/07/26 18:21:56 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, cdh102, executor 2): java.lang.NullPointerException
    at Recommend.DealLog$$anonfun$2$$anonfun$apply$1.apply(DealLog.scala:42)
    at Recommend.DealLog$$anonfun$2$$anonfun$apply$1.apply(DealLog.scala:41)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
19/07/26 18:21:56 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, cdh102, executor 2, partition 0, PROCESS_LOCAL, 4706 bytes)
19/07/26 18:21:56 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on cdh102, executor 2: java.lang.NullPointerException (null) [duplicate 1]
19/07/26 18:21:56 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, cdh102, executor 2, partition 0, PROCESS_LOCAL, 4706 bytes)
19/07/26 18:21:56 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on cdh102, executor 2: java.lang.NullPointerException (null) [duplicate 2]
19/07/26 18:21:56 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, cdh102, executor 2, partition 0, PROCESS_LOCAL, 4706 bytes)
19/07/26 18:21:56 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on cdh102, executor 2: java.lang.NullPointerException (null) [duplicate 3]
19/07/26 18:21:56 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
19/07/26 18:21:56 INFO cluster.YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/07/26 18:21:56 INFO cluster.YarnClusterScheduler: Cancelling stage 0
19/07/26 18:21:56 INFO scheduler.DAGScheduler: ResultStage 0 (foreachPartition at DealLog.scala:41) failed in 1.092 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, cdh102, executor 2): java.lang.NullPointerException
    at Recommend.DealLog$$anonfun$2$$anonfun$apply$1.apply(DealLog.scala:42)
    at Recommend.DealLog$$anonfun$2$$anonfun$apply$1.apply(DealLog.scala:41)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
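For what it's worth, one suspect is the extends App pattern itself: a Scala App object initializes its fields through DelayedInit, so vals like log are only assigned when the body runs on the driver and can still be null when the class is loaded on an executor, which would match an NPE at the first log.info inside foreachPartition. Below is a minimal sketch of the same job restructured around an explicit main method, with the logger obtained on the executor side. This is a hypothesis to test, not a confirmed fix.

import java.util.Properties
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DealLog {
  // Explicit main() instead of `extends App`: App runs the object body via
  // DelayedInit, so fields initialized there may still be null on executors.
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DealLog").getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(3))

    val pro = new Properties()
    val in = Thread.currentThread().getContextClassLoader
      .getResourceAsStream("config.properties")
    // Fail fast on the driver if the file is not on the classpath.
    require(in != null, "config.properties not found on the classpath")
    pro.load(in)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> pro.getProperty("kafka.brokers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "userlabel",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean))

    val topics = Array(pro.getProperty("kafkaconsume.topic"))
    val ds = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    ).map(_.value())

    ds.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // Obtain the logger on the executor instead of closing over a
        // driver-side field that may be null there.
        val log = Logger.getLogger(DealLog.getClass)
        partition.foreach(record => log.info("record: " + record))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}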
config.properties? Is this part of the jar file, or on the CLASSPATH, or somewhere else? – Jacek Laskowski
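A small diagnostic sketch that could answer that at runtime on both sides (illustrative only: the parallelize call is just there to force code onto the executors, and spark is the session created in the code above):

// Where does config.properties resolve from on the driver?
val driverUrl = Thread.currentThread().getContextClassLoader
  .getResource("config.properties")
println(s"driver sees config.properties at: $driverUrl")

// And on the executors? Their println output appears in each executor's
// stdout log in the YARN UI.
spark.sparkContext.parallelize(1 to 4, 4).foreachPartition { _ =>
  val url = Thread.currentThread().getContextClassLoader
    .getResource("config.properties")
  println(s"executor sees config.properties at: $url")
}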