I am trying to integrate Apache Kafka with Spark Streaming. Here is my Python code:
from __future__ import print_function
import sys
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
    # Streaming word count over messages read from the Kafka topic "demo"
    # through the receiver-based (ZooKeeper at localhost:2181) Kafka API,
    # printing per-batch counts every 1 second.
    #
    # NOTE: the "java.lang.NoClassDefFoundError:
    # scala/collection/GenTraversableOnce$class" raised from kafka.utils.Pool
    # inside KafkaReceiver.onStart is not caused by this script. It means the
    # spark-streaming-kafka / kafka jars on the classpath were compiled for a
    # different Scala binary version than the one Spark runs on. Submit with
    # the integration artifact whose "_2.xx" suffix matches Spark's Scala
    # version and whose version matches Spark, e.g.
    #   spark-submit --packages \
    #     org.apache.spark:spark-streaming-kafka-0-8_<scala-ver>:<spark-ver> app.py
    conf = SparkConf().setAppName("Kafka-Spark")
    sc = SparkContext(conf=conf)
    stream = StreamingContext(sc, 1)  # batch interval: 1 second

    # createStream expects {topic_name: number_of_receiver_threads}.
    map1 = {'demo': 1}
    kafkaStream = KafkaUtils.createStream(
        stream, 'localhost:2181', "test-consumer-group", map1)

    # Records arrive as (key, message) pairs; keep only the message text.
    lines = kafkaStream.map(lambda x: x[1])

    # Parentheses allow the method chain to span lines. The original lost the
    # trailing backslash after flatMap(...), which made this a SyntaxError.
    counts = (lines.flatMap(lambda line: line.split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    stream.start()
    stream.awaitTermination()
When I run the above program, it prints the following output on the terminal:
16/10/24 15:27:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class at kafka.utils.Pool.(Pool.scala:28) at kafka.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:91) at kafka.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:143) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:94) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587) at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993) at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 
17 more 16/10/24 15:27:20 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main] java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class at kafka.utils.Pool.(Pool.scala:28) at kafka.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:91) at kafka.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:143) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:94) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587) at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993) at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 
17 more 16/10/24 15:27:20 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook 16/10/24 15:27:20 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class at kafka.utils.Pool.(Pool.scala:28) at kafka.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:91) at kafka.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:143) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:94) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587) at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993) at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 17 more
16/10/24 15:27:20 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times;