当前位置:首页 > 服务端 > Kafka的设计思想及基本使用

Kafka的设计思想及基本使用

2022年09月17日 14:38:39服务端6

目录

 

1、Kafka的基本概念

1.1 Kafka基本概念

1.2 Kafka与其他MQ对比

2、Kafka的核心概念

Producers的概念

broker的概念:

Message组成

Consumers的概念

3、Kafka安装部署

3.1 下载解压

3.2 启动zookeeper

3.3 配置kafka相关属性

3.4 启动服务

3.5 单机测试

4、Kafka简单Java操作

4.1 引入依赖坐标

4.2 生产者类

4.3 消费者类

5、工作流程分析

5.1 发送数据

5.2 保存数据

5.3 消费数据

5.4 总结

6、核心组件

6.1 发送类型

6.2 序列化器

6.3 分区器

6.4 拦截器

6.5 消息路由策略


1、Kafka的基本概念

1.1 Kafka基本概念

由于对JMS日常管理的过度开支和传统JMS可扩展性方面的局限,LinkedIn开发了Kafka以满足他们对实时数据流的监控以及对CPU、IO利用率等指标的高要求。在Linkedin开发Kafka之初,把关注重点集中在了这几个方面:

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consumer操作 ​ - 可扩展性:kafka集群支持热扩展 ​ - 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失 ​ - 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败) ​ - 高并发:支持数千个客户端同时读写

一个最基本的架构是生产者发布一个消息到Kafka的一个主题(topic),这个主题即是由扮演KafkaServer角色的broker提供,消费者订阅这个主题,然后从中获取消息,下面这个图可以更直观的描述这个场景:

Kafka的设计思想及基本使用 _ JavaClub全栈架构师技术笔记

上图所示的架构分为四部分:Producer、Kafka Cluster、Consumer,Zookeeper。

Producer:Producer即生产者,消息的产生者,是消息的入口。   kafka cluster:     Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……     Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。(业务分类)     Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!(负载均衡)     Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。(高可用)     Message:每一条发送的消息主体。   Consumer:消费者,即消息的消费方,是消息的出口。     Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!   Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者大规模网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力

1.2 Kafka与其他MQ对比

Kafka的设计思想及基本使用 _ JavaClub全栈架构师技术笔记

2、Kafka的核心概念

Kafka的设计思想及基本使用 _ JavaClub全栈架构师技术笔记

  • Producers的概念

    消息和数据生成者,向Kafka的一个topic发布消息的过程叫做producers

    Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于round-robin方式或者通过其他的一些算法等;

    异步发送批量发送可以很有效的提高发送效率。kafka producer的异步发送模式允许进行批量发送,先将消息缓存到内存中,然后一次请求批量发送出去。

  • broker的概念:

    Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。

    Broker不保存订阅者的状态,由订阅者自己保存。

    无状态导致消息的删除成为难题(可能删除的消息正在被订阅),Kafka采用基于时间的SLA(服务保证),消息保存一定时间 (通常7天)后会删除。

    消费订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset(id)进行重新读取消费消息

  • Message组成

    Message消息:是通信的基本单位,每个producer可以向一个topic发布消息。

    Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的,每个topic又可以分成不同的partition,每个partition储存一部分

    Partion中的每条Message包含以下三个属性:

offset long
MessageSize int32
data messages的具体内容
  • Consumers的概念

    消息和数据消费者,订阅topic并处理其发布的消息的过程叫做consumers.

    在kafka中,我们可以认为一个group是一个“订阅者”,一个topic中的每个partions只会被一个“订阅者”中的一个consumer

    消费,不过一个consumer可以消费多个partitions中的消息

PS:Kafka的设计原理决定,对于一个Topic,同一个Group不能多于Partition个数的Consumer同时消费,否则将意味着某些 Consumer无法得到消息

3、Kafka安装部署

3.1 下载解压

wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzf kafka_2.12-2.5.0.tgz
cd kafka_2.12-2.5.0

3.2 启动zookeeper

Kafka的使用依赖zookeeper,可以使用已有的zookeeper服务,新版的kafka已内置了一个zookeeper环境,也可以直接使用,

我这里直接使用kafka自带的zookeeper:

#进入kafka解压目录
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

3.3 配置kafka相关属性

#vim config/server.properties
​
#实例ID,唯一标识,主要用于集群区分
broker.id=0   
#指定服务的端口
listeners=PLAINTEXT://192.168.223.128:9092  
#如果要提供外网访问 必须配置此项producer或consumer将在此端口建立连接
advertised.listeners=PLAINTEXT://192.168.223.128:9092
#日志目录
log.dirs=/var/log/tmp/kafka-logs 
#broker需要使用zookeeper保存meta数据
zookeeper.connect=localhost:2181
#topic在当前broker上的分片个数,每个主题创建的时候默认使用该分片数
#可以通过 bin/kafka-topics.sh --bootstrap-server 192.168.223.128:9092 --topic test4 --describe查看
num.partitions=1
#服务器名称
hostname=192.168.223.128

3.4 启动服务

./bin/kafka-server-start.sh config/server.properties

Kafka的设计思想及基本使用 _ JavaClub全栈架构师技术笔记

3.5 单机测试

#创建topic主题
[root@ydt1 kafka_2.12-2.5.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.223.128:9092 --create --topic test
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic test
​
#生产者
[root@ydt1 kafka_2.12-2.5.0]# ./bin/kafka-console-producer.sh --bootstrap-server 192.168.223.128:9092 --topic test
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
>lmx
>ddddd
​
#消费者
[root@ydt1 kafka_2.12-2.5.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.223.128:9092 --topic test --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
lmx
ddddd
​

 

4、Kafka简单Java操作

4.1 引入依赖坐标

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.5.0</version>
        </dependency>
    </dependencies>

4.2 生产者类

package com.ydt.kafka;
​
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
​
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
​
public class Producer {
    public static String topic = "test4";//定义主题
​
    public static void main(String[] args) throws InterruptedException {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.128:9092");//kafka地址,多个地址用逗号分割
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);//默认分区器
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer(p);
​
        try {
            while (true) {
                String msg = "Hello," + new Random().nextInt(100);
                 //第二个参数为指定的分片key,相同的key会分配到相同的分片
                // (当然是需要通过计算的,计算规则可以查看DefaultPartitioner.partition()方法)
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
                kafkaProducer.send(record);
                System.out.println("消息发送成功:" + msg);
                Thread.sleep(1000);
            }
        } finally {
            kafkaProducer.close();
        }
​
    }
}

4.3 消费者类

复制两个,分别开启

package com.ydt.kafka;
​
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
​
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
​
public class Consumer1 {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.128:9092");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//自动提交,可能会出现重复消费问题
​
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
        kafkaConsumer.subscribe(Arrays.asList(Producer1.topic));// 订阅消息
​
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.of(100, ChronoUnit.MILLIS));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
                        record.topic(), record.offset(), record.value()));
            }
        }
    }
}

注意:主题涉及到分区的概念,同一组消费者的个数不能大于分区数。因为:一个分区只能被同一群组的一个消费者消费。出现分区小于消费者个数的时候,可以增加分区。

同时注意开放端口9092:

#开启端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent     ----其他端口照做
#重启防火墙
firewall-cmd --reload

 

5、工作流程分析

介绍了Kafka的基本概念和安装,完成了一个入门demo后,我们来分析一下Kafka的工作流程!Kafka的设计思想及基本使用 _ JavaClub全栈架构师技术笔记

我们看上面的架构图中,producer就是生产者,是数据的入口。注意看图中的红色箭头,Producer在写入数据的时候永远的找leader,不会直接将数据写入follower!

5.1 发送数据

发送消息原理图:

Kafka的设计思想及基本使用 _ JavaClub全栈架构师技术笔记

这个只是大意义上的通过算法将数据路由到对应的partition,其实是先写入leader partition,那leader怎么处理的呢?写入的流程又是什么样的呢?我们看下图:

Kafka的设计思想及基本使用 _ JavaClub全栈架构师技术笔记

需要注意的一点是,消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入示意图如下:

Kafka的设计思想及基本使用 _ JavaClub全栈架构师技术笔记

上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:    1、 方便扩展。因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。    2、 提高并发。以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。

  熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:    1、 在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。**    2、 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。    3、 如果既没指定partition,又没有设置key,则会轮询选出一个partition。**

  保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为01all(-1)。    0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。    1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功,万一发送完后leader挂掉,数据也不可用,必须重启leader。    all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

  最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1,当然,你可以通过修改server.properties文件设置。

5.2 保存数据

Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。

Partition 结构   前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

Kafka的设计思想及基本使用 _ JavaClub全栈架构师技术笔记

如上图,这个partition有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。文件的命名是以该segment最小offset来命名的,如000.index存储offset为0~368795的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。具体可以查看server.properties配置的log目录

cd /var/log/kafka-logs/test-0
vim **********.log

Message结构 上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:   1、 offset:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置!   2、 消息大小描述:消息大小占用4byte,用于描述消息的大小。   3、 消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。

存储策略   无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?   1、 基于时间,默认配置是168小时(7天)。----log.retention.hours=168   2、 基于大小,默认配置是1073741824比特(1G)。 ----log.segment.bytes=1073741824   需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!

5.3 消费数据

消息存储在log文件后,消费者就可以进行消费了。与生产消息相同的是,消费者在拉取消息的时候也是找leader去拉取。

  多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!是不是有点绕。我们看下图:

  Kafka的设计思想及基本使用 _ JavaClub全栈架构师技术笔记

  图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就比不上只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议消费者组的consumer的数量与partition的数量一致!   在保存数据的小节里面,我们聊到了partition划分为多组segment,每个segment又包含.log、.index、.timeindex文件,存放的每条message包含offset、消息大小、消息体……我们多次提到segment和offset,查找消息的时候是怎么利用segment+offset配合查找的呢?假如现在需要查找一个offset为368801的message是什么样的过程呢?我们先看看下面的图:

Kafka的设计思想及基本使用 _ JavaClub全栈架构师技术笔记

  1、 先找到offset为368801的message所在的segment文件(利用二分法查找),这里找到的就是在第二个segment文件。   2、 打开找到的segment中的.index文件(也就是368796.index文件,该文件起始偏移量为368796+1,我们要查找的offset为368801的message在该index内的偏移量为368796+5=368801,所以这里要查找的相对offset为5)。由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。   3、 根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条Message。

  这套机制是建立在offset为有序的基础上,利用segment+有序offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!至此,消费者就能拿到需要处理的数据进行处理了。那每个消费者又是怎么记录自己消费的位置呢?在早期的版本中,消费者将消费到的offset维护zookeeper中,consumer每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的offset已经直接维护在kafka集群的__consumer_offsets这个topic中!

5.4 总结

1)、Producer 先从 ZooKeeper 中找到该 Partition 的 Leader。

2)、Producer将消息发送给该 Leader。

3)、Leader 将消息接入本地的 log,并通知 ISR (集群列表)的 Follower。

4)、ISR 中的 Follower 从 Leader 中 Pull 消息, 写入本地 log 后向 Leader 发送 Ack。

5)、Leader 收到所有 ISR 中的 Follower 的 Ack 后,增加 HW 并向 Producer 发送 Ack,表示消息写入成功。(acks=all的情况)

HW是High Watermak的缩写, 俗称高水位,它表示了一个特定消息的偏移量(offset),消费者只能拉取到这个offset之前的消息

6、核心组件

6.1 发送类型

包括同步发送和异步发送

同步发送:通过send()发送完消息后返回一个Future对象,然后调用Future对象的get()方法等待Kafka响应:如果kafka正常响应,返回一个RecordMetaData对象,该对象存储消息的偏移量,分片,主题等信息;如果kafka发生错误,无法正常响应,就会抛出异常,我们可以进行异常处理。

Future<RecordMetadata> future = kafkaProducer.send(record);
                RecordMetadata recordMetadata = future.get();
                System.out.println(String.format("消息发送成功:topic:%s,partition:%s,offset:%s,消息:%s",
                        recordMetadata.topic(),recordMetadata.partition(), recordMetadata.offset(), record.value()));

异步发送:通过send()方法中CallBack回调,可以不需要等待kafka响应后再发送下一个消息,开发者可以通过判断异常对象是否为空来进行业务处理

 kafkaProducer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if(e == null){
                            System.out.println(String.format("消息发送成功:topic:%s,partition:%s,offset:%s,消息:%s",
                                    recordMetadata.topic(),recordMetadata.partition(), recordMetadata.offset(), record.value()));  
                        }else{
                            System.out.println("消息发送失败:" + e.getMessage());
                        }
                    }
                });

 

6.2 序列化器

因为我们的生产者需要将消息在网络中传播或者保存到磁盘上;而消费者需要接受消息或者将磁盘上的消息进行反序列化,所以kafka为我们提供了大量的序列化类,直接使用即可,当然如果你希望自定义序列化或者反序列化类,可以自定义一个类实现kafka提供的org.apache.kafka.common.serialization.Serializer<T>接口,实现其方法即可

import org.apache.kafka.common.serialization.Serializer;
​
public class MySerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String s, String s2) {
        s2 = s2+ "My Serializer";
        return s2.getBytes();
    }
}
​
//使用
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, MySerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MySerializer.class);

6.3 分区器

四种分区策略:
    第一种分区策略:给定了分区号,直接将数据发送到指定的分区里面去
    第二种分区策略:没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
    第三种分区策略:既没有给定分区号,也没有给定key值,直接轮循进行分区
    第四种分区策略:自定义分区

分区器是kafka为消息进行分区保存使用的一个计算规则,可以同样可以自定义(实现Partitioner接口),但是一般情况下使用kafka默认自带的分区器功能足矣,所以我们来看看默认分区器的源码:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster            cluster) {
        if (keyBytes == null) {//如果keyBytes为空(默认不设置)
            return this.stickyPartitionCache.partition(topic, cluster);
        } else {//keyBytes不为空
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();//拿到该主题的分片数量(其实就是创建主题时num.partitions配置数)
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;//murmur2算法计算key的哈希值和分区数量直接           取模,计算分片位置
        }
    }   
​
    public int partition(String topic, Cluster cluster) {
        Integer part = (Integer)this.indexCache.get(topic);//如果之前该主题当前有分区了,直接从缓存中取
        return part == null ? this.nextPartition(topic, cluster, -1) : part;
    }
​
    //获取下一次分片位置
    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);//拿到集群中主题的分区对象集合
        Integer oldPart = (Integer)this.indexCache.get(topic);//再次获取缓存的主题分区(有可能其他进程调用)
        Integer newPart = oldPart;
        if (oldPart != null && oldPart != prevPartition) {//如果当前主题分区有效(不等于上一分区)
            return (Integer)this.indexCache.get(topic); //缓存取
        } else {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);//活跃的集群分区对象集合
            Integer random;
            if (availablePartitions.size() < 1) {//小于1个的时候
                random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();//随机整数与集群中主题分区集合总数取模
            } else if (availablePartitions.size() == 1) {//一个分片,那就没啥可以说的了
                newPart = ((PartitionInfo)availablePartitions.get(0)).partition();
            } else {
                while(newPart == null || newPart.equals(oldPart)) {//恒等式
                    random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = ((PartitionInfo)availablePartitions.get(random % availablePartitions.size())).partition();//根据随机整数和有效分区总数的取模
                }
            }
            //将分区位置存入缓存
            if (oldPart == null) {
                this.indexCache.putIfAbsent(topic, newPart);
            } else {
                this.indexCache.replace(topic, prevPartition, newPart);
            }
​
            return (Integer)this.indexCache.get(topic);
        }
    }

自定义分区器:

package com.ydt.kafka;
​
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
​
import java.util.Map;
​
public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        System.out.println("自定义分区器");
        return 3;
    }
​
    @Override
    public void close() {
​
    }
​
    @Override
    public void configure(Map<String, ?> map) {
​
    }
}
​

 

6.4 拦截器

拦截器是在kafka中是一个相当新的功能,包括生产者拦截器和消费者拦截器,不过他们都只提供了接口,具体实现需要用户自定义:

生产者拦截器接口:ProducerInterceptor

package com.ydt.kafka;
​
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
​
import java.util.Map;
​
public class MyProducerInterceptor implements ProducerInterceptor<String,String> {
    @Override
    public ProducerRecord<String,String> onSend(ProducerRecord producerRecord) {
        String value = (String) producerRecord.value();
        if(value.indexOf("隔壁老王") != -1){
            value = value.replaceAll("隔壁老王","");//过滤掉不符合要求的
        }
        return new ProducerRecord(producerRecord.topic(),producerRecord.partition(),producerRecord.key()
                ,value);
    }
​
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
​
    }
​
    @Override
    public void close() {
​
    }
​
    @Override
    public void configure(Map<String, ?> map) {
​
    }
}
​

消费者过滤器:ConsumerInterceptor

package com.ydt.kafka;
​
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Record;
​
import java.util.*;
​
public class MyConsumerInterceptor implements ConsumerInterceptor {
    @Override
    public ConsumerRecords onConsume(ConsumerRecords consumerRecords) {
        Iterator iterator = consumerRecords.iterator();
        List<ConsumerRecord> cRecords = new ArrayList<>();
        if (iterator.hasNext()){
            ConsumerRecord record = (ConsumerRecord) iterator.next();
            String value = (String) record.value();
            if(value.indexOf("gebilaowang") != -1){
                value = value.replaceAll("gebilaowang","隔壁老王");
            }
            record = new ConsumerRecord(record.topic(),4,record.offset(),record.key(),value);
            cRecords.add(record);
        }
        Map<TopicPartition, List<ConsumerRecord>> records = new HashMap<>();
        //第一个参数TopicPartition真是无语,数据很有可能不是一个Topic,更不可能是同一个Partition
        records.put(null,cRecords);
        return new ConsumerRecords(records);
    }
​
    @Override
    public void close() {
​
    }
​
    @Override
    public void onCommit(Map map) {
​
    }
​
    @Override
    public void configure(Map<String, ?> map) {
​
    }
}
​

6.5 消息路由策略

在通过 API 方式发布消息时,生产者是以 Record 为消息进行发布的。

Record 中包含 Key 与 Value,Value 才是我们真正的消息本身,而 Key 用于路由消息所要存放的 Partition。

消息要写入到哪个 Partition 并不是随机的,而是有路由策略的:

  • 若指定了 Partition,则直接写入到指定的 Partition。

  • 若未指定 Partition 但指定了 Key,则通过对 Key 的 Hash 值与 Partition 数量取模,该取模。

  • 若 Partition 和 Key 都未指定,则使用轮询算法选出一个 Partition。

 

作者:八五年的湘哥
来源链接:https://blog.csdn.net/huxiang19851114/article/details/114155558

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/server/42705.html

标签: Kafka
分享给朋友:

“Kafka的设计思想及基本使用” 的相关文章

kafka消息中间件-快速学习

为什么需要消息队列   周末无聊刷着手机,某宝网APP突然蹦出来一条消息“为了回馈老客户,女朋友买一送一,活动仅限今天!”。买一送一还有这种好事,那我可不能错过!忍不住立马点了去。于是选了两个最新款,下单、支付一气呵成!满足的躺在床上,想着马上有女朋友了,竟然幸福的失眠了…...

kafka集群搭建

kafka集群搭建

本文将记录使用kafka镜像,分别在两种场景下搭建3节点集群:1.在一台机器上使用容器方式安装kafka集群;2.在三台机器上使用容器方式安装kafka集群。 此次使用的是wurstmeister的,下载量是比较大的。使用下面命令下载: docker pull wur...

kafka消息长度限制

更改为10M 客户端代码增加:max_request_size=10485760, 服务端配置:replica.fetch.max.bytes=10485760,message.max.bytes=10485760...

kafka-server-stop.sh关闭Kafka失败

Kafka brokers need to finish the shutdown process before the zookeepers do. So start the zookeepers, then the kafka brokers wil...

在CentOS 7上安装Kafka

简介 Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。Kafka 支持Java 及多种其它语言客户端,可与Hadoop、Storm、S...

Kafka 安装和简单使用

Kafka 安装和简单使用

文章目录 Kafka 安装和简单使用 kafka下载地址 windows 系统...

kafka的基本概念和工作流程分析

kafka的基本概念和工作流程分析

为什么需要消息队列   周末无聊刷着手机,某宝网APP突然蹦出来一条消息“为了回馈老客户,女朋友买一送一,活动仅限今天!”。买一送一还有这种好事,那我可不能错过!忍不住立马点了去。于是选了两个最新款,下单、支付一气呵成!满足的躺在床上,想着马上有女朋友了,竟然幸福的失眠了…...

Linux下Kafka下载与安装教程

Linux下Kafka下载与安装教程

原文链接:http://www.studyshare.cn/software/details/1176/0 一、预备环境 Kafka是java生态圈中的一员,运行在java虚拟机上,按Kafka官方说明,java环境推荐Java8;Kafka需要Zookeeper保存集群的...

Linux安装新版Kafka3.0

Linux安装新版Kafka3.0

最近开始玩Kafka了,想着装一下新版本的玩玩,然后网上找Kafka3.0的安装教程,发现安装Kafka3.0的倒是有,但是zookeeper还是单独安装的,这就不满足我的需求了,我就是单纯的想玩玩Kafka,我还得再去另外安装zookeepe...

Kafka 快速入门(安装)

Kafka 快速入门(安装)

kafka学习目录:kafka目录 二、Kafka 快速入门 2.1、windows版安装 2.1.1、Quick Start 本次安装学习在Windows操作系统进行。(Linux版本的差别不大,运行脚本文件后缀从bat...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。