我有一个Kafka消费者,它在消耗消息时会调用外部API。如果呼叫不成功,则将polly.net用作重试机制。
当前解决方案的问题在于,重试机制正在阻止下一条消息的消耗,因此下一条消息必须等待重试机制才能完成。
有什么想法如何异步运行重试机制,以便继续下一个消息?
以下示例说明了所述问题:
using Confluent.Kafka;
using Polly;
using Polly.Extensions.Http;
var config = new ConsumerConfig
{
BootstrapServers = "host1:9092,host2:9092",
GroupId = "foo",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
// Awaiting retry policy here will block the consumption of next message
var result = await GetRetryPolicy().ExecuteAsync(async () =>
{
// CALL AN API HERE...
return new HttpResponseMessage(System.Net.HttpStatusCode.OK);
});
}
IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.OrResult(msg => msg.StatusCode == System.Net.HttpStatusCode.NotFound)
.WaitAndRetryAsync(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
}
分析解答
为了完全实现不阻止的异步处理,您确实应该避免等待消息消耗。
这是修改的代码:
using Confluent.Kafka;
using Polly;
using Polly.Extensions.Http;
var config = new ConsumerConfig
{
BootstrapServers = "host1:9092,host2:9092",
GroupId = "foo",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
var retryPolicy = GetRetryPolicy();
while (true)
{
var result = consumer.Consume(); // This is a synchronous call
// Start the asynchronous execution of the retry policy
_ = retryPolicy.ExecuteAsync(async () =>
{
// CALL AN API HERE asynchronously, without awaiting the response...
await YourApiCallAsync(); // Your API call should be asynchronous
// Continue processing or log the result as needed
});
}
}
IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.OrResult(msg => msg.StatusCode == System.Net.HttpStatusCode.NotFound)
.WaitAndRetryAsync(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
}
在此代码中,不等待result = consumer.Consume()
调用,允许Kafka消费者继续进行异步处理消息。 retryPolicy.ExecuteAsync
调用还启动了您的API调用的异步执行,而无需等待响应,从而确保它不会阻止消息消耗循环。