The data I am producing to the Kafka topic Transactions looks like this:
ConsumerRecord(topic='Transactions', partition=0, offset=3, timestamp=1591277946735, timestamp_type=0, key=None, value={'transaction_id': '9495601361', 'account_number': 14, 'transaction_reference': '20070', 'transaction_datetime': '2020-06-04T19:09:06.735129', 'amount': 260.93}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)
ConsumerRecord(topic='Transactions', partition=0, offset=4, timestamp=1591277946736, timestamp_type=0, key=None, value={'transaction_id': '4952940859', 'account_number': 14, 'transaction_reference': '44291', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 2.82}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=158, serialized_header_size=-1)
ConsumerRecord(topic='Transactions', partition=0, offset=5, timestamp=1591277946737, timestamp_type=0, key=None, value={'transaction_id': '0193362270', 'account_number': 12, 'transaction_reference': '96312', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 766.95}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)
The consumer code I have written so far is:
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['Transactions'])
for message in consumer:
    print(message)
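The value_deserializer above turns each raw message body (bytes) into a Python dict, which is why message.value can later be indexed by key. A minimal sketch of that step on its own, assuming the producer writes UTF-8 encoded JSON; the sample bytes are a hypothetical payload rebuilt from the first record above, not captured from Kafka:

```python
import json


def deserialize(m: bytes) -> dict:
    """Same logic as the lambda passed as value_deserializer."""
    return json.loads(m.decode('utf-8'))


# Hypothetical raw message body, reconstructed from the first record shown above
raw = (b'{"transaction_id": "9495601361", "account_number": 14, '
       b'"transaction_reference": "20070", '
       b'"transaction_datetime": "2020-06-04T19:09:06.735129", "amount": 260.93}')

value = deserialize(raw)
# JSON numbers become int/float, so account_number is the int 14, not the string "14"
print(value['account_number'], value['amount'])  # 14 260.93
```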
I want output as tuples like (account_number, sum(amount)). How can I achieve this?
I think a dictionary would be more useful than tuples for grouping the data. A defaultdict is well suited to this:
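from collections import defaultdict

# Running total of 'amount' per account; a missing key starts at 0
accounts = defaultdict(int)
# Note: iterating over a KafkaConsumer blocks waiting for new messages, so this
# loop only ends if the consumer was created with consumer_timeout_ms set.
for message in consumer:
    payload = message.value  # already a dict, thanks to value_deserializer
    account = payload['account_number']
    amount = payload['amount']
    accounts[account] += amount
print(accounts)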
defaultdict(<class 'int'>, {14: 263.75, 12: 766.95})
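Because a Kafka consumer loop normally never finishes, a variant that emits the updated (account_number, total) tuple after every record may be closer to what the question asks for. A minimal sketch, assuming the decoded values have the shape shown in the question; running_totals is a hypothetical helper name, and the sample list is taken from the three records above:

```python
from collections import defaultdict
from typing import Iterable, Iterator, Tuple


def running_totals(payloads: Iterable[dict]) -> Iterator[Tuple[int, float]]:
    """Yield (account_number, total so far) after each payload.

    Accepts any iterable of decoded message values, for example
    (message.value for message in consumer).
    """
    totals = defaultdict(float)
    for payload in payloads:
        account = payload['account_number']
        totals[account] += payload['amount']
        yield account, totals[account]


# Values taken from the three records in the question
sample = [
    {'account_number': 14, 'amount': 260.93},
    {'account_number': 14, 'amount': 2.82},
    {'account_number': 12, 'amount': 766.95},
]
for pair in running_totals(sample):
    print(pair)
# (14, 260.93)
# (14, 263.75)
# (12, 766.95)
```

With a live consumer this could be used as `for pair in running_totals(m.value for m in consumer): print(pair)`, which prints a tuple as each message arrives instead of waiting for a loop that may never end.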
To get the tuples you are probably looking for, iterate over accounts.items() after the loop:
for info in accounts.items():
    print(info)
(14, 263.75)
(12, 766.95)
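If a single list of tuples is preferred over printing them one by one, the grouping and the conversion can be wrapped in one function. A minimal sketch; sum_by_account is a hypothetical name, sorting by account number and rounding to 2 decimal places (to hide floating-point noise in currency sums) are assumptions rather than requirements from the question:

```python
from collections import defaultdict


def sum_by_account(payloads):
    """Return a sorted list of (account_number, sum(amount)) tuples."""
    totals = defaultdict(float)
    for payload in payloads:
        totals[payload['account_number']] += payload['amount']
    # Round each sum to cents; tuples sort by account_number first
    return sorted((account, round(total, 2)) for account, total in totals.items())


# Values taken from the three records in the question
sample = [
    {'account_number': 14, 'amount': 260.93},
    {'account_number': 14, 'amount': 2.82},
    {'account_number': 12, 'amount': 766.95},
]
print(sum_by_account(sample))  # [(12, 766.95), (14, 263.75)]
```

If exact cent arithmetic matters, json.loads accepts parse_float=decimal.Decimal, so the deserializer could produce Decimal amounts and the rounding step would become unnecessary.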