当前位置:首页 > 服务端 > spring-kafka

spring-kafka

2022年11月05日 23:17:27服务端18

spring-kafka

使用spring-kafka的小伙伴,看过来。

说明

因为spring-kafka封装的比较厉害,可能跟你实际使用起来有很大的差别。

一个简单的消费例子

spring-boot基础上添加依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.3.RELEASE</version>
</dependency>

注意要检查下依赖的kakfa-clients,是否与你服务端的匹配。

不要忘记了通过注解@EnableKafka开启自动配置。

采用默认的配置:

spring:
  kafka:
    consumer:
      bootstrap-servers:
        - 127.0.0.1:9092
      # 消费组
      group-id: myGroup
      # 消费者是否自动提交偏移量,默认为true
      enable-auto-commit: false
      # 消费者在读取一个没有偏移量或者偏移量无效的情况下,从起始位置读取partition的记录,默认是latest
      auto-offset-reset: earliest
      # 单次调用poll方法能够返回的消息数量
      max-poll-records: 50

然后写个测试用例试试:

package tk.fishfish.easyjava.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * 消费者
 *
 * @author 奔波儿灞
 * @since 1.0
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class ConsumerTest {

    private static final Logger LOG = LoggerFactory.getLogger(ConsumerTest.class);

    @KafkaListener(topics = "test")
    public void onMessage(ConsumerRecord<String, String> record) {
        LOG.info("record: {}", record);
        String value = record.value();
        if (value.length() % 2 == 0) {
            throw new RuntimeException("模拟业务出错");
        }
    }

    @Test
    public void run() {
        try {
            // 阻塞5分钟,方便调试
            Thread.sleep(5 * 60 * 1000);
        } catch (InterruptedException e) {
            LOG.warn("sleep error", e);
        }
    }

}

上面通过@KafkaListener来监听topic,处理消息。

为了模拟业务会出现一些异常,我特意在判断value长度为偶数的情况下抛出异常,看在默认配置的情况下,如果业务出错,是否仍会提交offsets

结果发现,仍提交了offsets

spring-kafka _ JavaClub全栈架构师技术笔记

下面是日志:

INFO  2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] tk.fishfish.easyjava.kafka.ConsumerTest            - record: ConsumerRecord(topic = test, partition = 0, offset = 261, CreateTime = 1558059947687, serialized key size = -1, serialized value size = 0, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = )
ERROR 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler             - Error while processing: ConsumerRecord(topic = test, partition = 0, offset = 261, CreateTime = 1558059947687, serialized key size = -1, serialized value size = 0, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = )
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void tk.fishfish.easyjava.kafka.ConsumerTest.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException: 模拟业务出错
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:302)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: 模拟业务出错
    at tk.fishfish.easyjava.kafka.ConsumerTest.onMessage(ConsumerTest.java:29)
    at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
    ... 13 common frames omitted
DEBUG 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.a.RecordMessagingMessageListenerAdapter    - Processing [GenericMessage [payload=d, headers={kafka_offset=262, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@f46d581, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test, kafka_receivedTimestamp=1558059947851}]]
INFO  2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] tk.fishfish.easyjava.kafka.ConsumerTest            - record: ConsumerRecord(topic = test, partition = 0, offset = 262, CreateTime = 1558059947851, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = d)
DEBUG 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {test-0=OffsetAndMetadata{offset=263, metadata=''}}
DEBUG 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {test-0=OffsetAndMetadata{offset=263, metadata=''}}
DEBUG 2019-05-17 10:25:48.560 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.consumer.internals.ConsumerCoordinator     - [Consumer clientId=consumer-2, groupId=myGroup] Committed offset 263 for partition test-0

从日志可以看到offset = 261的记录处理失败了,但最后仍提交了Committed offset 263 for partition test-0

总结:如果你采取我这样的配置,当处理record出错的时候,仍会提交偏移量。那么我们就需要业务处理失败的情况了。比如try...catch之后保存错误的record,然后定时重试。

出错的情况下不提交offsets

那么,能不能在出错的情况下不提交咧?

通过查看文档发现,发现可以使用Acknowledgment去确认该条record是否提交。

修改下配置,配置spring.kafka.listener.*

spring:
  kafka:
    consumer:
      bootstrap-servers:
        - 127.0.0.1:9092
      group-id: myGroup
      # 消费者是否自动提交偏移量,默认为true
      enable-auto-commit: false
      # 消费者在读取一个没有偏移量或者偏移量无效的情况下,从起始位置读取partition的记录,默认是latest
      auto-offset-reset: earliest
      # 单次调用poll方法能够返回的消息数量
      max-poll-records: 50
    listener:
      # Listener AckMode
      ack-mode: MANUAL_IMMEDIATE
      # 并发消费者
      concurrency: 1

然后代码有些调整(就不贴全了),使用Acknowledgment

@KafkaListener(topics = "test")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
    LOG.info("record: {}", record);
    String value = record.value();
    if (value.length() % 2 == 0) {
        throw new RuntimeException("模拟业务出错");
    }
    // 业务处理成功确认
    ack.acknowledge();
}

如果在业务出错的情况下,不会提交offsets,然而真的是这样的吗?

测试发现,在业务出错的情况下,确实不会提交offsets,但是只要后面的记录处理成功,就会提交offsets,这样前面的失败的数据还是需要自己去手动处理。要么重新获取该offset的数据,要么记录错误record,业务重试。

总结

在错误的情况下,建议自己业务记录后,去重试;或者使用spring-kafkaErrorHandler,处理错误的情况。

作者:bener
来源链接:https://www.cnblogs.com/bener/p/10881165.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/server/68052.html

标签: KafkaSpring
分享给朋友:

“spring-kafka” 的相关文章

SpringBoot之整合Mybatis篇

SpringBoot之整合Mybatis篇

开头语 啪嚓,醒木一拍(咳咳) ,上回书我们说到:SpringBoot的简单搭建,也就是SpringBoot的搭建流程,简单的将项目进行启动,本次,我们简单的把SpringBoot与持久层框架Mybatis进行整合,接下来,我们开始。 Mybatis名词解释   &...

SpringBoot 如何进行参数校验

SpringBoot 如何进行参数校验

为什么需要参数校验在日常的接口开发中,为了防止非法参数对业务造成影响,经常需要对接口的参数进行校验,例如登录的时候需要校验用户名和密码是否为空,添加用户的时候校验用户邮箱地址、手机号码格式是否正确。 靠代码对接口参数一个个校验的话就太繁琐了,代码可读性极差。 Validator框架就是为了解决开发人...

Springboot 整合 Mybatis 的完整 Web 案例

Springboot 整合 Mybatis 的完整 Web 案例

摘要: 原创出处:www.bysocket.com 泥瓦匠BYSocket 希望转载,保留摘要,谢谢! 推荐一本书《 腾讯传》。 新年第一篇 Springboot 技术文...

Spring Boot 2.0 WebFlux 快速入门实践

Spring Boot 2.0 WebFlux 快速入门实践

02:WebFlux 快速入门实践 Spring Boot 2.0 spring.io 官网有句醒目的话是: BUILD ANYTHING WITH SPRING BOOT Spring Boot (Boot 顾名思义,是...

Springboot 整合 Dubbo/ZooKeeper 详解 SOA 案例

Springboot 整合 Dubbo/ZooKeeper 详解 SOA 案例

摘要: 原创出处:www.bysocket.com 泥瓦匠BYSocket 希望转载,保留摘要,谢谢!...

Spring Boot 2.x 系列教程:WebFlux 系列教程大纲(一)

Spring Boot 2.x 系列教程:WebFlux 系列教程大纲(一)

摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! WebFlux 系列教程大纲 一、背景 大家都知道,Spring Framework 是 Java/Spring 应用程序跨...

SpringBoot(三) -- SpringBoot与日志

SpringBoot(三) -- SpringBoot与日志

一.日志的起源  现在假设一个开发人员在开发一个大型系统,由于这个系统过于庞大没在很多的地方将关键的数据使用System.out.println()打印,但是当我们在项目正式上线时又需要去除,在项目bug修复时又需要重新打印,那么我们是不是可以将这些实用的输出信息保存到一个文件中.我们...

Spring Boot 和 Spring Cloud 应用内存如何管理?

Spring Boot 和 Spring Cloud 应用内存如何管理?

在整体应用架构中,非生产环境情况下,一般 1GB 或者 2GB 的 RAM 就足够了。如果我们将这个应用程序划分为 20 或 30 个独立的微服务,那么很难期望 RAM 仍将保持在 1GB 或 2GB 左右。特别是如果我们使用 Spring Cl...

Could not resolve matching constructor (hint: specify index/type/name arguments for simple parameter 标签: 构造器注入Spring

    问题:要么是因为构造方法改变了,要么就是构造方法入参实例化失败(比如没有实现)   问题 在练习spring构造器注入方式的小程序的时候报错: Exception in thread...

Spring Boot 03 —— 日志框架 ;

Spring Boot 03 —— 日志框架 ;

   日志框架: Spring Boot 日志的抽象层(一个规范):SLF4j(Simple  Logging Facade for Java)    日志的实现层:Logback 如何让系统中所有的日志都统...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。