Kafka如何记住消费者,以便它可以继续消耗同一分区

假设我们有一个主题有两个分区P1和P2,并且有两个消费者C1和C2(它们在同一消费者组中),因此C1和C2分别消耗了P1和P2。 当客户端应用程序实例(c1)即将进行下一次民意调查时,Kafka如何知道该轮询请求来自C1,因此Kafka可以让C1继续消耗... 阅读全文

apache-kafka

在我的Go Build成功时,如何修复Docker构建失败 Dockerfile包括Go Mod下载

我在一个非常简单的消费者中使用"github.com/confluentinc/confluent-kafka-go/kafka"。这几乎是Confluent作为kafka/go教程的内容。 "go build ."和"go run ."成功,"dock... 阅读全文

docker go apache-kafka build

在此示例中,如何捕获zookeeper-service的IP Adress,并将其正确设置到需要它的kafka-broker中

我遵循了该网站如何使用Kubernetes (DZone)部署Apache Kafka的指令,以其Zookeeper部署Kafka经纪人。 (我只更改了名称名称)。 apiVersion: v1 kind: Service metadata: label... 阅读全文

kubernetes apache-kafka apache-zookeeper

如何使用Contruent Python从Kafka中的特定分区中消耗数据

我尝试了此代码,但也从其他分区中消耗了 ConsumerKafka.subscribe(topic_list) ConsumerKafka.assign([TopicPartition(topic_name, 1, 2)]) 阅读全文

apache-kafka kafka-consumer-api confluent-kafka-python

如何在Spring Integration Kafka消息中设置IDheader

我有一个demo弹簧集成项目,该项目正在接收Kafka消息,汇总它们,然后发布它们。我正在尝试将JdbcMessageStore添加到该项目中。问题在于它失败了错误: Caused by: java.lang.IllegalArgumentExcepti... 阅读全文

java apache-kafka spring-integration

如何使用Debezium捕获批处理事件并将其发送到Kafka

我正在使用Spring Boot和Kafka在MySQL和MongoDB之间构建bi-directional数据复制应用程序。我添加了所需的DTO,服务,存储库(使用JPA),文档类,实体类等。我正在使用Outbox模式方法(https://debezi... 阅读全文

mysql apache-kafka debezium

在Kafkalistener失败之前,如何更改DELIVERY_ATTEMPT的数量

可以说我有以下听众: @KafkaListener(topics = "${some_topic}", autoStartup = "true") public void listenForMessage(String message) { log.wa... 阅读全文

java spring spring-boot apache-kafka spring-kafka

州共享体系结构 - 如何启用滚动政策

方案:Kafka流提供了事件流,其中每个事件都是在汇总1000个记录(滚动策略)后必须处理的记录。为了提供背景,每个记录为特定的手机用户提供了移动网络的SMS使用详细信息。每个记录的处理将映射one-on-one下游记录(用于使用计费,税收计算,使用有效... 阅读全文

algorithm apache-kafka architecture

如何在不阻止kafka消息消耗的情况下运行polly.net

我有一个Kafka消费者,它在消耗消息时会调用外部API。如果呼叫不成功,则将polly.net用作重试机制。 当前解决方案的问题在于,重试机制正在阻止下一条消息的消耗,因此下一条消息必须等待重试机制才能完成。 有什么想法如何异步运行重试机制,以便继续下... 阅读全文

c# .net apache-kafka kafka-consumer-api polly

如何从Spark Scala中的Avro消息中提取架构ID

我有带有列的Spark Scala DataFrame包含AVRO消息的值(数组[BYTE] ). I知道0字节是魔术字节,并且包含位置1-4中的字节是架构ID。 如何提取这些字节(1-4)并在INT中添加带有架构ID值的新列? 需要在Spark Sca... 阅读全文

apache-spark apache-kafka avro spark-structured-streaming spark-avro