导读 | 最近业务上用到了Spring Kafka,所以系统性的探索了下Spring Kafka的各种用法,发现了很多实用的特性,下面介绍下Spring Kafka的消息重试机制。 |
哈喽,大家好,我是指北君。
最近业务上用到了Spring Kafka,所以系统性的探索了下Spring Kafka的各种用法,发现了很多实用的特性,下面介绍下Spring Kafka的消息重试机制。
原生 Kafka 是不支持消息重试的。但是 Spring Kafka 2.7+ 封装了 Retry Topic 这个功能。
使用注解的方式启用 Retry Topic,在 @KafkaListener 方法上添加 @RetryableTopic 即可:
@Slf4j @Component public class DemoConsumer { @RetryableTopic @KafkaListener(topics = "topic1", groupId = "group1") public void onMsg(ConsumerRecordrecord){ log.info("topic: {}", record.topic()); throw new RuntimeException("kafka exception"); } }
这样就开启了 Spring Kafka 的消息重试机制:默认重试 3 次,间隔为 1 秒。
我们在方法里模拟了抛出异常,运行后可以发现打印了 3 条日志,间隔时间大约为 1 秒,重试的topic为原topic加上后缀“-retry”
2022-11-12 12:14:10.230 INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1 2022-11-12 12:14:11.315 INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1-retry-0 2022-11-12 12:14:12.310 INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1-retry-1
如果 3 次重试后依旧失败,会将消息发送到 DLT,
默认情况,消息被发送到死信队列后,会输出一条日志。
2022-11-12 12:14:13.324 INFO 1023 --- [ner#3-dlt-0-C-1] o.s.k.retrytopic.RetryTopicConfigurer : Received message in dlt listener: topic1-dlt@233
DLT的topic为原topic加上后缀“-dlt”
我们可以使用@DltHandler注解来定义进入死信队列后的操作:
@DltHandler public void dltHandler(ConsumerRecordrecord){ log.info("topic:{}, key:{}, value:{}", record.topic(), record.key(), record.value()); }
可以自定义重试次数、延迟时间、topic命名策略等等,支持使用 Spring EL 表达式读取配置。
@Slf4j @Component public class DemoConsumer { @RetryableTopic( attempts = "4", backoff = @Backoff(delay = "5000", multiplier = "2"), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC ) @KafkaListener(topics = "topic2", groupId = "group1") public void onMsg2(ConsumerRecordrecord){ log.info("topic: {}", record.topic()); throw new RuntimeException("kafka exception"); } }
注解属性说明:
attempts:重试次数,默认为3。
@Backoff delay:消费延迟时间,单位为毫秒。
@Backoff multiplier:延迟时间系数,此例中 attempts = 4, delay = 5000, multiplier = 2 ,则间隔时间依次为5s、10s、20s、40s,最大延迟时间受 maxDelay 限制。
fixedDelayTopicStrategy:可选策略包括:SINGLE_TOPIC 、MULTIPLE_TOPICS
以上介绍的是注解的方式,只对注解下的方法有效。如果想让多个方法都用相同的消息重试配置,那么可以使用配置类方式:
@Bean public RetryTopicConfiguration retryTopic(KafkaTemplatetemplate){ return RetryTopicConfigurationBuilder .newInstance() .maxAttempts(4) .fixedBackOff(5000) .includeTopic("topic1") .create(template); }
以上就是Spring Kafka消息重试机制的简单应用~希望能够帮助那些正在使用Spring Kafka或即将使用的人少走一些弯路、少踩一点坑。
原文来自:
本文地址://q13zd.cn/retry-kafka-spring.html编辑:清蒸github,审核员:逄增宝
Linux大全:
Linux系统大全: