我在Ubuntu服务器上设置Apache Kafka并按照https://kafka.apache.org/quickstart中提到的前五个步骤进行测试,一切正常。
然后,我继续使用 kafka-python 1.4.6的installation在python中进行测试,并编写了简单的生产者和消费者脚本。
我为听众配置的 听众= PLAINTEXT://localhost:9092 <br/> advertised.listeners = PLAINTEXT://localhost:9092
这是脚本
consum.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic')
for message in consumer:
print (message)
prod.py
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers='localhost:9092',api_version=(0, 10, 1))
producer.send('my-topic', b'Hello')
在运行脚本时,生产者脚本立即完成,并且消费者脚本不会打印任何消息
任何想法我可能会在这里失踪。<br/> 谢谢
分析解答
问题是您的生产者脚本在发送消息之前完成。运行send()
后,Python到达脚本的最后一行并立即停止运行。
您需要调用flush()
以强制运行时等待所有消息在终止之前发送。
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers='localhost:9092',api_version=(0, 10, 1))
producer.send('my-topic', b'Hello')
producer.flush()
除此之外你的逻辑看起来很好(并为我工作)。