I am trying to integrate Apache Kafka with Apache spark streaming using Python (I am new to all these).
For this I have done the following steps
- Started Zookeeper
- Started Apache Kafka
- Added topic in Apache Kafka
- Managed to list available topics using this command
bin/kafka-topics.sh --list --zookeeper localhost:2181
- I have taken the Kafka word count code from here
and the code is
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
- I executed the code using the command
./spark-submit /root/girish/python/kafkawordcount.py localhost:2181
and I got this error
Traceback (most recent call last):
File "/root/girish/python/kafkawordcount.py", line 28, in <module>
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
File "/root/spark-", line 72, in createStream
raise e
py4j.protocol.Py4JJavaError: An error occurred while calling o23.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
- I have updated the execution code using the answer from this question
spark submit failed with spark streaming workdcount python code
./spark-submit --jars /root/spark-,/usr/hdp/,/usr/hdp/,/usr/hdp/ /root/girish/python/kafkawordcount.py localhost:2181 <topic name>
Now I am getting this error
File "/root/girish/python/kafkawordcount.py", line 28, in <module>
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
File "/root/spark-", line 67, in createStream
jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
File "/root/spark-", line 529, in __call__
File "/root/spark-", line 265, in get_command_part
AttributeError: 'dict' object has no attribute '_get_object_id'
Please help to solve this issue.
Thanks in advance
PS: I am using Apache Spark 1.2