如上所述,我目前正在设置Kafka Connect Sink,以便将数据从Kafka汇入Google云端存储。

然而,一切都很顺利 - 它只使用最新的可用偏移量。也就是说,一旦它开始运行,它只会将新生成的消息汇入GCS,而不是来自Kafka的现有消息。我尝试删除kafka connect storage/offset主题,创建新的连接器名称等。但是,它始终以最新的偏移量开始。

无论如何要为Kafka Connect GCS Sink配置最早的偏移量?我还没有看到任何配置来处理这个问题

https://docs.confluent.io/current/connect/kafka-connect-gcs/configuration_options.html

要么

https://docs.confluent.io/current/connect/references/allconfigs.html

我已经尝试删除任何kafka connect topics/file存储,以及从新的连接器名称开始

我看到连接器启动后生成的Kafka Connect接收器消息。

我是expecting/need消息从最早的可用偏移量下沉,即。如果没有为连接器提交偏移量,则从最早的消息开始

分析解答

第一次创建连接器时,默认情况下将采用earliest偏移量。您应该在Connect worker日志中看到这个:

[2019-08-05 23:31:35,405] INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
…

您可以通过更改Worker配置:consumer.auto.offset.reset来覆盖它。

删除连接器并重新创建连接器时,将保留并重用偏移量。

如果使用新名称创建连接器,则默认情况下将使用连接工作程序(earliest)中设置的偏移量。