
I am using Spark Streaming to continuously read data from Kafka and compute some statistics. My batch interval is one second.

So I have one-second batches (DStreams). Each record inside a batch is a JSON object.

This is how I set up my DStream:

import json
from pyspark.streaming.kafka import KafkaUtils

kafkaStream = KafkaUtils.createDirectStream(stream, ['livedata'], {"metadata.broker.list": 'localhost:9092'})
raw = kafkaStream.map(lambda kafkaS: kafkaS[1])   # keep only the message value
clean = raw.map(lambda xs: json.loads(xs))        # parse each value as JSON
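
For completeness, here is a minimal sketch of the wiring this snippet assumes; the variable stream is taken to be the StreamingContext (since it is passed to createDirectStream), the 1-second batch interval comes from the setup described above, and the app name is just a placeholder:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="kafka-length-sum")   # placeholder app name
stream = StreamingContext(sc, 1)                # 1-second batch interval

# ... define kafkaStream, raw and clean as above, then start the job:
stream.start()
stream.awaitTermination()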

One of the records in my clean DStream looks like this:

{u'epochseconds': 1458841451, u'protocol': 6, u'source_ip': u'192.168.1.124',
 u'destination_ip': u'149.154.167.120', u'datetime': u'2016-03-24 17:44:11',
 u'length': 1589, u'partitionkey': u'partitionkey', u'packetcount': 10,
 u'source_port': 43375, u'destination_port': 443}

And I have roughly 30-150 such records in each batch.

Now, what I'm trying to do is get the total sum of the 'length' (or, say, 'packetcount') values across each batch. That is,

record1['length'] + record2['length'] + ... + lastRecordInTheOneSecondBatch['length']

What I tried:

add = clean.map(lambda xs: (xs['length'], 1)).reduceByKey(lambda a, b: a + b)

What I got:

A frequency count of each 'length' value instead of a sum:

(17, 6)
(6, 24)
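
To illustrate with made-up records, this is effectively what that map/reduceByKey does within one batch (the 'length' value itself becomes the key, so only the ones get summed):

records = [{'length': 17}, {'length': 17}, {'length': 6}]   # made-up records
pairs = [(r['length'], 1) for r in records]                 # (key, value) = (length, 1)
counts = {}
for k, v in pairs:
    counts[k] = counts.get(k, 0) + v                        # like reduceByKey(lambda a, b: a + b)
print(counts)   # {17: 2, 6: 1} -> how often each length occurs, not a sum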

What should I do to get the total sum instead of the frequency of the keys?


1 Answer


That's because you are using the value of 'length' as the key. Try this:

add = clean.map(lambda xs: ('length', xs['length'])).reduceByKey(lambda a, b: a + b)

You have to set the same key for all the (key, value) pairs; the value can be the 'length' field or any other field you want to aggregate.
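
Put together with the pipeline from the question, a sketch of how you would then see the per-batch result (the printed numbers are just illustrative); an equivalent alternative is to drop the dummy key and reduce the values directly:

length_sum = clean.map(lambda xs: ('length', xs['length'])).reduceByKey(lambda a, b: a + b)
length_sum.pprint()   # prints something like ('length', 48231) once per batch

# Alternative without a dummy key: map each record to its length and reduce
total = clean.map(lambda xs: xs['length']).reduce(lambda a, b: a + b)
total.pprint()        # prints the per-batch sum as a single number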