3
votes

Goal

My goal is to get a simple Spark Streaming example working that uses the direct approach to interfacing with Kafka, but I can't get past a specific error.

The ideal result is to have two console windows open: one that I can type sentences into, and another that shows the "real-time" word count of all the sentences.

Console 1

the cat likes the bacon

my cat ate the bacon

Console 2

Time:..

[("the", 2), ("cat", 1), ("likes", 1), ("bacon", 1)]

Time:..

[("the", 3), ("cat", 2), ("likes", 1), ("bacon", 2), ("my", 1), ("ate", 1)]


Steps taken

Download and untar

kafka_2.10-0.8.2.0
spark-1.5.2-bin-hadoop2.6

Start ZooKeeper and Kafka servers in separate screens.

screen -S zk
bin/zookeeper-server-start.sh config/zookeeper.properties

"Ctrl-a" "d" to detach screen

screen -S kafka
bin/kafka-server-start.sh config/server.properties

"Ctrl-a" "d"

Start Kafka producer

Use a separate console window and type words into it to simulate a stream.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Start Pyspark

Use the Spark streaming-Kafka package.

bin/pyspark --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2

Run simple word count

Based on the example in the docs.

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 2)
topic = "test"
brokers = "localhost:9092"
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
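
Note that reduceByKey on a DStream aggregates within each 2-second batch only, so the code above prints a per-batch count rather than the running total shown under Console 2. A running total would need a stateful operation such as updateStateByKey, which also requires a checkpoint directory set with ssc.checkpoint. The plain-Python sketch below does not use Spark; it only contrasts the two behaviours. The names count_batch, update_count, and run_batches are hypothetical, and update_count is written in the (new_values, last_state) shape that updateStateByKey takes.

```python
from collections import Counter


def count_batch(lines):
    """Per-batch word count, mirroring flatMap -> map -> reduceByKey."""
    words = [word for line in lines for word in line.split(" ")]
    return dict(Counter(words))


def update_count(new_values, running_count):
    """Update function in the (new_values, last_state) shape used by updateStateByKey."""
    # running_count is None the first time a key is seen
    return sum(new_values) + (running_count or 0)


def run_batches(batches):
    """Simulate a stream: return (per_batch_counts, running_totals) for each batch."""
    state = {}
    results = []
    for lines in batches:
        per_batch = count_batch(lines)
        for word, n in per_batch.items():
            # Spark would pass the list of new values for the key; here it is [1] * n
            state[word] = update_count([1] * n, state.get(word))
        results.append((per_batch, dict(state)))
    return results


# Hypothetical input: one sentence arriving in each of two batches
results = run_batches([["the cat likes the bacon"], ["my cat ate the bacon"]])
for per_batch, running in results:
    print(sorted(per_batch.items()), sorted(running.items()))
```

In this simulation the second batch's per-batch count has "the" at 1, while the running total has it at 3, which is the figure expected in Console 2.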


The error

Typing words into the Kafka producer console produces results exactly once. The error below is then raised once, and no further results are produced (although the "Time" sections continue to appear).

Time: 2015-11-15 18:39:52
-------------------------------------------

15/11/15 18:42:57 ERROR PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
        at java.net.ServerSocket.implAccept(ServerSocket.java:530)
        at java.net.ServerSocket.accept(ServerSocket.java:498)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
Traceback (most recent call last):
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/streaming/util.py", line 62, in call
    r = self.func(t, *rdds)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/streaming/dstream.py", line 171, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/rdd.py", line 1299, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/context.py", line 917, in runJob
    return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/usr/lib/python2.7/socket.py", line 380, in read
    data = self._sock.recv(left)
error: [Errno 104] Connection reset by peer
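
For readers unfamiliar with the two halves of this trace: the Java part shows a server socket giving up inside accept(), and the Python part shows the driver's read from a socket being reset. The sketch below is a general illustration of an accept timeout only, using plain Python sockets; it is not a diagnosis of the root cause here and does not reproduce Spark's internals. The function name accept_with_timeout and the 0.2 second value are assumptions chosen for the example.

```python
import socket


def accept_with_timeout(timeout_seconds):
    """Listen on an ephemeral local port and wait for one client, giving up after timeout_seconds."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
        server.listen(1)
        server.settimeout(timeout_seconds)
        try:
            conn, _ = server.accept()
        except socket.timeout:
            # Nobody connected in time; this is the Python analogue of "Accept timed out"
            return "accept timed out"
        conn.close()
        return "client connected"
    finally:
        server.close()


# No client ever connects here, so accept() gives up after the timeout
outcome = accept_with_timeout(0.2)
print(outcome)
```

A client that tries to use the connection after the listening side has given up and closed its socket can then see an error on its own end, such as a reset or refused connection.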

Any help or advice would be greatly appreciated.

2 Answers

0
votes

Try running your script with spark-submit:

spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.1 your_python_file_name.py

You can also set other arguments (--deploy-mode, etc.).

0
votes

After creating the DStream, iterate over its RDDs using foreachRDD.

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Define the handler before passing it to foreachRDD
def handler(rdd):
    # collect() brings every record of this batch's RDD back to the driver
    records = rdd.collect()
    for record in records:
        # Replace this with whatever data processing you want;
        # each record is a (key, message) tuple
        print(record)

ssc = StreamingContext(sc, 2)
topic = "test"
brokers = "localhost:9092"
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
kvs.foreachRDD(handler)
ssc.start()
ssc.awaitTermination()