我有一个Kafka消费者,它在消耗消息时会调用外部API。如果呼叫不成功,则将polly.net用作重试机制。

当前解决方案的问题在于,重试机制正在阻止下一条消息的消耗,因此下一条消息必须等待重试机制才能完成。

有什么想法如何异步运行重试机制,以便继续下一个消息?

以下示例说明了所述问题:

using Confluent.Kafka;
using Polly;
using Polly.Extensions.Http;


var config = new ConsumerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    GroupId = "foo",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    // Awaiting retry policy here will block the consumption of next message
    var result = await GetRetryPolicy().ExecuteAsync(async () =>
    {
        // CALL AN API HERE...
        return new HttpResponseMessage(System.Net.HttpStatusCode.OK);
    });
}


IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .OrResult(msg => msg.StatusCode == System.Net.HttpStatusCode.NotFound)
        .WaitAndRetryAsync(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
}
分析解答

为了完全实现不阻止的异步处理,您确实应该避免等待消息消耗。

这是修改的代码:

using Confluent.Kafka;
using Polly;
using Polly.Extensions.Http;

var config = new ConsumerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    GroupId = "foo",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    var retryPolicy = GetRetryPolicy();

    while (true)
    {
        var result = consumer.Consume(); // This is a synchronous call

        // Start the asynchronous execution of the retry policy
        _ = retryPolicy.ExecuteAsync(async () =>
        {
            // CALL AN API HERE asynchronously, without awaiting the response...
            await YourApiCallAsync(); // Your API call should be asynchronous

            // Continue processing or log the result as needed
        });
    }
}

IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .OrResult(msg => msg.StatusCode == System.Net.HttpStatusCode.NotFound)
        .WaitAndRetryAsync(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
}

在此代码中,不等待result = consumer.Consume()调用,允许Kafka消费者继续进行异步处理消息。 retryPolicy.ExecuteAsync调用还启动了您的API调用的异步执行,而无需等待响应,从而确保它不会阻止消息消耗循环。