0
votes

I tried using spark streaming to process kafka messages,followed this wiki https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html and my code is below:

SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("spark://sl:7077");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "10.0.1.5:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "group1");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Collections.singletonList("test");
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
    stream.print();

after submit, it returns :

17/04/05 22:43:10 INFO SparkContext: Starting job: print at JavaDirectKafkaWordCount.java:47
17/04/05 22:43:10 INFO DAGScheduler: Got job 0 (print at JavaDirectKafkaWordCount.java:47) with 1 output partitions
17/04/05 22:43:10 INFO DAGScheduler: Final stage: ResultStage 0 (print at JavaDirectKafkaWordCount.java:47)
17/04/05 22:43:10 INFO DAGScheduler: Parents of final stage: List()
17/04/05 22:43:10 INFO DAGScheduler: Missing parents: List()
17/04/05 22:43:10 INFO DAGScheduler: Submitting ResultStage 0 (KafkaRDD[0]     at createDirectStream at JavaDirectKafkaWordCount.java:44), which has no missing parents
17/04/05 22:43:10 INFO MemoryStore: Block broadcast_0 stored as values in     memory (estimated size 2.3 KB, free 366.3 MB)
17/04/05 22:43:10 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1529.0 B, free 366.3 MB)
17/04/05 22:43:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.245.226.155:15258 (size: 1529.0 B, free: 366.3 MB)
17/04/05 22:43:10 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
17/04/05 22:43:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (KafkaRDD[0] at createDirectStream at     JavaDirectKafkaWordCount.java:44)
17/04/05 22:43:10 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/04/05 22:43:10 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.245.226.155:53448) with ID 0
17/04/05 22:43:10 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.245.226.155, executor 0, partition 0, PROCESS_LOCAL, 7295 bytes)
17/04/05 22:43:10 INFO BlockManagerMasterEndpoint: Registering block manager 10.245.226.155:14669 with 366.3 MB RAM, BlockManagerId(0, 10.245.226.155, 14669, None)
17/04/05 22:43:10 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.245.226.155:53447) with ID 1
17/04/05 22:43:10 INFO BlockManagerMasterEndpoint: Registering block manager 10.245.226.155:33754 with 366.3 MB RAM, BlockManagerId(1, 10.245.226.155, 33754, None)
17/04/05 22:43:11 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.245.226.155, executor 0): java.lang.NullPointerException
at org.apache.spark.util.Utils$.decodeFileNameInURI(Utils.scala:409)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:434)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:508)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:500)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:500)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:257)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

can someone help on it? thanks very much.

1

1 Answers

0
votes

Can you provide the parameters passed to spark-submit?

You might have passed a jar-file name instead of an absolute path to the jar file. The class org.apache.spark.executor.Executor tries to load "Added Jars" and "Added Files" in updateDependencies method, but the URI path is not as presumed by spark.