如何检查Ruby-Kafka重试是否正常

在文档中提到生产者重试将消息发送到基于max_retries的队列。 因此,我关闭了Kafka,然后尝试了制作人。我得到这个错误 Fetching cluster metadata from kafka://localhost:9092 [topic_m... 阅读全文

ruby-on-rails ruby apache-kafka kafka-producer-api ruby-kafka

如何在Python中产生JSON格式的Kafka消息

如何删除报价并像原始格式一样发送数据 原始JSON-format是: { "@timestamp": "2020-06-02T09:38:03.183186Z" } 此数据在另一个主题中 "{\"@timestamp\": \"2020-05-25T17... 阅读全文

python apache-kafka kafka-producer-api kafka-python

如何使用Python在Kafka Consumer中聚合JSON数据

我在KAFKA Transactions主题中产生的数据如下: ConsumerRecord(topic ='Transactions',partition = 0,offset = 3,timestamp = 1591277946735,timesta... 阅读全文

python apache-kafka kafka-consumer-api kafka-producer-api kafka-python

kafka如何处理不同批次的订购

在有关重试的kafka文档中,它说: 允许重试而不将max.in.flight.requests.per.connection设置为1可能会更改记录的顺序,因为如果将两个批次发送到单个分区,并且第一个失败并被重试,但是第二个成功,则记录在第二批中可能会首... 阅读全文

apache-kafka kafka-producer-api

如何为主题创建或设置分区数

我正在使用python 3.6.8和kafka-python=2.0.2 有没有办法从python代码设置主题的分区数? 我的生产者代码如下: producer = KafkaProducer(bootstrap_servers=['localhost:... 阅读全文

python-3.x apache-kafka apache-kafka-streams kafka-producer-api

如何解决avrotypeexception:未知的联合分支

我正在使用命令kafka-avro-console-producer与Kafka CLI. I'M一起制作一个主题的记录。 这是模式: { "connect.name": "datachanges.article.Envelope", "name": "... 阅读全文

apache-kafka avro kafka-producer-api