Goal
My goal is to get a simple Spark Streaming example working that uses the direct approach to interfacing with Kafka, but I can't get past a specific error.
The ideal result is two console windows: one that I type sentences into, and another that shows the "real-time" word count of all the sentences typed so far.
Console 1
the cat likes the bacon
my cat ate the bacon
Console 2
Time:..
[("the", 2), ("cat", 1), ("likes", 1), ("bacon", 1)]
Time:..
[("the", 3), ("cat", 2), ("likes", 1), ("bacon", 2), ("my", 1), ("ate", 1)]
Steps taken
Download and untar
kafka_2.10-0.8.2.0
spark-1.5.2-bin-hadoop2.6
Start ZooKeeper and Kafka servers in separate screens.
screen -S zk
bin/zookeeper-server-start.sh config/zookeeper.properties
"Ctrl-a" "d" to detach screen
screen -S kafka
bin/kafka-server-start.sh config/server.properties
"Ctrl-a" "d"
Start Kafka producer
Use a separate console window and type words into it to simulate a stream.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Start PySpark
Launch the shell with the spark-streaming-kafka package.
bin/pyspark --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2
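For reference, I believe the same setup could also be driven from a standalone script with spark-submit and the same --packages flag (the shell just saves me from creating the SparkContext myself). A rough, untested sketch that only echoes the raw Kafka messages, with an arbitrary file and app name:
# smoke_test.py (arbitrary name); launched with something like:
#   bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2 smoke_test.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaSmokeTest")  # the pyspark shell creates sc for you instead
ssc = StreamingContext(sc, 2)                # 2-second batches

kvs = KafkaUtils.createDirectStream(ssc, ["test"], {"metadata.broker.list": "localhost:9092"})
kvs.map(lambda x: x[1]).pprint()             # just print the raw messages to confirm they arrive

ssc.start()
ssc.awaitTermination()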
Run simple word count
Based on the example in the docs.
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# sc is the SparkContext the pyspark shell already provides; 2-second batch interval
ssc = StreamingContext(sc, 2)

topic = "test"
brokers = "localhost:9092"

# Direct (receiver-less) stream; each record is a (key, value) tuple
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])

# Per-batch word count
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
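As an aside, I realize reduceByKey only counts words within each 2-second batch, while the Console 2 output I'm after shows running totals across batches. I assume I would eventually need a stateful transformation such as updateStateByKey with checkpointing enabled for that; a rough, untested sketch (the checkpoint path is arbitrary):
ssc.checkpoint("/tmp/wordcount-checkpoint")  # required for stateful operations; path is arbitrary

def update_total(new_values, total):
    # add this batch's occurrences of a word to its running total
    return sum(new_values) + (total or 0)

running_counts = lines.flatMap(lambda line: line.split(" ")) \
                      .map(lambda word: (word, 1)) \
                      .updateStateByKey(update_total)
running_counts.pprint()
For now, though, even the plain per-batch version above is enough to reproduce the problem.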
The error
Typing words into the Kafka producer console produces a word count exactly once, but then the error below is raised once and no further results are produced (although the "Time" sections continue to appear).
Time: 2015-11-15 18:39:52
-------------------------------------------
15/11/15 18:42:57 ERROR PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
    at java.net.ServerSocket.implAccept(ServerSocket.java:530)
    at java.net.ServerSocket.accept(ServerSocket.java:498)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
Traceback (most recent call last):
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/streaming/util.py", line 62, in call
    r = self.func(t, *rdds)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/streaming/dstream.py", line 171, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/rdd.py", line 1299, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/context.py", line 917, in runJob
    return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/usr/lib/python2.7/socket.py", line 380, in read
    data = self._sock.recv(left)
error: [Errno 104] Connection reset by peer
Any help or advice would be greatly appreciated.