The data produced to my Kafka Transactions topic looks like this:

ConsumerRecord(topic='Transactions', partition=0, offset=3, timestamp=1591277946735, timestamp_type=0, key=None, value={'transaction_id': '9495601361', 'account_number': 14, 'transaction_reference': '20070', 'transaction_datetime': '2020-06-04T19:09:06.735129', 'amount': 260.93}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)

ConsumerRecord(topic='Transactions', partition=0, offset=4, timestamp=1591277946736, timestamp_type=0, key=None, value={'transaction_id': '4952940859', 'account_number': 14, 'transaction_reference': '44291', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 2.82}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=158, serialized_header_size=-1)

ConsumerRecord(topic='Transactions', partition=0, offset=5, timestamp=1591277946737, timestamp_type=0, key=None, value={'transaction_id': '0193362270', 'account_number': 12, 'transaction_reference': '96312', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 766.95}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)

The consumer code I have written so far is:

import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['Transactions'])
for message in consumer:
    print(message)

I want the output as tuples of (account_number, sum(amount)). How do I achieve this?

1 Answer

I think a dictionary is more useful than a tuple for grouping the data. A defaultdict suits this nicely:

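from collections import defaultdict

# Amounts are floats, so start each account's total at 0.0
accounts = defaultdict(float)

# With kafka-python this loop blocks indefinitely unless the consumer
# was created with consumer_timeout_ms set
for message in consumer:
    payload = message.value  # already a dict thanks to value_deserializer
    account = payload['account_number']
    amount = payload['amount']

    accounts[account] += amount

print(accounts)

With the three records shown in the question, this prints:

defaultdict(<class 'float'>, {14: 263.75, 12: 766.95})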
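The loop above reaches print(accounts) only once iteration over the consumer stops. In kafka-python that happens only when the consumer was created with consumer_timeout_ms; by default it waits for new messages forever. As a minimal sketch, the aggregation can be pulled into a function that accepts any iterable of payload dicts, which makes it easy to try without a broker. The names sum_by_account and sample_payloads are hypothetical, and the sample values are copied from the records in the question.

```python
from collections import defaultdict


def sum_by_account(payloads):
    """Return {account_number: total amount} for an iterable of payload dicts."""
    totals = defaultdict(float)
    for payload in payloads:
        totals[payload['account_number']] += payload['amount']
    return dict(totals)


# Sample payloads copied from the records in the question (only the
# two fields the aggregation needs).
sample_payloads = [
    {'account_number': 14, 'amount': 260.93},
    {'account_number': 14, 'amount': 2.82},
    {'account_number': 12, 'amount': 766.95},
]

totals = sum_by_account(sample_payloads)
print(totals)
```

Against a real topic, the same function could be called as sum_by_account(message.value for message in consumer), assuming the consumer was built with something like consumer_timeout_ms=1000 (an arbitrary value) so that the iteration eventually ends.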

To get the tuples you might be looking for, you can iterate over accounts.items() after your loop:

for info in accounts.items():
    print(info)

(14, 263.75)
(12, 766.95)
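Because a Kafka topic is an unbounded stream, there may never be an "after your loop". One possible alternative, sketched here under that assumption, is to emit an updated (account_number, running_sum) tuple after every message instead of waiting for the end. The generator running_totals and the list sample_payloads are hypothetical names; the sample values come from the records in the question.

```python
from collections import defaultdict


def running_totals(payloads):
    """Yield (account_number, running_sum) after each payload is processed."""
    totals = defaultdict(float)
    for payload in payloads:
        account = payload['account_number']
        totals[account] += payload['amount']
        yield account, totals[account]


# Sample payloads copied from the records in the question.
sample_payloads = [
    {'account_number': 14, 'amount': 260.93},
    {'account_number': 14, 'amount': 2.82},
    {'account_number': 12, 'amount': 766.95},
]

updates = list(running_totals(sample_payloads))
for update in updates:
    print(update)
```

With a live consumer, the generator could be fed with (message.value for message in consumer), and each yielded tuple reflects the total for that account so far. Since the amounts are floats, the sums can carry small rounding errors; decimal.Decimal is worth considering if exact currency arithmetic matters.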