
I'm new to Kafka streaming. I set up a Twitter listener using Python, and it is producing to the Kafka server running on localhost:9092. I can consume the stream produced by the listener using a Kafka client tool (Conduktor) and also using the command "bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic twitter --from-beginning". But when I try to consume the same stream using Spark Structured Streaming, it doesn't capture anything and throws the error: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide". See the screenshots below:

  1. Console command output - consumes data
  2. Jupyter output for the Spark consumer - doesn't consume data

My producer (listener) code:

import pykafka
import tweepy
from tweepy import Stream
from tweepy.streaming import StreamListener

auth = tweepy.OAuthHandler("**********", "*************")
auth.set_access_token("*************", "***********************")
api = tweepy.API(auth)

class KafkaPushListener(StreamListener):
    def __init__(self):
        # localhost:9092 = default Kafka broker host and port
        self.client = pykafka.KafkaClient("0.0.0.0:9092")
        # Get a producer for the "twitter" topic
        self.producer = self.client.topics[bytes("twitter", "ascii")].get_producer()

    def on_data(self, data):
        # Publish each tweet delivered by Twitter to the Kafka topic
        self.producer.produce(bytes(data, "ascii"))
        return True

    def on_error(self, status):
        print(status)
        return True

twitter_stream = Stream(auth, KafkaPushListener())
twitter_stream.filter(track=['#fashion'])

Consumer code using Spark Structured Streaming:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "twitter") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Comments:

Can you post the full code? I don't see any sink in the screenshot; what sink are you using? – Srinivas
Updated with the code. – Sri-nidhi

2 Answers


Found what was missing: when I submitted the Spark job, I had to include the right dependency package for my Spark version. I have Spark 3.0.0, so I included the org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 package.
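
For reference, this is roughly how the package is passed at submit time; a minimal sketch, assuming the consumer script is saved as spark_consumer.py (a hypothetical file name):

bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 spark_consumer.py

Note that the Scala version in the artifact name (2.12) must match the Scala build of your Spark distribution, and the last component (3.0.0) must match your Spark version.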


Add a sink; the query will then start consuming data from Kafka.

Check the code below.

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "twitter") \
  .load()

query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()  # using the console sink here; you may change it as per your requirement

query.awaitTermination()
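
Also, since the question runs the consumer from Jupyter rather than via spark-submit, the package from the accepted answer can instead be supplied when the SparkSession is built; a minimal sketch, assuming Spark 3.0.0 and a hypothetical app name:

from pyspark.sql import SparkSession

# Pull the Kafka source in at session creation time instead of via spark-submit;
# this only takes effect if set before the first SparkSession is created.
spark = SparkSession.builder \
    .appName("TwitterKafkaConsumer") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0") \
    .getOrCreate()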