当前位置:首页 > 服务端 > kafka原理和实践(四)spring-kafka消费者源码

kafka原理和实践(四)spring-kafka消费者源码

2022年09月16日 18:19:03服务端7

系列目录

kafka原理和实践(一)原理:10分钟入门

kafka原理和实践(二)spring-kafka简单实践

kafka原理和实践(三)spring-kafka生产者源码

kafka原理和实践(四)spring-kafka消费者源码

kafka原理和实践(五)spring-kafka配置详解

kafka原理和实践(六)总结升华

 

 

==============正文分割线=====================

一、kafkaConsumer消费者模型

kafka原理和实践(四)spring-kafka消费者源码 _ JavaClub全栈架构师技术笔记

如上图所示,spring-kafka消费者模型主要流程:

1.容器启动,轮询执行消费。

2.kafkaConsumer拉取消息流程:

1)Fetcher请求获取器获取请求并存储在unset中

2)ConsumerNetworkClient网络客户端执行poll(),调用NetWlrikClient的send()方法从unset中获取ClientRequest请求转成RequestSend最终塞进Selector的KafkaChannel通道中,Seletcor.send()从kafka集群拉取待消费数据ConsumerRecords

3. 消费者监听器MessageListener.onMessage()执行用户自定义的实际消费业务逻辑。

一、kafkaConsumer构造

  1 @SuppressWarnings("unchecked")
  2     private KafkaConsumer(ConsumerConfig config,
  3                           Deserializer<K> keyDeserializer,
  4                           Deserializer<V> valueDeserializer) {
  5         try {
  6             log.debug("Starting the Kafka consumer");
  7             this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
  8             int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
  9             int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
 10             if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
 11                 throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
 12             this.time = new SystemTime();
 13 
 14             String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
 15             if (clientId.length() <= 0)
 16                 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
 17             this.clientId = clientId;
 18             Map<String, String> metricsTags = new LinkedHashMap<>();
 19             metricsTags.put("client-id", clientId);
 20             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
 21                     .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
 22                     .tags(metricsTags);
 23             List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
 24                     MetricsReporter.class);
 25             reporters.add(new JmxReporter(JMX_PREFIX));
 26             this.metrics = new Metrics(metricConfig, reporters, time);
 27             this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
 28 
 29             // load interceptors and make sure they get clientId
 30             Map<String, Object> userProvidedConfigs = config.originals();
 31             userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
 32             List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
 33                     ConsumerInterceptor.class);
 34             this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
 35             if (keyDeserializer == null) {
 36                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 37                         Deserializer.class);
 38                 this.keyDeserializer.configure(config.originals(), true);
 39             } else {
 40                 config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
 41                 this.keyDeserializer = keyDeserializer;
 42             }
 43             if (valueDeserializer == null) {
 44                 this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 45                         Deserializer.class);
 46                 this.valueDeserializer.configure(config.originals(), false);
 47             } else {
 48                 config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
 49                 this.valueDeserializer = valueDeserializer;
 50             }
 51             ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
 52             this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
 53             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
 54             this.metadata.update(Cluster.bootstrap(addresses), 0);
 55             String metricGrpPrefix = "consumer";
 56             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
 57             NetworkClient netClient = new NetworkClient(
 58                     new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
 59                     this.metadata,
 60                     clientId,
 61                     100, // a fixed large enough value will suffice
 62                     config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
 63                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
 64                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
 65                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
 66             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
 67                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
 68             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
 69             this.subscriptions = new SubscriptionState(offsetResetStrategy);
 70             List<PartitionAssignor> assignors = config.getConfiguredInstances(
 71                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
 72                     PartitionAssignor.class);
 73             this.coordinator = new ConsumerCoordinator(this.client,
 74                     config.getString(ConsumerConfig.GROUP_ID_CONFIG),
 75                     config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
 76                     config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
 77                     config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
 78                     assignors,
 79                     this.metadata,
 80                     this.subscriptions,
 81                     metrics,
 82                     metricGrpPrefix,
 83                     this.time,
 84                     retryBackoffMs,
 85                     new ConsumerCoordinator.DefaultOffsetCommitCallback(),
 86                     config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
 87                     config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
 88                     this.interceptors,
 89                     config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
 90             this.fetcher = new Fetcher<>(this.client,
 91                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
 92                     config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
 93                     config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
 94                     config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
 95                     config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
 96                     config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
 97                     this.keyDeserializer,
 98                     this.valueDeserializer,
 99                     this.metadata,
100                     this.subscriptions,
101                     metrics,
102                     metricGrpPrefix,
103                     this.time,
104                     this.retryBackoffMs);
105 
106             config.logUnused();
107             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
108 
109             log.debug("Kafka consumer created");
110         } catch (Throwable t) {
111             // call close methods if internal objects are already constructed
112             // this is to prevent resource leak. see KAFKA-2121
113             close(true);
114             // now propagate the exception
115             throw new KafkaException("Failed to construct kafka consumer", t);
116         }
117     }

 从KafkaConsumer构造函数来看,核心组件有:

1.Metadata:封装了元数据的一些逻辑的类。元数据仅保留一个主题的子集,随着时间的推移可以添加。当我们请求一个主题的元数据时,我们没有任何元数据会触发元数据更新。如果对元数据启用了主题过期,那么在更新之后,在过期时间间隔内未使用的任何主题都将从元数据刷新集中删除。

2.ConsumerNetworkClient:高等级消费者访问网络层,为请求Future任务提供基本支持。这个类是线程安全的,但是不提供响应回调的同步。这保证在调用它们时不会持有锁。

3.SubscriptionState:订阅的TopicPartition的offset状态维护

4.ConsumerCoordinator:消费者的协调者,负责partitiion的分配,reblance

5.Fetcher:从brokers上按照配置获取消息。

二、消费者容器启动流程

kafka消费者有两种常见的实现方式:

1.xml配置文件

2.基于注解实现

其实,不管哪种方式,本质只是生成Spring Bean的方式不同而已。我们就以xml的实现方式来追踪源码。

基于xml的总体配置如下:

 1 <!-- 1.定义consumer的参数 -->
 2     <bean id="consumerProperties" class="java.util.HashMap">
 3         <constructor-arg>
 4 <map>  5 <entry key="bootstrap.servers" value="${bootstrap.servers}" />  6 <entry key="group.id" value="${group.id}" />  7 <entry key="enable.auto.commit" value="${enable.auto.commit}" />  8 <entry key="session.timeout.ms" value="${session.timeout.ms}" />  9 <entry key="key.deserializer" 10 value="org.apache.kafka.common.serialization.StringDeserializer" /> 11 <entry key="value.deserializer" 12 value="org.apache.kafka.common.serialization.StringDeserializer" /> 13 </map> 14 </constructor-arg> 15 </bean> 16 17 <!-- 2.创建consumerFactory bean --> 18 <bean id="consumerFactory" 19 class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" > 20 <constructor-arg> 21 <ref bean="consumerProperties" /> 22 </constructor-arg> 23 </bean> 24 25 <!-- 3.定义消费实现类 --> 26 <bean id="kafkaConsumerService" class="xxx.service.impl.KafkaConsumerSerivceImpl" /> 27 28 <!-- 4.消费者容器配置信息 --> 29 <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> 30 <!-- topic --> 31 <constructor-arg name="topics"> 32 <list> 33 <value>${kafka.consumer.topic.credit.for.lease}</value> 34 <value>${loan.application.feedback.topic}</value> 35 <value>${templar.agreement.feedback.topic}</value> 36 <value>${templar.aggrement.active.feedback.topic}</value> 37 <value>${templar.aggrement.agreementRepaid.topic}</value> 38 <value>${templar.aggrement.agreementWithhold.topic}</value> 39 <value>${templar.aggrement.agreementRepayRemind.topic}</value> 40 </list> 41 </constructor-arg> 42 <property name="messageListener" ref="kafkaConsumerService" /> 43 </bean> 44 <!-- 5.消费者并发消息监听容器,执行doStart()方法 --> 45 <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" > 46 <constructor-arg ref="consumerFactory" /> 47 <constructor-arg ref="containerProperties" /> 48 <property name="concurrency" value="${concurrency}" /> 49 </bean>

 分为5个步骤:

 2.1.定义消费参数bean

consumerProperties ,就是个map<key,value>

2.2.创建consumerFactory bean

DefaultKafkaConsumerFactory 实现了ConsumerFactory接口,提供创建消费者判断是否自动提交2个方法。通过consumerProperties作为参数构造。
1 public interface ConsumerFactory<K, V> {
2 
3     Consumer<K, V> createConsumer();
4 
5     boolean isAutoCommit();
6 
7 
8 }

2.3.定义消费实现类

自定义一个类实现MessageListener接口,接口设计如下:

kafka原理和实践(四)spring-kafka消费者源码 _ JavaClub全栈架构师技术笔记

实现onMessage方法,去消费接收到的消息。两种方案:

1)MessageListener 消费完消息后自动提交offset(enable.auto.commit=true时),可提高效率,存在消费失败但移动了偏移量的风险。

2)AcknowledgingMessageListener 消费完消息后手动提交offset(enable.auto.commit=false时)效率降低,无消费失败但移动偏移量的风险。

2.4.监听容器配置信息

ContainerProperties:包含了一个监听容器的运行时配置信息,主要定义了监听的主题、分区、初始化偏移量,还有消息监听器。
  1 public class ContainerProperties {
  2 
  3     private static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
  4 
  5     private static final int DEFAULT_QUEUE_DEPTH = 1;
  6 
  7     private static final int DEFAULT_PAUSE_AFTER = 10000;
  8 
  9     /**
 10      * Topic names.监听的主题字符串数组
 11      */
 12     private final String[] topics;
 13 
 14     /**
 15      * Topic pattern.监听的主题模板
 16      */
 17     private final Pattern topicPattern;
 18 
 19     /**
 20      * Topics/partitions/initial offsets.
 21      */
 22     private final TopicPartitionInitialOffset[] topicPartitions;
 23 
 24     /**
 25      * 确认模式(自动确认属性为false时使用)
 26      * <ul>
 27      * <li>1.RECORD逐条确认: 每条消息被发送给监听者后确认</li>
 28      * <li>2.BATCH批量确认: 当批量消息记录被消费者接收到并传送给监听器时确认</li>
 30      * <li>3.TIME超时确认:当超过设置的超时时间毫秒数时确认(should be greater than
 31      * {@code #setPollTimeout(long) pollTimeout}.</li>
 32      * <li>4.COUNT计数确认: 当接收到指定数量之后确认</li>
 33      * <li>5.MANUAL手动确认:由监听器负责确认(AcknowledgingMessageListener</ul>
 36      */
 37     private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;
 38 
 39     /**
 40      * The number of outstanding record count after which offsets should be
 41      * committed when {@link AckMode#COUNT} or {@link AckMode#COUNT_TIME} is being
 42      * used.
 43      */
 44     private int ackCount;
 45 
 46     /**
 47      * The time (ms) after which outstanding offsets should be committed when
 48      * {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be
 49      * larger than
 50      */
 51     private long ackTime;
 52 
 53     /**
 54      * 消息监听器,必须是 MessageListener或者AcknowledgingMessageListener两者中的一个 55      * 56      */
 57     private Object messageListener;
 58 
 59     /**
 60      * The max time to block in the consumer waiting for records.
 61      */
 62     private volatile long pollTimeout = 1000;
 63 
 64     /**
 65      * 线程执行器:轮询消费者
 66      */
 67     private AsyncListenableTaskExecutor consumerTaskExecutor;
 68 
 69     /**
 70      * 线程执行器:调用监听器
 71      */
 72     private AsyncListenableTaskExecutor listenerTaskExecutor;
 73 
 74     /**
 75      * 错误回调,当监听器抛出异常时
 76      */
 77     private GenericErrorHandler<?> errorHandler;
 78 
 79     /**
 80      * When using Kafka group management and {@link #setPauseEnabled(boolean)} is
 81      * true, the delay after which the consumer should be paused. Default 10000.
 82      */
 83     private long pauseAfter = DEFAULT_PAUSE_AFTER;
 84 
 85     /**
 86      * When true, avoids rebalancing when this consumer is slow or throws a
 87      * qualifying exception - pauses the consumer. Default: true.
 88      * @see #pauseAfter
 89      */
 90     private boolean pauseEnabled = true;
 91 
 92     /**
 93      * Set the queue depth for handoffs from the consumer thread to the listener
 94      * thread. Default 1 (up to 2 in process).
 95      */
 96     private int queueDepth = DEFAULT_QUEUE_DEPTH;
 97 
 98     /**
 99      * 停止容器超时时间    */
103     private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
104 
105     /**
106      * 用户定义的消费者再平衡监听器实现类     */
108     private ConsumerRebalanceListener consumerRebalanceListener;
109 
110     /**
111      * 提交回调,默认记录日志。      */
114     private OffsetCommitCallback commitCallback;
115 
116     /**
117      * Whether or not to call consumer.commitSync() or commitAsync() when the
118      * container is responsible for commits. Default true. See
119      * https://github.com/spring-projects/spring-kafka/issues/62 At the time of
120      * writing, async commits are not entirely reliable.
121      */
122     private boolean syncCommits = true;
123 
124     private boolean ackOnError = true;
125 
126     private Long idleEventInterval;
127 
128     public ContainerProperties(String... topics) {
129         Assert.notEmpty(topics, "An array of topicPartitions must be provided");
130         this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
131         this.topicPattern = null;
132         this.topicPartitions = null;
133     }
134 
135     public ContainerProperties(Pattern topicPattern) {
136         this.topics = null;
137         this.topicPattern = topicPattern;
138         this.topicPartitions = null;
139     }
140 
141     public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {
142         this.topics = null;
143         this.topicPattern = null;
144         Assert.notEmpty(topicPartitions, "An array of topicPartitions must be provided");
145         this.topicPartitions = new LinkedHashSet<>(Arrays.asList(topicPartitions))
146                 .toArray(new TopicPartitionInitialOffset[topicPartitions.length]);
147     }
148 ...省略各种set、get
149 
150 }

 

2.5.启动并发消息监听容器

核心类ConcurrentMessageListenerContainer,继承自抽象类AbstractMessageListenerContainer,类图如下:

kafka原理和实践(四)spring-kafka消费者源码 _ JavaClub全栈架构师技术笔记

 

看上图可知AbstractMessageListenerContainer有2个实现类分别对应单线程和多线程,建议采用多线程消费。下面分析一下主要ConcurrentMessageListenerContainer类,注意2个方法:

1.构造函数,入参:消费者工厂ConsumerFactory+容器配置ContainerProperties

2.doStart():核心方法KafkaMessageListenerContainer的start()方法。源码如下:

  1 public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
  2 
  3     private final ConsumerFactory<K, V> consumerFactory;
  4 
  5     private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
  6 
  7     private int concurrency = 1;
  8 
  9     /**
 10      * Construct an instance with the supplied configuration properties.
 11      * The topic partitions are distributed evenly across the delegate
 12      * {@link KafkaMessageListenerContainer}s.
 13      * @param consumerFactory the consumer factory.
 14      * @param containerProperties the container properties.
 15      */
 16     public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
 17             ContainerProperties containerProperties) {
 18         super(containerProperties);
 19         Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
 20         this.consumerFactory = consumerFactory;
 21     }
 22 
 23     public int getConcurrency() {
 24         return this.concurrency;
 25     }
 26 
 27     /**
 28      * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
 29      * Messages from within the same partition will be processed sequentially.
 30      * @param concurrency the concurrency.
 31      */
 32     public void setConcurrency(int concurrency) {
 33         Assert.isTrue(concurrency > 0, "concurrency must be greater than 0");
 34         this.concurrency = concurrency;
 35     }
 36 
 37     /**
 38      * Return the list of {@link KafkaMessageListenerContainer}s created by
 39      * this container.
 40      * @return the list of {@link KafkaMessageListenerContainer}s created by
 41      * this container.
 42      */
 43     public List<KafkaMessageListenerContainer<K, V>> getContainers() {
 44         return Collections.unmodifiableList(this.containers);
 45     }
 46 
 47     /*
 48      * Under lifecycle lock.
 49      */
 50     @Override
 51     protected void doStart() {
 52         if (!isRunning()) {
 53             ContainerProperties containerProperties = getContainerProperties();
 54             TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
 55             if (topicPartitions != null//校验并发数>分区数,报错。
 56                     && this.concurrency > topicPartitions.length) {
 57                 this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
 58                         + "equal to the number of partitions; reduced from " + this.concurrency + " to "
 59                         + topicPartitions.length);
 60                 this.concurrency = topicPartitions.length;//并发数最大只能=分区数
 61             }
 62             setRunning(true);
 63             //遍历创建监听器容器
 64             for (int i = 0; i < this.concurrency; i++) {
 65                 KafkaMessageListenerContainer<K, V> container;
 66                 if (topicPartitions == null) {
 67                     container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
 68                 }
 69                 else {
 70                     container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
 71                             partitionSubset(containerProperties, i));
 72                 }
 73                 if (getBeanName() != null) {
 74                     container.setBeanName(getBeanName() + "-" + i);
 75                 }
 76                 if (getApplicationEventPublisher() != null) {
 77                     container.setApplicationEventPublisher(getApplicationEventPublisher());
 78                 }
 79                 container.setClientIdSuffix("-" + i);
 80                 container.start();//核心方法,启动容器
 81                 this.containers.add(container);
 82             }
 83         }
 84     }146 ...省略
147 }

 继续追踪,调用AbstractMessageListenerContainer的doStart(),值得注意的是start()和stop方法加了同一把锁,用于锁住生命周期。

 1 private final Object lifecycleMonitor = new Object();
 2 
 3 @Override
 4     public final void start() {
 5         synchronized (this.lifecycleMonitor) {
 6             Assert.isTrue(
 7                     this.containerProperties.getMessageListener() instanceof KafkaDataListener,
 8                     "A " + KafkaDataListener.class.getName() + " implementation must be provided");
 9             doStart();
10         }
11     }
12 
13     protected abstract void doStart();

最终调用的是KafkaMessageListenerContainer的doStart()

 1 @Override
 2     protected void doStart() {
 3         if (isRunning()) {
 4             return;
 5         }
 6         ContainerProperties containerProperties = getContainerProperties();
 7 
 8         if (!this.consumerFactory.isAutoCommit()) {
 9             AckMode ackMode = containerProperties.getAckMode();
10             if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
11                 Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
12             }
13             if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
14                     && containerProperties.getAckTime() == 0) {
15                 containerProperties.setAckTime(5000);
16             }
17         }
18 
19         Object messageListener = containerProperties.getMessageListener();
20         Assert.state(messageListener != null, "A MessageListener is required");
21         if (messageListener instanceof GenericAcknowledgingMessageListener) {
22             this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
23         }
24         else if (messageListener instanceof GenericMessageListener) {
25             this.listener = (GenericMessageListener<?>) messageListener;
26         }
27         else {
28             throw new IllegalStateException("messageListener must be 'MessageListener' "
29                     + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
30         }
31         if (containerProperties.getConsumerTaskExecutor() == null) {
32             SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
33                     (getBeanName() == null ? "" : getBeanName()) + "-C-");
34             containerProperties.setConsumerTaskExecutor(consumerExecutor);
35         }
36         if (containerProperties.getListenerTaskExecutor() == null) {
37             SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
38                     (getBeanName() == null ? "" : getBeanName()) + "-L-");
39             containerProperties.setListenerTaskExecutor(listenerExecutor);
40         }//1.构建 监听消费者
41         this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
42         setRunning(true);
      //2.异步提交 监听消费者任务,返回Future并赋值。
43 this.listenerConsumerFuture = containerProperties 44 .getConsumerTaskExecutor() 45 .submitListenable(this.listenerConsumer); 46 }

 

doStart主要包含2个操作:构建内部类ListenerConsumer提交 监听消费者任务,返回Future并赋值。

1.构建内部类ListenerConsumer

ListenerConsumer类图如下:

kafka原理和实践(四)spring-kafka消费者源码 _ JavaClub全栈架构师技术笔记

ListenerConsumer构造函数源码如下:

 1 @SuppressWarnings("unchecked")
 2         ListenerConsumer(GenericMessageListener<?> listener, GenericAcknowledgingMessageListener<?> ackListener) {
 3             Assert.state(!this.isAnyManualAck || !this.autoCommit,
 4                     "Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
 5             @SuppressWarnings("deprecation")
 6             final Consumer<K, V> consumer =
 7                     KafkaMessageListenerContainer.this.consumerFactory instanceof
 8                                     org.springframework.kafka.core.ClientIdSuffixAware
 9                             ? ((org.springframework.kafka.core.ClientIdSuffixAware<K, V>) KafkaMessageListenerContainer
10                                     .this.consumerFactory)
11                                         .createConsumer(KafkaMessageListenerContainer.this.clientIdSuffix)
12                             : KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
13 
14             this.theListener = listener == null ? ackListener : listener;
15             ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);
16 
17             if (KafkaMessageListenerContainer.this.topicPartitions == null) {
18                 if (this.containerProperties.getTopicPattern() != null) {
19                     consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
20                 }
21                 else {
22                     consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
23                 }
24             }
25             else {
26                 List<TopicPartitionInitialOffset> topicPartitions =
27                         Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
28                 this.definedPartitions = new HashMap<>(topicPartitions.size());
29                 for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
30                     this.definedPartitions.put(topicPartition.topicPartition(),
31                             new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent()));
32                 }
33                 consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
34             }
35             this.consumer = consumer;
36             GenericErrorHandler<?> errHandler = this.containerProperties.getGenericErrorHandler();
37             this.genericListener = listener; 
//1.
if (this.theListener instanceof BatchAcknowledgingMessageListener) { 38 this.listener = null; 39 this.batchListener = null; 40 this.acknowledgingMessageListener = null; 41 this.batchAcknowledgingMessageListener = (BatchAcknowledgingMessageListener<K, V>) this.theListener; 42 this.isBatchListener = true; 43 }//2. 44 else if (this.theListener instanceof AcknowledgingMessageListener) { 45 this.listener = null; 46 this.acknowledgingMessageListener = (AcknowledgingMessageListener<K, V>) this.theListener; 47 this.batchListener = null; 48 this.batchAcknowledgingMessageListener = null; 49 this.isBatchListener = false; 50 }//3. 51 else if (this.theListener instanceof BatchMessageListener) { 52 this.listener = null; 53 this.batchListener = (BatchMessageListener<K, V>) this.theListener; 54 this.acknowledgingMessageListener = null; 55 this.batchAcknowledgingMessageListener = null; 56 this.isBatchListener = true; 57 }//4. 58 else if (this.theListener instanceof MessageListener) { 59 this.listener = (MessageListener<K, V>) this.theListener; 60 this.batchListener = null; 61 this.acknowledgingMessageListener = null; 62 this.batchAcknowledgingMessageListener = null; 63 this.isBatchListener = false; 64 } 65 else { 66 throw new IllegalArgumentException("Listener must be one of 'MessageListener', " 67 + "'BatchMessageListener', 'AcknowledgingMessageListener', " 68 + "'BatchAcknowledgingMessageListener', not " + this.theListener.getClass().getName()); 69 } 70 if (this.isBatchListener) { 71 validateErrorHandler(true); 72 this.errorHandler = new LoggingErrorHandler(); 73 this.batchErrorHandler = errHandler == null ? new BatchLoggingErrorHandler() 74 : (BatchErrorHandler) errHandler; 75 } 76 else { 77 validateErrorHandler(false); 78 this.errorHandler = errHandler == null ? new LoggingErrorHandler() : (ErrorHandler) errHandler; 79 this.batchErrorHandler = new BatchLoggingErrorHandler(); 80 } 81 Assert.state(!this.isBatchListener || !this.isRecordAck, "Cannot use AckMode.RECORD with a batch listener"); 82 }

1.定义消费者订阅topic或者指定分区

2.设置监听器,支持4种:

  1)BatchAcknowledgingMessageListener批量需确认消息监听器

  2)AcknowledgingMessageListener需确认消息监听器

  3)BatchMessageListener批量消息监听器

  4)MessageListener消息监听器(用的最多,一次消费一条)

 

2.提交 监听消费者任务(ListenerConsumer),返回Future并赋值。

这里我们看一下任务Runnable接口的run方法,分两种情况

1.如果自定义了分区,没必要再平衡分配分区了,直接回调

2.未指定分区,进入自旋消费

 1 @Override
 2         public void run() {
 3             if (this.genericListener instanceof ConsumerSeekAware) {
 4                 ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
 5             }
 6             this.count = 0;
 7             this.last = System.currentTimeMillis();
 8             if (isRunning() && this.definedPartitions != null) {// 1.如果运行中且自定义了分区,没必要再平衡分配分区了,直接回调
 9                 initPartitionsIfNeeded();// 有需要就初始化分区
10                 // 回调
13                 if (!this.autoCommit) {
14                     startInvoker();
15                 }
16             }
17             long lastReceive = System.currentTimeMillis();
18             long lastAlertAt = lastReceive;
19             while (isRunning()) {//2.未指定分区,进入自旋消费
20                 try {
21                     if (!this.autoCommit) {
22                         processCommits();// 如果手动提交,处理提交
23                     }
24                     processSeeks();// 重新定位偏移量,下一次消费时使用
25                     if (this.logger.isTraceEnabled()) {
26                         this.logger.trace("Polling (paused=" + this.paused + ")...");
27                     }// 1)拉取消费记录
28                     ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
29                     if (records != null && this.logger.isDebugEnabled()) {
30                         this.logger.debug("Received: " + records.count() + " records");
31                     }
32                     if (records != null && records.count() > 0) {
33                         if (this.containerProperties.getIdleEventInterval() != null) {
34                             lastReceive = System.currentTimeMillis();
35                         }// 2)如果设置了自动提交,直接在当前线程执行
39                         if (this.autoCommit) {
40                             invokeListener(records);
41                         }
42                         else {// 3)否则发送消息进缓存队列
43                             if (sendToListener(records)) {
44                                 if (this.assignedPartitions != null) {
45                                     // avoid group management rebalance due to a slow
46                                     // consumer
47                                     this.consumer.pause(this.assignedPartitions);
48                                     this.paused = true;
49                                     this.unsent = records;
50                                 }
51                             }
52                         }
53                     }
54                     else {
55                         if (this.containerProperties.getIdleEventInterval() != null) {
56                             long now = System.currentTimeMillis();
57                             if (now > lastReceive + this.containerProperties.getIdleEventInterval()
58                                     && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
59                                 publishIdleContainerEvent(now - lastReceive);
60                                 lastAlertAt = now;
61                                 if (this.genericListener instanceof ConsumerSeekAware) {
62                                     seekPartitions(getAssignedPartitions(), true);
63                                 }
64                             }
65                         }
66                     }
67                     this.unsent = checkPause(this.unsent);
68                 }
69                 catch (WakeupException e) {
70                     this.unsent = checkPause(this.unsent);
71                 }
72                 catch (Exception e) {
73                     if (this.containerProperties.getGenericErrorHandler() != null) {
74                         this.containerProperties.getGenericErrorHandler().handle(e, null);
75                     }
76                     else {
77                         this.logger.error("Container exception", e);
78                     }
79                 }
80             }
81             if (this.listenerInvokerFuture != null) {
82                 stopInvoker();
83                 commitManualAcks();
84             }
85             try {
86                 this.consumer.unsubscribe();
87             }
88             catch (WakeupException e) {
89                 // No-op. Continue process
90             }
91             this.consumer.close();
92             if (this.logger.isInfoEnabled()) {
93                 this.logger.info("Consumer stopped");
94             }
95         }

1.如果用户自定义了分区且非自动提交,那么开启异步线程执行ListenerInvoker任务,源码如下:

1 private void startInvoker() {
2             ListenerConsumer.this.invoker = new ListenerInvoker();
3             ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor()
4                     .submit(ListenerConsumer.this.invoker);
5         }

执行ListenerInvoker的run方法,实际上就执行一遍,因为CountDownLatch初始化为1

 1 private final class ListenerInvoker implements SchedulingAwareRunnable {
 2 
 3             private final CountDownLatch exitLatch = new CountDownLatch(1);
 4 
 5             private volatile boolean active = true;
 6 
 7             private volatile Thread executingThread;
 8 
 9             ListenerInvoker() {
10                 super();
11             }
12 
13             @Override
14             public void run() {
15                 Assert.isTrue(this.active, "This instance is not active anymore");
16                 if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
17                     ((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
18                 }
19                 try {
20                     this.executingThread = Thread.currentThread();
21                     while (this.active) {
22                         try {// 从阻塞队列LinkedBlockingQueue recordsToProcess中拉取 待消费记录
23                             ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
24                                     TimeUnit.SECONDS);
25                             if (this.active) {
26                                 if (records != null) {
27                                     invokeListener(records);// 消费
28                                 }
29                                 else {
30                                     if (ListenerConsumer.this.logger.isTraceEnabled()) {
31                                         ListenerConsumer.this.logger.trace("No records to process");
32                                     }
33                                 }
34                             }
35                         }
36                         catch (InterruptedException e) {
37                             if (!this.active) {
38                                 Thread.currentThread().interrupt();
39                             }
40                             else {
41                                 ListenerConsumer.this.logger.debug("Interrupt ignored");
42                             }
43                         }
44                     }
45                 }
46                 finally {
47                     this.active = false;
48                     this.exitLatch.countDown();
49                 }
50             }
51 
52             @Override
53             public boolean isLongLived() {
54                 return true;
55             }
581         }
1 private void invokeListener(final ConsumerRecords<K, V> records) {
2             if (this.isBatchListener) {
3                 invokeBatchListener(records);
4             }
5             else {
6                 invokeRecordListener(records);
7             }
8         }

如上图,从阻塞队列中取得待消费记录,用迭代器iterator消费,根据自定义消费类型,用不同listener来执行onMessage方法(用户自定义MessageListener接口的onMessage方法,实现用户自己的消费业务逻辑

 1 private void invokeRecordListener(final ConsumerRecords<K, V> records) {
 2             Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
 3             while (iterator.hasNext() && (this.autoCommit || (this.invoker != null && this.invoker.active))) {
 4                 final ConsumerRecord<K, V> record = iterator.next();
 5                 if (this.logger.isTraceEnabled()) {
 6                     this.logger.trace("Processing " + record);
 7                 }
 8                 try {
 9                     if (this.acknowledgingMessageListener != null) {
10                         this.acknowledgingMessageListener.onMessage(record,// 终极核心方法,用户自定义的MessageListener接口的onMessage方法
11                                 this.isAnyManualAck
12                                         ? new ConsumerAcknowledgment(record, this.isManualImmediateAck)
13                                         : null);
14                     }
15                     else {
16                         this.listener.onMessage(record);// 终极核心方法,用户自定义的MessageListener接口的onMessage方法
17                     }
18                     if (!this.isAnyManualAck && !this.autoCommit) {
19                         this.acks.add(record);
20                     }
21                 }
22                 catch (Exception e) {
23                     if (this.containerProperties.isAckOnError() && !this.autoCommit) {
24                         this.acks.add(record);
25                     }
26                     try {
27                         this.errorHandler.handle(e, record);
28                     }
29                     catch (Exception ee) {
30                         this.logger.error("Error handler threw an exception", ee);
31                     }
32                     catch (Error er) { //NOSONAR
33                         this.logger.error("Error handler threw an error", er);
34                         throw er;
35                     }
36                 }
37             }
38         }

2.未指定分区,进入自旋

// 1)拉取消费记录
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
 2)如果设置了自动提交,直接在当前线程执行
invokeListener(records);
// 3)否则发送消息进缓存队列
sendToListener(records)

1)在每个轮询中,消费者将尝试使用最后一个被使用的偏移量作为起始偏移量,并按顺序提取。最后一个被消费的偏移量可以通过 seek(TopicPartition,long)或自动设置为最后一个被订阅的分区列表的偏移量获得。

 1 @Override
 2     public ConsumerRecords<K, V> poll(long timeout) {
 3         acquire();
 4         try {
 5             if (timeout < 0)
 6                 throw new IllegalArgumentException("Timeout must not be negative");
 7 
 8             if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
 9                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
10 
11             // poll for new data until the timeout expires
12             long start = time.milliseconds();
13             long remaining = timeout;
14             do {
15                 Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
16                 if (!records.isEmpty()) {
23                     fetcher.sendFetches();// 在返回所获取的记录之前,我们可以发送下一轮的fetches并避免阻塞等待它们的响应,以便在用户处理获取的记录时进行流水线操作。
24                     client.pollNoWakeup();//由于已经更新了所使用的位置,所以我们不允许在返回所获取的记录之前触发wakeups或任何其他错误。
25 
26                     if (this.interceptors == null)
27                         return new ConsumerRecords<>(records);
28                     else// 如果存在消费者拦截器执行拦截
29                         return this.interceptors.onConsume(new ConsumerRecords<>(records));
30                 }
31 
32                 long elapsed = time.milliseconds() - start;
33                 remaining = timeout - elapsed;
34             } while (remaining > 0);
35 
36             return ConsumerRecords.empty();
37         } finally {
38             release();
39         }
40     }

pollOnce:

 1 private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
 2         coordinator.poll(time.milliseconds());
 3 
 4         // 遍历所有的TopicPartition,如果有未知偏移量(分区的),那么更新。涉及coordinator刷新已提交分区偏移量+fetcher更新获取位置
 6         if (!subscriptions.hasAllFetchPositions())
 7             updateFetchPositions(this.subscriptions.missingFetchPositions());
 8 
 9         // 返回已获取到的记录
10         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
11         if (!records.isEmpty())
12             return records;
13 
14         // 发送fetch请求
15         fetcher.sendFetches();
16 
17         long now = time.milliseconds();
18         long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
19         // 执行IO,拉取数据
20         client.poll(pollTimeout, now, new PollCondition() {
21             @Override
22             public boolean shouldBlock() {
23                 // since a fetch might be completed by the background thread, we need this poll condition
24                 // to ensure that we do not block unnecessarily in poll()
25                 return !fetcher.hasCompletedFetches();
26             }
27         });
31         if (coordinator.needRejoin())
32             return Collections.emptyMap();
33 
34         return fetcher.fetchedRecords();
35     }

 好吧,再往下涉及到通信IO层了,这里不再多说。将来补全了kafka通信协议相关文章后再加上飞机票。

2)invokeListener和分支1一样最终调用的是用户自定义的MessageListener接口的onMessage方法,不再重复。

3) sendToListener,这里塞进缓存队列LinkedBlockingQueue<ConsumerRecords<K, V>> recordsToProcess,塞进队列后,何时再消费?ListenerInvoker的run方法执行了recordsToProcess.poll进行了消费,

 

来源链接:https://www.cnblogs.com/dennyzhangdd/p/7759876.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/server/42079.html

标签: Kafka
分享给朋友:

“kafka原理和实践(四)spring-kafka消费者源码” 的相关文章

kafka消息中间件-快速学习

为什么需要消息队列   周末无聊刷着手机,某宝网APP突然蹦出来一条消息“为了回馈老客户,女朋友买一送一,活动仅限今天!”。买一送一还有这种好事,那我可不能错过!忍不住立马点了去。于是选了两个最新款,下单、支付一气呵成!满足的躺在床上,想着马上有女朋友了,竟然幸福的失眠了…...

kafka集群搭建

kafka集群搭建

本文将记录使用kafka镜像,分别在两种场景下搭建3节点集群:1.在一台机器上使用容器方式安装kafka集群;2.在三台机器上使用容器方式安装kafka集群。 此次使用的是wurstmeister的,下载量是比较大的。使用下面命令下载: docker pull wur...

kafka消息长度限制

更改为10M 客户端代码增加:max_request_size=10485760, 服务端配置:replica.fetch.max.bytes=10485760,message.max.bytes=10485760...

【kafka】安装部署kafka集群(kafka版本:kafka_2.12-2.3.0)

3.2.1 下载kafka并安装kafka_2.12-2.3.0.tgz tar -zxvf kafka_2.12-2.3.0.tgz 3.2.2 配置kafka集群 在config/server.properties中修改参数: [had...

kafka-server-stop.sh关闭Kafka失败

Kafka brokers need to finish the shutdown process before the zookeepers do. So start the zookeepers, then the kafka brokers wil...

Kafka 安装和简单使用

Kafka 安装和简单使用

文章目录 Kafka 安装和简单使用 kafka下载地址 windows 系统...

kafka的基本概念和工作流程分析

kafka的基本概念和工作流程分析

为什么需要消息队列   周末无聊刷着手机,某宝网APP突然蹦出来一条消息“为了回馈老客户,女朋友买一送一,活动仅限今天!”。买一送一还有这种好事,那我可不能错过!忍不住立马点了去。于是选了两个最新款,下单、支付一气呵成!满足的躺在床上,想着马上有女朋友了,竟然幸福的失眠了…...

Linux下Kafka下载与安装教程

Linux下Kafka下载与安装教程

原文链接:http://www.studyshare.cn/software/details/1176/0 一、预备环境 Kafka是java生态圈中的一员,运行在java虚拟机上,按Kafka官方说明,java环境推荐Java8;Kafka需要Zookeeper保存集群的...

Linux安装新版Kafka3.0

Linux安装新版Kafka3.0

最近开始玩Kafka了,想着装一下新版本的玩玩,然后网上找Kafka3.0的安装教程,发现安装Kafka3.0的倒是有,但是zookeeper还是单独安装的,这就不满足我的需求了,我就是单纯的想玩玩Kafka,我还得再去另外安装zookeepe...

kafka集群原理介绍

kafka集群原理介绍 @(博客文章)[kafka|大数据] 目录 kafka集群原理介绍 (一)基础理论 二、配置文件 三、错误处理 本系统文章共三篇,分别为 1、ka...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。