I am using Spark Streaming to continuously read data from Kafka and compute some statistics, with a batch interval of one second.
So I have one-second batches (DStreams), and each record inside a batch's RDD is a JSON object.
This is how I build my DStream (with the imports the snippet needs):
from pyspark.streaming.kafka import KafkaUtils
import json

kafkaStream = KafkaUtils.createDirectStream(stream, ['livedata'], {"metadata.broker.list": 'localhost:9092'})
raw = kafkaStream.map(lambda kafkaS: kafkaS[1])  # keep only the Kafka message value
clean = raw.map(lambda xs: json.loads(xs))  # parse each JSON string into a dict
One of the records in my clean DStream looks like this:
{u'epochseconds': 1458841451, u'protocol': 6, u'source_ip': u'192.168.1.124',
 u'destination_ip': u'149.154.167.120', u'datetime': u'2016-03-24 17:44:11',
 u'length': 1589, u'partitionkey': u'partitionkey', u'packetcount': 10,
 u'source_port': 43375, u'destination_port': 443}
And I have around 30-150 such records in each one-second batch.
Now, what I'm trying to do is get the total sum of the 'length' values (or, say, the 'packetcount' values) across all records in each batch. That is,
record1['length'] + record2['length'] + ... + lastRecordInTheOneSecondBatch['length']
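To make the quantity I'm after concrete, here is a plain-Python illustration over a made-up list of parsed records for a single one-second batch (just an illustration of the number I want, not Spark code; the sample values are invented):

batch_records = [
    {'length': 1589, 'packetcount': 10},
    {'length': 60,   'packetcount': 1},
]
total_length = sum(record['length'] for record in batch_records)        # 1649
total_packets = sum(record['packetcount'] for record in batch_records)  # 11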
What I tried:
add=clean.map(lambda xs: (xs['length'],1)).reduceByKey(lambda a, b: a+b)
What I got:
The frequency of each distinct 'length' value instead of the sum, because mapping every record to (length, 1) makes the length itself the key, so reduceByKey just counts how many records share that length:
(17, 6)
(6, 24)
What should I do to get the total sum per batch instead of the frequency of the keys?
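I was thinking something along these lines might be closer to what I want, either reducing the mapped values directly or keying every record by a single constant; this is just an untested sketch of what I have in mind, not a confirmed solution:

# Option 1: reduce each batch's RDD down to one total per batch
total_lengths = clean.map(lambda record: record['length']) \
                     .reduce(lambda a, b: a + b)
total_lengths.pprint()

# Option 2: give every record the same key so reduceByKey sums the values
total_lengths = clean.map(lambda record: ('total_length', record['length'])) \
                     .reduceByKey(lambda a, b: a + b)
total_lengths.pprint()

Is one of these the right approach, or is there a more idiomatic way to do a per-batch aggregate?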