我在Ubuntu服务器上设置Apache Kafka并按照https://kafka.apache.org/quickstart中提到的前五个步骤进行测试,一切正常。

然后,我继续使用 kafka-python 1.4.6的installation在python中进行测试,并编写了简单的生产者和消费者脚本。

我为听众配置的 听众= PLAINTEXT://localhost:9092 <br/> advertised.listeners = PLAINTEXT://localhost:9092

这是脚本

consum.py

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic')
for message in consumer:
        print (message)

prod.py

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers='localhost:9092',api_version=(0, 10, 1))

producer.send('my-topic', b'Hello')

在运行脚本时,生产者脚本立即完成,并且消费者脚本不会打印任何消息

任何想法我可能会在这里失踪。<br/> 谢谢

分析解答

问题是您的生产者脚本在发送消息之前完成。运行send()后,Python到达脚本的最后一行并立即停止运行。

您需要调用flush()以强制运行时等待所有消息在终止之前发送。

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers='localhost:9092',api_version=(0, 10, 1))

producer.send('my-topic', b'Hello')

producer.flush()

除此之外你的逻辑看起来很好(并为我工作)。