
I want to set up a streaming application using Apache Kafka and Spark Streaming. Kafka (version 0.9.0.1) is running on a separate Unix machine, and Spark v1.6.1 is part of a Hadoop cluster.

I have started ZooKeeper and the Kafka server, and I want to stream messages from a log file via the console producer and consume them in a Spark Streaming application using the direct approach (no receivers). I have written the code in Python and am executing it with the command below:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar streamingDirectKafka.py

I am getting the error below:

/opt/mapr/spark/spark-1.6.1/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 152, in createDirectStream
py4j.protocol.Py4JJavaError: An error occurred while calling o38.createDirectStreamWithoutMessageHandler.
: java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker

Could you please help?

Thanks!!

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    conf = SparkConf().setAppName("StreamingDirectKafka")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    topics = ['test']
    kafkaParams = {"metadata.broker.list": "apsrd7102:9092"}
    # Direct (receiver-less) stream; each record is a (key, value) pair,
    # so keep only the message value
    lines = (KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
                       .map(lambda x: x[1]))
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
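As a sanity check, the word-count transformations themselves can be mirrored in plain Python, with no Spark or Kafka required (the helper below is an illustrative stand-in for the flatMap/map/reduceByKey chain, not part of my application):

```python
from collections import Counter

def word_counts(messages):
    # mimic flatMap(split) -> map((word, 1)) -> reduceByKey(add)
    words = [w for msg in messages for w in msg.split(" ") if w]
    return dict(Counter(words))

print(word_counts(["hello world", "hello"]))  # {'hello': 2, 'world': 1}
```

This confirms the counting logic is fine; the failure is clearly in the Kafka integration layer.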

1 Answer


It looks like you are using an incompatible version of Kafka. The Spark Streaming + Kafka integration that ships with Spark 1.6.x (the `spark-streaming-kafka-assembly` jar you are submitting) is built against the Kafka 0.8.x client; per the documentation, only Kafka 0.8.x is supported by this integration. The `kafka.cluster.BrokerEndPoint` class in your stack trace appears only in Kafka 0.9+, so having 0.9 client classes on the classpath would explain exactly this ClassCastException.

http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources
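If it helps, the version rule can be sketched as a simple check (the function name and heuristic are mine for illustration, not a Spark API): the 0.8-based integration only tolerates Kafka 0.8.x client jars on the classpath.

```python
def kafka_08_integration_compatible(kafka_version):
    """Illustrative heuristic: Spark 1.6.x's spark-streaming-kafka assembly
    is compiled against the Kafka 0.8.x client, so only 0.8.x client jars
    are safe on the classpath alongside it."""
    major, minor = kafka_version.split(".")[:2]
    return (int(major), int(minor)) == (0, 8)

print(kafka_08_integration_compatible("0.8.2.1"))  # True
print(kafka_08_integration_compatible("0.9.0.1"))  # False -> your setup
```

So either point the job at a 0.8.x Kafka cluster, or move to a Spark/Kafka combination that supports 0.9+.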