当前位置:首页 > 服务端 > Springboot2.0整合Kafka,从Kafka并发、批量获取数据

Springboot2.0整合Kafka,从Kafka并发、批量获取数据

2022年09月17日 15:45:36服务端4

Springboot2.0整合Kafka,从Kafka并发、批量获取数据

 

Kafka安装

Kafka是由Apache软件基金会开发的一个开源流处理平台,是一种高吞吐量的分布式发布订阅消息系统。
主要包含几个组件:

  • Topic:消息主题,特定消息的发布接口,每个Topic都可以分成数个Partition,用于消息的并发发送。
  • Producer:生产者,信息的发布者,发布者可以指定数个Partition进行发布。
  • Consumer:消费者,信息的使用者,同一个Group的消费者数量,最好不好超过Partition的数量,对于分区的Topic,消费者使用时需要指定相应的分区号。
  • Broker:服务代理
    ##下载kafka

SpringBoot整合kafka

当前SpringBoot版本为2.0.2.RELEASE,打包工具为Maven

消费者

a. 引入Pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kafkatest</groupId>
    <artifactId>producer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka-producer</name>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/>
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <joda-time.version>2.3</joda-time.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

b.JAVA代码

@Service
public class KafkaProducerTest {
    @Autowired
    private KafkaTemplate<String,byte[]> kafkaTemplate;
    private final String topic = "byteArray_topic1";

    public void sendMessage(int key,String value){
        ProducerRecord<String,byte[]> record = new ProducerRecord<>(topic,
                key%3,String.valueOf(key),value.getBytes());
        kafkaTemplate.send(record);
    }
}

配置文件(YML)

spring:
  kafka:
      producer:
        bootstrap-servers: 172.169.0.109:9092
        batch-size: 16384
        retries: 0
        buffer-memory: 33554432
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

这里有一个非常陷阱的问题需要特别注意:序列化类的路径是:org.apache.kafka.common.serialization.StringSerializer
而不是
org.apache.kafka.config.serialization.StringSerializer
否则会出现如下错误:

2019-01-31 11:35:14.794 [main] WARN  o.s.c.a.AnnotationConfigApplicationContext -
				Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaProducerTest': Unsatisfied dependency expressed through field 'kafkaTemplate'; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration': Unsatisfied dependency expressed through constructor parameter 0; nested exception is org.springframework.boot.context.properties.ConfigurationPropertiesBindException: Error creating bean with name 'spring.kafka-org.springframework.boot.autoconfigure.kafka.KafkaProperties': Could not bind properties to 'KafkaProperties' : prefix=spring.kafka, ignoreInvalidFields=false, ignoreUnknownFields=true; nested exception is org.springframework.boot.context.properties.bind.BindException: Failed to bind properties under 'spring.kafka.producer.key-serializer' to java.lang.Class<?>
2019-01-31 11:35:14.810 [main] ERROR o.s.b.d.LoggingFailureAnalysisReporter -
				

***************************
APPLICATION FAILED TO START
***************************

Description:

Failed to bind properties under 'spring.kafka.producer.key-serializer' to java.lang.Class<?>:

    Property: spring.kafka.producer.key-serializer
    Value: org.apache.kafka.config.serialization.StringSerializer
    Origin: class path resource [application.yml]:8:25
    Reason: No converter found capable of converting from type [java.lang.String] to type [java.lang.Class<?>]

Action:

Update your application's configuration

消费者

如果不使用并发获取、批量获取消费者的代码非常简单。

a.Pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kafkatest</groupId>
    <artifactId>consumer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka-consumer</name>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <joda-time.version>2.3</joda-time.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

b1.Java代码(无并发访问、无批量获取)

@Service
@Slf4j
public class Listener {
    private final String topic = "byteArray_topic1";

    public void listen(ConsumerRecord<String, byte[]> record){
        log.info("kafka的key: " + record.key());
        log.info("kafka的value: " + new String(record.value()));
    }
}

b2.配置文件

spring:
  kafka:
    consumer:
      enable-auto-commit: true
      group-id: gridMonitorGroup
      auto-commit-interval: 1000
      auto-offset-reset: latest
      bootstrap-servers: "172.169.0.109:9092"
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

c.Java代码(并发、批量获取)

  1. Kafka消费者配置类
    批量获取关键代码:
    ①factory.setBatchListener(true);
    ②propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50);
    并发获取关键代码:
    factory.setConcurrency(concurrency);
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.consumer.bootstrap-servers}")
    private String servers;
    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group-id}")
    private String groupId;
    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //并发数量
        factory.setConcurrency(concurrency);
        //批量获取
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, byte[]> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        //最多批量获取50个
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50);
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
  1. Kafka消费者Listener
@Service
@Slf4j
public class Listener {
    private final String topic = "byteArray_topic1";


    @KafkaListener(id="myListener",
            topicPartitions ={@TopicPartition(topic = topic, partitions = { "0", "1" ,"2"})})
    public void listen(List<ConsumerRecord<String, byte[]>> recordList) {
        recordList.forEach((record)->{
            log.info("kafka的key: " + record.key());
            log.info("kafka的value: " + new String(record.value()));
        });
    }
}
  1. 配置文件
kafka:
 consumer:
   enable-auto-commit: true
   group-id: gridMonitorGroup
   auto-commit-interval: 1000
   auto-offset-reset: latest
   bootstrap-servers: "172.169.0.109:9092"
   key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
   value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
   concurrency: 3

作者:悟V-SpHeNIC
来源链接:https://blog.csdn.net/jasonhongcn/article/details/103128895

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/server/42666.html

标签: Kafka
分享给朋友:

“Springboot2.0整合Kafka,从Kafka并发、批量获取数据” 的相关文章

kafka消息中间件-快速学习

为什么需要消息队列   周末无聊刷着手机,某宝网APP突然蹦出来一条消息“为了回馈老客户,女朋友买一送一,活动仅限今天!”。买一送一还有这种好事,那我可不能错过!忍不住立马点了去。于是选了两个最新款,下单、支付一气呵成!满足的躺在床上,想着马上有女朋友了,竟然幸福的失眠了…...

【kafka】安装部署kafka集群(kafka版本:kafka_2.12-2.3.0)

3.2.1 下载kafka并安装kafka_2.12-2.3.0.tgz tar -zxvf kafka_2.12-2.3.0.tgz 3.2.2 配置kafka集群 在config/server.properties中修改参数: [had...

在CentOS 7上安装Kafka

简介 Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。Kafka 支持Java 及多种其它语言客户端,可与Hadoop、Storm、S...

kafka的基本概念和工作流程分析

kafka的基本概念和工作流程分析

为什么需要消息队列   周末无聊刷着手机,某宝网APP突然蹦出来一条消息“为了回馈老客户,女朋友买一送一,活动仅限今天!”。买一送一还有这种好事,那我可不能错过!忍不住立马点了去。于是选了两个最新款,下单、支付一气呵成!满足的躺在床上,想着马上有女朋友了,竟然幸福的失眠了…...

Linux下Kafka下载与安装教程

Linux下Kafka下载与安装教程

原文链接:http://www.studyshare.cn/software/details/1176/0 一、预备环境 Kafka是java生态圈中的一员,运行在java虚拟机上,按Kafka官方说明,java环境推荐Java8;Kafka需要Zookeeper保存集群的...

Linux安装新版Kafka3.0

Linux安装新版Kafka3.0

最近开始玩Kafka了,想着装一下新版本的玩玩,然后网上找Kafka3.0的安装教程,发现安装Kafka3.0的倒是有,但是zookeeper还是单独安装的,这就不满足我的需求了,我就是单纯的想玩玩Kafka,我还得再去另外安装zookeepe...

kafka集群原理介绍

kafka集群原理介绍 @(博客文章)[kafka|大数据] 目录 kafka集群原理介绍 (一)基础理论 二、配置文件 三、错误处理 本系统文章共三篇,分别为 1、ka...

Kafka 快速入门(安装)

Kafka 快速入门(安装)

kafka学习目录:kafka目录 二、Kafka 快速入门 2.1、windows版安装 2.1.1、Quick Start 本次安装学习在Windows操作系统进行。(Linux版本的差别不大,运行脚本文件后缀从bat...

Docker 安装 kafka

Docker 安装 kafka

简单安装为了集成 SpringBoot,真实使用,增加增加更多配置,比如将log映射出来 1.安装 zookeeper [root@centos-linux ~]# docker pull wurstmeister/zookeeper [root@centos-l...

Kafka如何保证消息不丢失不重复

首先需要思考下边几个问题: 消息丢失是什么造成的,从生产端和消费端两个角度来考虑 消息重复是什么造成的,从生产端和消费端两个角度来考虑 如何保证消息有序 如果保证消息不重不漏,损失的是什么 大概总结下 消费端重复消费:建立去重表 消费端丢失数据...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。