如上所述,我目前正在设置Kafka Connect Sink,以便将数据从Kafka汇入Google云端存储。
然而,一切都很顺利 - 它只使用最新的可用偏移量。也就是说,一旦它开始运行,它只会将新生成的消息汇入GCS,而不是来自Kafka的现有消息。我尝试删除kafka connect storage/offset主题,创建新的连接器名称等。但是,它始终以最新的偏移量开始。
无论如何要为Kafka Connect GCS Sink配置最早的偏移量?我还没有看到任何配置来处理这个问题
https://docs.confluent.io/current/connect/kafka-connect-gcs/configuration_options.html
要么
https://docs.confluent.io/current/connect/references/allconfigs.html
我已经尝试删除任何kafka connect topics/file存储,以及从新的连接器名称开始
我看到连接器启动后生成的Kafka Connect接收器消息。
我是expecting/need消息从最早的可用偏移量下沉,即。如果没有为连接器提交偏移量,则从最早的消息开始
分析解答
第一次创建连接器时,默认情况下将采用earliest
偏移量。您应该在Connect worker日志中看到这个:
[2019-08-05 23:31:35,405] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
…
您可以通过更改Worker配置:consumer.auto.offset.reset
来覆盖它。
删除连接器并重新创建连接器时,将保留并重用偏移量。
如果使用新名称创建连接器,则默认情况下将使用连接工作程序(earliest
)中设置的偏移量。