I am reading the twitter stream from my Kafka topic while converting it to JSON in Pyspark code, data get missing.
Providing code below
The code is reading the twitter stream from Kafka topic and converting to JSON format. While accessing tweet['user'] getting a key error (Indices must be an integer) on tweet[0] getting the first character of the message.
from __future__ import print_function
import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
sys.exit(-1)
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 2)
brokers,topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: json.loads(x[1]))
status=lines.map(lambda tweets: tweets['user']['screen_name'])
#status.pprint()
status.pprint()
#status.map(lambda tweet: tweet['created_at']).pprint()
#counts = lines.flatMap(lambda line: line.split(" ")) \
# .filter(lambda word: word.lower().startswith('#')) \
# .map(lambda word: (word.lower(), 1)) \
# .reduceByKey(lambda a, b: a+b)
#counts.pprint()
ssc.start()
ssc.awaitTermination()
Getting this output after converting Kafka message to JSON
{u'quote_count': 0, u'contributors': None, u'truncated': False, u'text': u'RT @hotteaclout: @TeenChoiceFOX my #TeenChoice vote for #ChoiceActionMovieActor is Chris Evans', u'is_quote_status': False, u'in_reply_to_status_id': None, u'reply_count': 0, u'id': 1149313606304976896, .....} ...
Actual Message is
{"created_at":"Thu Jul 11 13:44:55 +0000 2019","id":1149313623363338241,"id_str":"1149313623363338241","text":"RT @alisonpool_: Legit thought this was Mike Wazowski for a second LMFAO https://t.co/DMzMtOfW2I","source":"\u003ca href=\"http://twitter.com/download/iphone\" ....}