似乎偏移量没有承诺。

我正在使用Kafka Python包。这是我的代码

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'quickstart-events',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    auto_commit_interval_ms=1000,
)

msg_pack = consumer.poll(max_records=10,timeout_ms=500,update_offsets=True)
for tp,messages in msg_pack.items():
    for message in messages:
        print("%s:%d:%d: key=%s value=%s" % (tp.topic, tp.partition,
                                             message.offset, message.key,
                                             message.value))

但问题是我总是从起始的所有消息。有时我没有得到任何。你能帮我么?

quickstart-events:0:52: key=None value=b'1'
quickstart-events:0:53: key=None value=b'2'
quickstart-events:0:54: key=None value=b'3'
quickstart-events:0:55: key=None value=b'4'
quickstart-events:0:56: key=None value=b'5'
quickstart-events:0:57: key=None value=b'1'
quickstart-events:0:58: key=None value=b'2'
quickstart-events:0:59: key=None value=b'3'
quickstart-events:0:60: key=None value=b'4'
quickstart-events:0:61: key=None value=b'5'
分析解答

I always get all the messages from the starting.

这应该只根据这些而发生一次

auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',

Sometimes I don't get any

那么消费者组可能没有滞后(没有记录消费)。您需要使用kafka-consumer-groups --describe --group my-group来检查滞后,或者偏移是在您期望的方式上提交的。

如果偏移量在您期望的方式上未提交,那么您应该禁用自动提交并手动执行此操作