I am reading a Twitter stream from my Kafka topic. While converting the messages to JSON in PySpark code, data goes missing.

The code is provided below.

The code reads the Twitter stream from a Kafka topic and converts each message to JSON. Accessing tweet['user'] raises an error ("indices must be integers"), and tweet[0] returns the first character of the message.
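
Taken together, the two symptoms suggest that the value being indexed is still a string rather than a dict. Here is a minimal sketch of that behaviour, assuming a made-up payload (the `raw` value is hypothetical, not taken from the stream):

```python
import json

# Hypothetical payload standing in for one Kafka message value.
raw = '{"user": {"screen_name": "example"}, "text": "hello"}'

# Indexing the unparsed string by position returns a single character.
first_char = raw[0]

# Indexing the unparsed string by key raises TypeError, not KeyError.
try:
    raw['user']
    error_name = None
except TypeError as exc:
    error_name = type(exc).__name__

# After parsing, the same key lookup works on the resulting dict.
tweet = json.loads(raw)
screen_name = tweet['user']['screen_name']
```

If a value in the stream behaves like `raw` here, it has not been turned into a dict yet at that point in the pipeline.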

from __future__ import print_function

import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)

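    brokers, topic = sys.argv[1:]
    # Each record is a (key, value) pair; the value holds the tweet JSON.
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    # Parse the message value into a Python object.
    lines = kvs.map(lambda x: json.loads(x[1]))
    # Extract the author's screen name from each tweet.
    status = lines.map(lambda tweet: tweet['user']['screen_name'])
    status.pprint()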
    #status.map(lambda tweet: tweet['created_at']).pprint()
    #counts = lines.flatMap(lambda line: line.split(" ")) \
    #    .filter(lambda word: word.lower().startswith('#')) \
    #    .map(lambda word: (word.lower(), 1)) \
    #    .reduceByKey(lambda a, b: a+b)
    #counts.pprint()

    ssc.start()
    ssc.awaitTermination()

I get this output after converting the Kafka message to JSON:

{u'quote_count': 0, u'contributors': None, u'truncated': False, u'text': u'RT @hotteaclout: @TeenChoiceFOX my #TeenChoice vote for #ChoiceActionMovieActor is Chris Evans', u'is_quote_status': False, u'in_reply_to_status_id': None, u'reply_count': 0, u'id': 1149313606304976896, .....} ...

The actual message is:

{"created_at":"Thu Jul 11 13:44:55 +0000 2019","id":1149313623363338241,"id_str":"1149313623363338241","text":"RT @alisonpool_: Legit thought this was Mike Wazowski for a second LMFAO https://t.co/DMzMtOfW2I","source":"\u003ca href=\"http://twitter.com/download/iphone\" ....}
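
Note that the first output is the printed form of a Python dict (in Python 2, with u'...' prefixes and no guaranteed key order), not JSON text, so fields may appear in a different order than in the raw message. Below is a minimal sketch of checking which fields survived parsing and re-serializing them for display. It assumes a shortened message, with a made-up "text" value:

```python
import json

# Shortened, partly made-up message; only "created_at" and "id" mirror the post.
raw = ('{"created_at": "Thu Jul 11 13:44:55 +0000 2019", '
       '"id": 1149313623363338241, "text": "hello"}')

parsed = json.loads(raw)

# Listing the keys shows which fields are present, regardless of print order.
keys = sorted(parsed.keys())

# json.dumps turns the dict back into JSON text with a stable key order.
as_json = json.dumps(parsed, sort_keys=True)
```

Comparing `keys` against the fields in the raw message is one way to tell whether anything is really missing or only displayed differently.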

1 Answer

OK, I solved it. It was a problem with encoding. Just calling

json.loads(tweets.encode('utf-8'))

would not work. We need to set the default encoding for the whole script, so that everything it calls uses the same encoding.

import sys
# site.py deletes sys.setdefaultencoding at startup; reloading sys restores it.
reload(sys)
sys.setdefaultencoding('utf-8')

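Add the code above at the top of the script. Note that this works only in Python 2: reload is not a builtin in Python 3, and sys.setdefaultencoding does not exist there.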
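
An alternative that avoids changing the interpreter-wide default is to decode each message explicitly before parsing it. This is only a sketch, not the fix used above, and `parse_tweet` is a hypothetical helper name:

```python
import json


def parse_tweet(raw):
    """Return the tweet as a dict, or None if it cannot be decoded or parsed."""
    try:
        # Message values may arrive as bytes; decode them as UTF-8 explicitly.
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')
        tweet = json.loads(raw)
    except (ValueError, TypeError):
        # Covers invalid UTF-8, invalid JSON, and non-string input such as None.
        return None
    # Only dicts support lookups such as tweet['user'].
    return tweet if isinstance(tweet, dict) else None
```

In the streaming job this could be applied as `kvs.map(lambda x: parse_tweet(x[1])).filter(lambda t: t is not None)`, so that malformed messages are dropped instead of failing the batch.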