
I have set up Kafka and Spark on Ubuntu. I am trying to read Kafka topics through Spark Streaming using PySpark (Jupyter notebook). Spark is neither reading the data nor throwing any error.

The Kafka producer and consumer are communicating with each other in the terminal. Kafka is configured with 3 partitions across brokers on ports 9092, 9093 and 9094, and messages are being stored in the Kafka topic. Now I want to read them through Spark Streaming. I am not sure what I am missing; I have searched online but couldn't find a working solution. Please help me understand the missing part.

  • Topic name: new_topic
  • Spark: 2.3.2
  • Kafka: 2.11-2.1.0 (Scala 2.11 build of Kafka 2.1.0)
  • Python: 3
  • Java: 1.8.0_201
  • ZooKeeper port: 2181

Kafka producer: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic new_topic

Kafka consumer: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic new_topic --from-beginning

PySpark code (Jupyter notebook):

#!/usr/bin/env python
# coding: utf-8
import os

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.2 pyspark-shell'

import findspark
findspark.init('/home/shekhar/spark-2.3.2-bin-hadoop2.7')
import pyspark
import sys

from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext

from pyspark.streaming.kafka import KafkaUtils

from uuid import uuid1

if __name__=="__main__":
    #sconf = SparkConf().setAppName("SparkStr").setMaster("local")
    sc = SparkContext(appName="SparkStreamingReceiverKafkaWordCount")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc,2)
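    # Under spark-submit, sys.argv[1:] is [broker, topic]; inside a Jupyter
    # kernel it typically holds the kernel's own launch arguments instead.
    broker,topic = sys.argv[1:]
    # The second argument of createStream is the ZooKeeper quorum (zkQuorum),
    # not the Kafka broker list.
    kvs = KafkaUtils.createStream(ssc,"localhost:9092","raw-event-streaming-consumer",{topic:1})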
    lines = kvs.map(lambda x: x[1])
    lines.pprint()
    ssc.start()
    ssc.awaitTermination()

Output displayed in the Jupyter notebook:

-------------------------------------------
Time: 2019-01-30 00:52:18
-------------------------------------------

-------------------------------------------
Time: 2019-01-30 00:52:20
-------------------------------------------

spark-submit command:

bin/spark-submit \
  --master spark://localhost:4040 \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.2 \
  SparkKafka-Copy1.py localhost:9092 new_topic

The output of spark-submit in the terminal:

Ivy Default Cache set to: /home/shekhar/.ivy2/cache
The jars for the packages stored in: /home/shekhar/.ivy2/jars
:: loading settings :: url = jar:file:/home/shekhar/spark-2.3.2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-streaming-kafka-0-8_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0698f154-2d3f-4d56-b2c5-099190b947df;1.0
    confs: [default]
    found org.apache.spark#spark-streaming-kafka-0-8_2.11;2.3.2 in central
    found org.apache.kafka#kafka_2.11;0.8.2.1 in central
    found org.scala-lang.modules#scala-xml_2.11;1.0.2 in central
    found com.yammer.metrics#metrics-core;2.2.0 in central
    found org.slf4j#slf4j-api;1.7.16 in central
    found org.scala-lang.modules#scala-parser-combinators_2.11;1.0.4 in central
    found com.101tec#zkclient;0.3 in central
    found log4j#log4j;1.2.17 in central
    found org.apache.kafka#kafka-clients;0.8.2.1 in central
    found net.jpountz.lz4#lz4;1.2.0 in central
    found org.xerial.snappy#snappy-java;1.1.2.6 in central
    found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 617ms :: artifacts dl 19ms
    :: modules in use:
    com.101tec#zkclient;0.3 from central in [default]
    com.yammer.metrics#metrics-core;2.2.0 from central in [default]
    log4j#log4j;1.2.17 from central in [default]
    net.jpountz.lz4#lz4;1.2.0 from central in [default]
    org.apache.kafka#kafka-clients;0.8.2.1 from central in [default]
    org.apache.kafka#kafka_2.11;0.8.2.1 from central in [default]
    org.apache.spark#spark-streaming-kafka-0-8_2.11;2.3.2 from central in [default]
    org.scala-lang.modules#scala-parser-combinators_2.11;1.0.4 from central in [default]
    org.scala-lang.modules#scala-xml_2.11;1.0.2 from central in [default]
    org.slf4j#slf4j-api;1.7.16 from central in [default]
    org.spark-project.spark#unused;1.0.0 from central in [default]
    org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   12  |   0   |   0   |   0   ||   12  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-0698f154-2d3f-4d56-b2c5-099190b947df
    confs: [default]
    0 artifacts copied, 12 already retrieved (0kB/25ms)
2019-01-30 18:40:19 WARN  Utils:66 - Your hostname, shekhar-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
2019-01-30 18:40:19 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2019-01-30 18:40:19 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.io.IOException: Cannot run program "python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.spark.deploy.PythonRunner$.main(PythonRunner.scala:100)
    at org.apache.spark.deploy.PythonRunner.main(PythonRunner.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 12 more
2019-01-30 18:40:19 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-01-30 18:40:19 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-e6d0532c-3593-4c28-8bb6-6d48aedb12f3

1 Answer


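It is resolved now. spark-submit was failing because it could not find an executable named python (the error=2 in the stack trace). I had to set PYTHONPATH and add it to PATH in my .bashrc file:

PYTHONPATH=/usr/bin/python3
export PATH=$PATH:$PYTHONPATH/bin

Strictly speaking, PYTHONPATH is Python's module search path rather than the interpreter location; spark-submit picks its interpreter from PYSPARK_PYTHON and otherwise runs plain python, so export PYSPARK_PYTHON=/usr/bin/python3 in .bashrc is the more direct setting.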
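
To check which interpreter spark-submit will try to start before submitting, here is a minimal sketch. It assumes the environment-variable fallback that Spark 2.x's PythonRunner appears to use (PYSPARK_DRIVER_PYTHON, then PYSPARK_PYTHON, then plain python) and ignores the spark.pyspark.* configuration properties, which take precedence. The helper names spark_python_executable and check_spark_python are hypothetical. Note that the os.environ['PYSPARK_PYTHON'] line in the notebook only affects the notebook's own process, so a separate spark-submit run in a terminal does not see it.

```python
import os
import shutil
from typing import Mapping, Optional, Tuple


def spark_python_executable(env: Mapping[str, str]) -> str:
    """Hypothetical helper approximating spark-submit's interpreter choice.

    Assumed order (Spark 2.x PythonRunner, ignoring spark.pyspark.* properties):
    PYSPARK_DRIVER_PYTHON, then PYSPARK_PYTHON, then the bare name "python".
    """
    return env.get("PYSPARK_DRIVER_PYTHON") or env.get("PYSPARK_PYTHON") or "python"


def check_spark_python(env: Optional[Mapping[str, str]] = None) -> Tuple[str, Optional[str]]:
    """Return (executable name, resolved path, or None if it cannot be found)."""
    env = os.environ if env is None else env
    exe = spark_python_executable(env)
    # shutil.which searches the given PATH string; None corresponds to error=2.
    return exe, shutil.which(exe, path=env.get("PATH", ""))


if __name__ == "__main__":
    exe, resolved = check_spark_python()
    if resolved is None:
        print(f"spark-submit would fail to start {exe!r}; try setting PYSPARK_PYTHON")
    else:
        print(f"spark-submit would run {exe!r} -> {resolved}")
```

Run it in the same shell you use for spark-submit (after sourcing .bashrc); a None result means the same "Cannot run program" failure is likely.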

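Along with that, in the main function I changed the second argument of KafkaUtils.createStream to localhost:2181. With the receiver-based 0-8 integration that argument is the ZooKeeper quorum (zkQuorum), and I had wrongly given the Kafka broker port 9092. The corrected call is:

kvs = KafkaUtils.createStream(ssc,"localhost:2181","raw-event-streaming-consumer",{topic:1})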
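
To confirm which address is ZooKeeper (and therefore belongs in createStream's zkQuorum argument), a minimal sketch is to send ZooKeeper's ruok four-letter command and expect imok back. This assumes four-letter commands are enabled, which is the default in ZooKeeper 3.4.x; newer releases may require adding ruok to 4lw.commands.whitelist. The helper name is_zookeeper is hypothetical. A Kafka broker such as localhost:9092 does not answer imok, so it reports False.

```python
import socket


def is_zookeeper(host: str, port: int, timeout: float = 2.0) -> bool:
    """Hypothetical helper: True if host:port replies b'imok' to ZooKeeper's 'ruok'."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"ruok")
            reply = b""
            # ZooKeeper writes its short answer and closes, so read until EOF (capped).
            while len(reply) < 16:
                chunk = sock.recv(16)
                if not chunk:
                    break
                reply += chunk
    except OSError:
        # Connection refused, reset, or timed out (socket.timeout is an OSError).
        return False
    return reply == b"imok"


if __name__ == "__main__":
    for address in [("localhost", 2181), ("localhost", 9092)]:
        print(address, is_zookeeper(*address))
```

On a setup like the one in the question, only the 2181 entry would be expected to print True.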