The data I produce to the Kafka Transactions topic looks like this:

ConsumerRecord(topic='Transactions', partition=0, offset=3, timestamp=1591277946735, timestamp_type=0, key=None, value={'transaction_id': '9495601361', 'account_number': 14, 'transaction_reference': '20070', 'transaction_datetime': '2020-06-04T19:09:06.735129', 'amount': 260.93}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)

ConsumerRecord(topic='Transactions', partition=0, offset=4, timestamp=1591277946736, timestamp_type=0, key=None, value={'transaction_id': '4952940859', 'account_number': 14, 'transaction_reference': '44291', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 2.82}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=158, serialized_header_size=-1)

ConsumerRecord(topic='Transactions', partition=0, offset=5, timestamp=1591277946737, timestamp_type=0, key=None, value={'transaction_id': '0193362270', 'account_number': 12, 'transaction_reference': '96312', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 766.95}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)

The consumer code I've written so far:

import json

from kafka import KafkaConsumer  # kafka-python

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['Transactions'])
for message in consumer:
    print(message)
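The value_deserializer is what turns each record's raw bytes into the Python dict shown in message.value. A quick offline check of that step, assuming the producer JSON-encodes each transaction as UTF-8 (the printed records suggest this, but it is an assumption), could look like the sketch below; the raw payload is hypothetical, modelled on the offset=3 record:

```python
import json


def value_deserializer(m: bytes) -> dict:
    # Equivalent to the lambda passed to KafkaConsumer above
    return json.loads(m.decode('utf-8'))


# Hypothetical raw payload, modelled on the offset=3 record
raw = json.dumps({
    'transaction_id': '9495601361',
    'account_number': 14,
    'transaction_reference': '20070',
    'transaction_datetime': '2020-06-04T19:09:06.735129',
    'amount': 260.93,
}).encode('utf-8')

payload = value_deserializer(raw)
print(type(payload).__name__, payload['account_number'], payload['amount'])
# → dict 14 260.93
```

Because message.value is already a dict, the grouping code only needs plain key lookups such as payload['account_number'].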

I want the output as tuples of the form (account_number, sum(amount)). How can I do that?

Answer

I think a dictionary is more useful than tuples for grouping this data, and defaultdict is a good fit for the job:

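from collections import defaultdict

# Missing keys start at int() == 0; adding a float amount yields a float
accounts = defaultdict(int)

# NOTE: iterating a KafkaConsumer blocks forever by default, so print(accounts)
# is only reached if the consumer was created with e.g. consumer_timeout_ms=1000
for message in consumer:
    payload = message.value  # already a dict thanks to value_deserializer
    account = payload['account_number']
    amount = payload['amount']

    accounts[account] += amount

print(accounts)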

defaultdict(<class 'int'>, {14: 263.75, 12: 766.95})
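Since the consumer loop does not end on its own, it can help to pull the aggregation into a function that accepts any iterable of payload dicts, so it can be tested without a broker. A minimal sketch (sum_by_account is a hypothetical name, and defaultdict(float) is used here only to make the float totals explicit), fed with the three sample values from the question:

```python
from collections import defaultdict


def sum_by_account(payloads):
    """Sum 'amount' per 'account_number' over an iterable of dicts."""
    totals = defaultdict(float)  # missing keys start at 0.0
    for payload in payloads:
        totals[payload['account_number']] += payload['amount']
    return dict(totals)


# The three record values shown in the question (other fields omitted)
sample = [
    {'account_number': 14, 'amount': 260.93},
    {'account_number': 14, 'amount': 2.82},
    {'account_number': 12, 'amount': 766.95},
]
print(sum_by_account(sample))
# → {14: 263.75, 12: 766.95}
```

With a real consumer you would pass (message.value for message in consumer) instead of sample.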

To get the tuples you're looking for, iterate over accounts.items() after the loop:

for info in accounts.items():
    print(info)

(14, 263.75)
(12, 766.95)
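If the consumer is meant to run indefinitely, an alternative is to emit the running (account_number, total) tuple as each message arrives, rather than waiting for a loop that never finishes. A sketch using a generator; running_totals is a hypothetical name, and the SimpleNamespace objects only stand in for ConsumerRecord (just .value is read), so in real use you would pass consumer itself:

```python
from collections import defaultdict
from types import SimpleNamespace


def running_totals(messages):
    """Yield (account_number, running_sum) after each message."""
    totals = defaultdict(float)
    for message in messages:
        payload = message.value  # a dict, via value_deserializer
        account = payload['account_number']
        totals[account] += payload['amount']
        yield (account, totals[account])


# Stand-ins for ConsumerRecord objects; only .value is used
fake_messages = [
    SimpleNamespace(value={'account_number': 14, 'amount': 260.93}),
    SimpleNamespace(value={'account_number': 14, 'amount': 2.82}),
    SimpleNamespace(value={'account_number': 12, 'amount': 766.95}),
]
for info in running_totals(fake_messages):
    print(info)
```

Against a live topic this would be for info in running_totals(consumer): print(info), printing one updated tuple per consumed record.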