SpringBoot 版本:2.0.0.BUILD-SNAPSHOT
类 KafkaAutoConfiguration
主要功能:自动创建各种bean: KafkaTemplate, messageConverter,producerFactory,consumerFactory等。
@Configuration @ConditionalOnClass(KafkaTemplate.class) @EnableConfigurationProperties(KafkaProperties.class) @Import(KafkaAnnotationDrivenConfiguration.class) public class KafkaAutoConfiguration {
private final KafkaProperties properties;
private final RecordMessageConverter messageConverter;
public KafkaAutoConfiguration(KafkaProperties properties, ObjectProvider<RecordMessageConverter> messageConverter) { this.properties = properties; this.messageConverter = messageConverter.getIfUnique(); }
@Bean @ConditionalOnMissingBean(KafkaTemplate.class) public KafkaTemplate<?, ?> kafkaTemplate( ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) { KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>( kafkaProducerFactory); if (this.messageConverter != null) { kafkaTemplate.setMessageConverter(this.messageConverter); } kafkaTemplate.setProducerListener(kafkaProducerListener); kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic()); return kafkaTemplate; }
@Bean @ConditionalOnMissingBean(ProducerListener.class) public ProducerListener<Object, Object> kafkaProducerListener() { return new LoggingProducerListener<>(); }
@Bean @ConditionalOnMissingBean(ConsumerFactory.class) public ConsumerFactory<?, ?> kafkaConsumerFactory() { return new DefaultKafkaConsumerFactory<>( this.properties.buildConsumerProperties()); }
@Bean @ConditionalOnMissingBean(ProducerFactory.class) public ProducerFactory<?, ?> kafkaProducerFactory() { DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>( this.properties.buildProducerProperties()); String transactionIdPrefix = this.properties.getProducer() .getTransactionIdPrefix(); if (transactionIdPrefix != null) { factory.setTransactionIdPrefix(transactionIdPrefix); } return factory; }
@Bean @ConditionalOnProperty(name = “spring.kafka.producer.transaction-id-prefix”) @ConditionalOnMissingBean public KafkaTransactionManager<?, ?> kafkaTransactionManager( ProducerFactory<?, ?> producerFactory) { return new KafkaTransactionManager<>(producerFactory); }
@Bean @ConditionalOnProperty(name = “spring.kafka.jaas.enabled”) @ConditionalOnMissingBean public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException { KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer(); Jaas jaasProperties = this.properties.getJaas(); if (jaasProperties.getControlFlag() != null) { jaas.setControlFlag(jaasProperties.getControlFlag()); } if (jaasProperties.getLoginModule() != null) { jaas.setLoginModule(jaasProperties.getLoginModule()); } jaas.setOptions(jaasProperties.getOptions()); return jaas; }
@Bean @ConditionalOnMissingBean(KafkaAdmin.class) public KafkaAdmin kafkaAdmin() { KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties()); kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast()); return kafkaAdmin; }
}
类 KafkaAnnotationDrivenConfiguration 主要功能:自动创建bean :kafkaListenerContainerFactory。
/** * Configuration for Kafka annotation-driven support. * * @author Gary Russell * @author Eddú Meléndez * @since 1.5.0 */ @Configuration @ConditionalOnClass(EnableKafka.class) class KafkaAnnotationDrivenConfiguration {
private final KafkaProperties properties;
private final RecordMessageConverter messageConverter;
private final KafkaTemplate<Object, Object> kafkaTemplate;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties, ObjectProvider<RecordMessageConverter> messageConverter, ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate) { this.properties = properties; this.messageConverter = messageConverter.getIfUnique(); this.kafkaTemplate = kafkaTemplate.getIfUnique(); }
@Bean @ConditionalOnMissingBean public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() { ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer(); configurer.setKafkaProperties(this.properties); configurer.setMessageConverter(this.messageConverter); configurer.setReplyTemplate(this.kafkaTemplate); return configurer; }
@Bean @ConditionalOnMissingBean(name = “kafkaListenerContainerFactory”) public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); return factory; }
@EnableKafka @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) protected static class EnableKafkaConfiguration {
}
}
源码解析: (1) ConcurrentKafkaListenerContainerFactoryConfigurer 这个类是用来配置kafkaListenerContainerFactory。
/**
 * Applies this configurer's settings to the given listener container factory.
 * The consumer factory is always set; the message converter, reply template, ack mode,
 * ack count, ack time, poll timeout and concurrency are copied only when non-null, and
 * batch listening is switched on only when the listener type is BATCH.
 * @param listenerContainerFactory the factory to configure
 * @param consumerFactory the consumer factory to install on it
 */
public void configure( ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory, ConsumerFactory<Object, Object> consumerFactory) { listenerContainerFactory.setConsumerFactory(consumerFactory); if (this.messageConverter != null) { listenerContainerFactory.setMessageConverter(this.messageConverter); } if (this.replyTemplate != null) { listenerContainerFactory.setReplyTemplate(this.replyTemplate); } Listener container = this.properties.getListener(); ContainerProperties containerProperties = listenerContainerFactory .getContainerProperties(); if (container.getAckMode() != null) { containerProperties.setAckMode(container.getAckMode()); } if (container.getAckCount() != null) { containerProperties.setAckCount(container.getAckCount()); } if (container.getAckTime() != null) { containerProperties.setAckTime(container.getAckTime().toMillis()); } if (container.getPollTimeout() != null) { containerProperties.setPollTimeout(container.getPollTimeout().toMillis()); } if (container.getConcurrency() != null) { listenerContainerFactory.setConcurrency(container.getConcurrency()); } if (container.getType() == Listener.Type.BATCH) { listenerContainerFactory.setBatchListener(true); } }
(2)ConcurrentKafkaListenerContainerFactory 主要功能:依据给定的kafka监听目标endPoint 生成MessageListenerContainer的实例。该实例全权负责对给定的监听目标endPoint的所有操作:循环从目标中取数据 –> 处理数据:利用实例中依据配置生成messageListener 的onMessage(…)方法处理数据。该实例是可以设置并发的,即concurrency属性设为N,则生成N个Consumer监听线程从该监听目标中取数据/处理。 /** Container factory that builds a {@code ConcurrentMessageListenerContainer} for an endpoint and applies the optional concurrency setting to it. */ public class ConcurrentKafkaListenerContainerFactory<K, V> extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>, K, V> {
// Optional concurrency; when null the container's own setting is left untouched.
private Integer concurrency;
/** * Specify the container concurrency. * @param concurrency the number of consumers to create. * @see ConcurrentMessageListenerContainer#setConcurrency(int) */ public void setConcurrency(Integer concurrency) { this.concurrency = concurrency; }
/**
 * Builds the container from the endpoint's destination, in priority order:
 * explicit topic partitions, then topic names, then the topic pattern.
 */
@Override protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) { Collection<TopicPartitionInitialOffset> topicPartitions = endpoint.getTopicPartitions(); if (!topicPartitions.isEmpty()) { ContainerProperties properties = new ContainerProperties( topicPartitions.toArray(new TopicPartitionInitialOffset[topicPartitions.size()])); return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties); } else { Collection<String> topics = endpoint.getTopics(); if (!topics.isEmpty()) { ContainerProperties properties = new ContainerProperties(topics.toArray(new String[topics.size()])); return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties); } else { ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern()); return new ConcurrentMessageListenerContainer<K, V>(getConsumerFactory(), properties); } } }
/** Runs the parent initialization, then applies the concurrency when one was configured. */
@Override protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance) { super.initializeContainer(instance); if (this.concurrency != null) { instance.setConcurrency(this.concurrency); } }
}
其父类 AbstractKafkaListenerContainerFactory 中的方法 createListenerContainer(KafkaListenerEndpoint endpoint) 是生成 MessageListenerContainer 的入口:
@SuppressWarnings(“unchecked”) @Override public C createListenerContainer(KafkaListenerEndpoint endpoint) { C instance = createContainerInstance(endpoint);
if (this.autoStartup != null) { instance.setAutoStartup(this.autoStartup); } if (this.phase != null) { instance.setPhase(this.phase); } if (this.applicationEventPublisher != null) { instance.setApplicationEventPublisher(this.applicationEventPublisher); } if (endpoint.getId() != null) { instance.setBeanName(endpoint.getId()); }
if (endpoint instanceof AbstractKafkaListenerEndpoint) { AbstractKafkaListenerEndpoint<K, V> aklEndpoint = (AbstractKafkaListenerEndpoint<K, V>) endpoint; if (this.recordFilterStrategy != null) { aklEndpoint.setRecordFilterStrategy(this.recordFilterStrategy); } if (this.ackDiscarded != null) { aklEndpoint.setAckDiscarded(this.ackDiscarded); } if (this.retryTemplate != null) { aklEndpoint.setRetryTemplate(this.retryTemplate); } if (this.recoveryCallback != null) { aklEndpoint.setRecoveryCallback(this.recoveryCallback); } if (this.batchListener != null) { aklEndpoint.setBatchListener(this.batchListener); } if (this.replyTemplate != null) { aklEndpoint.setReplyTemplate(this.replyTemplate); } }
endpoint.setupListenerContainer(instance, this.messageConverter); initializeContainer(instance); instance.getContainerProperties().setGroupId(endpoint.getGroupId());
return instance; }
该入口中调用具体的containerFactory中的createContainerInstance(endPoint)方法生成messageListenerContainer实例,然后对该实例进行各种配置。 messageListenerContainer实例的主要功能是启动一个线程循环去执行数据消费: 从指定的endPoint目标中取数据 –> 交给messageListenerContainer中的messageListener的onMessage(…)方法处理。 而messageListener是由目标endPoint生成的。后文会对endPoint作详细解释。
(3) AbstractMessageListenerContainer 主要功能:启动消费线程:consumer从目标中循环取数据/处理数据。其中处理数据是委托给container中的messageListener处理的。 该类有两个具体实现: KafkaMessageListenerContainer和ConcurrentMessageListenerContainer。 其中ConcurrentMessageListenerContainer 支持多线程执行并委托给KafkaMessageListenerContainer:即 生成N个KafkaMessageListenerContainer去执行。且为生成的每个KafkaMessageListenerContainer分配具体目标:分区。
ConcurrentMessageListenerContainer 的执行入口doStart():
@Override protected void doStart() { if (!isRunning()) { ContainerProperties containerProperties = getContainerProperties(); TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions(); if (topicPartitions != null && this.concurrency > topicPartitions.length) { this.logger.warn(“When specific partitions are provided, the concurrency must be less than or ” + “equal to the number of partitions; reduced from ” + this.concurrency + ” to ” + topicPartitions.length); this.concurrency = topicPartitions.length; } setRunning(true);
for (int i = 0; i < this.concurrency; i++) { KafkaMessageListenerContainer<K, V> container; if (topicPartitions == null) { container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties); } else { container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties, partitionSubset(containerProperties, i)); } if (getBeanName() != null) { container.setBeanName(getBeanName() + “-” + i); } if (getApplicationEventPublisher() != null) { container.setApplicationEventPublisher(getApplicationEventPublisher()); } container.setClientIdSuffix(“-” + i); container.start(); this.containers.add(container); } } } 从中可以看出最终是生成KafkaMessageListenerContainer实例去处理数据的。
我们跟踪KafkaMessageListenerContainer的doStart()入口,看下具体是怎么实现的:
@Override protected void doStart() { if (isRunning()) { return; } ContainerProperties containerProperties = getContainerProperties(); if (!this.consumerFactory.isAutoCommit()) { AckMode ackMode = containerProperties.getAckMode(); if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) { Assert.state(containerProperties.getAckCount() > 0, “‘ackCount’ must be > 0”); } if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME)) && containerProperties.getAckTime() == 0) { containerProperties.setAckTime(5000); } }
Object messageListener = containerProperties.getMessageListener(); Assert.state(messageListener != null, “A MessageListener is required”); if (containerProperties.getConsumerTaskExecutor() == null) { SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor( (getBeanName() == null ? “” : getBeanName()) + “-C-“); containerProperties.setConsumerTaskExecutor(consumerExecutor); } Assert.state(messageListener instanceof GenericMessageListener, “Listener must be a GenericListener”); this.listener = (GenericMessageListener<?>) messageListener; ListenerType listenerType = ListenerUtils.determineListenerType(this.listener); if (this.listener instanceof DelegatingMessageListener) { Object delegating = this.listener; while (delegating instanceof DelegatingMessageListener) { delegating = ((DelegatingMessageListener<?>) delegating).getDelegate(); } listenerType = ListenerUtils.determineListenerType(delegating); } this.listenerConsumer = new ListenerConsumer(this.listener, listenerType); setRunning(true); this.listenerConsumerFuture = containerProperties .getConsumerTaskExecutor() .submitListenable(this.listenerConsumer); }
解读: (1)依据container中的messageListener会判断出该messageListener的类型:
ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);
(2)依据messageListener和listenerType 生成消费线程ListenerConsumer 放入线程池执行:
this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);
解读ListenerConsumer线程: a. 生成ListenerConsumer线程时:会创建消费者并订阅topic/分区。并创建一个消费者状态监听线程:
/**
 * Creates the consumer task: builds the Kafka consumer, binds it to its destination,
 * classifies the listener, selects the error handlers, prepares the transaction
 * template and schedules the periodic consumer check.
 * When no explicit partitions are configured the consumer subscribes (by pattern or
 * by topic list) with a rebalance listener; otherwise the partitions are assigned
 * directly and their initial offsets are recorded in {@code definedPartitions}.
 * @param listener the listener to invoke; must be a record or batch message listener
 * @param listenerType the listener type that selects which onMessage variant is called
 * @throws IllegalArgumentException if the listener is neither a
 * {@code MessageListener} nor a {@code BatchMessageListener}
 */
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
    Assert.state(!this.isAnyManualAck || !this.autoCommit,
            "Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
    final Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
            this.consumerGroupId, KafkaMessageListenerContainer.this.clientIdSuffix);
    this.consumer = consumer;

    ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);

    if (KafkaMessageListenerContainer.this.topicPartitions == null) {
        if (this.containerProperties.getTopicPattern() != null) {
            consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
        }
        else {
            consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
        }
    }
    else {
        List<TopicPartitionInitialOffset> topicPartitions =
                Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
        this.definedPartitions = new HashMap<>(topicPartitions.size());
        for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
            this.definedPartitions.put(topicPartition.topicPartition(),
                    new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent()));
        }
        consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
    }
    GenericErrorHandler<?> errHandler = this.containerProperties.getGenericErrorHandler();
    this.genericListener = listener;
    if (listener instanceof BatchMessageListener) {
        this.listener = null;
        this.batchListener = (BatchMessageListener<K, V>) listener;
        this.isBatchListener = true;
    }
    else if (listener instanceof MessageListener) {
        this.listener = (MessageListener<K, V>) listener;
        this.batchListener = null;
        this.isBatchListener = false;
    }
    else {
        throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
                + "'BatchMessageListener', or the variants that are consumer aware and/or "
                + "Acknowledging"
                + " not " + listener.getClass().getName());
    }
    this.listenerType = listenerType;
    this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
            || listenerType.equals(ListenerType.CONSUMER_AWARE);
    // The handler for the mode in use is derived from the configured one;
    // the handler for the other mode is a logging default.
    if (this.isBatchListener) {
        validateErrorHandler(true);
        this.errorHandler = new LoggingErrorHandler();
        this.batchErrorHandler = determineBatchErrorHandler(errHandler);
    }
    else {
        validateErrorHandler(false);
        this.errorHandler = determineErrorHandler(errHandler);
        this.batchErrorHandler = new BatchLoggingErrorHandler();
    }
    Assert.state(!this.isBatchListener || !this.isRecordAck, "Cannot use AckMode.RECORD with a batch listener");
    if (this.transactionManager != null) {
        this.transactionTemplate = new TransactionTemplate(this.transactionManager);
        Assert.state(!(this.errorHandler instanceof RemainingRecordsErrorHandler),
                "You cannot use a 'RemainingRecordsErrorHandler' with transactions");
    }
    else {
        this.transactionTemplate = null;
    }
    if (this.containerProperties.getScheduler() != null) {
        this.taskScheduler = this.containerProperties.getScheduler();
    }
    else {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.initialize();
        this.taskScheduler = threadPoolTaskScheduler;
    }
    // The monitor interval is multiplied by 1000 to obtain the scheduling period.
    this.monitorTask = this.taskScheduler.scheduleAtFixedRate(() -> checkConsumer(),
            this.containerProperties.getMonitorInterval() * 1000);
}
当目标有指定具体分区时采用assign指定分配的方式;否则采用订阅方式。 订阅方式会绑定重平衡监听器,而assign分配的方式则不会。
再看看该线程的run()方法:
@Override public void run() { if (this.genericListener instanceof ConsumerSeekAware) { ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this); } if (this.transactionManager != null) { ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId); } this.count = 0; this.last = System.currentTimeMillis(); if (isRunning() && this.definedPartitions != null) { initPartitionsIfNeeded(); } long lastReceive = System.currentTimeMillis(); long lastAlertAt = lastReceive; while (isRunning()) { try { if (!this.autoCommit && !this.isRecordAck) { processCommits(); } processSeeks(); ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout()); if (records != null && this.logger.isDebugEnabled()) { this.logger.debug(“Received: ” + records.count() + ” records”); } if (records != null && records.count() > 0) { if (this.containerProperties.getIdleEventInterval() != null) { lastReceive = System.currentTimeMillis(); } invokeListener(records); } else { if (this.containerProperties.getIdleEventInterval() != null) { long now = System.currentTimeMillis(); if (now > lastReceive + this.containerProperties.getIdleEventInterval() && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) { publishIdleContainerEvent(now – lastReceive, this.isConsumerAwareListener ? 
this.consumer : null); lastAlertAt = now; if (this.genericListener instanceof ConsumerSeekAware) { seekPartitions(getAssignedPartitions(), true); } } } } } catch (WakeupException e) { // Ignore, we’re stopping } catch (NoOffsetForPartitionException nofpe) { this.fatalError = true; ListenerConsumer.this.logger.error(“No offset and no reset policy”, nofpe); break; } catch (Exception e) { if (this.containerProperties.getGenericErrorHandler() != null) { this.containerProperties.getGenericErrorHandler().handle(e, null); } else { this.logger.error(“Container exception”, e); } } } ProducerFactoryUtils.clearConsumerGroupId(); if (!this.fatalError) { if (this.kafkaTxManager == null) { commitPendingAcks(); try { this.consumer.unsubscribe(); } catch (WakeupException e) { // No-op. Continue process } } } else { ListenerConsumer.this.logger.error(“No offset and no reset policy; stopping container”); KafkaMessageListenerContainer.this.stop(); } this.monitorTask.cancel(true); this.consumer.close(); if (this.logger.isInfoEnabled()) { this.logger.info(“Consumer stopped”); } }
解读:
(1)consumer每次获取数据之前: a. 对于enable.auto.commit = false和 ackMode 不为 record的消费者,需要提交缓存中的offset b. 查看是否有设置最新的offset:如可能重新平衡分区,修改了当前消费者当前的offset。
(2) 调用 consumer.poll(pollTimeout) 从kafka拉取一批数据;
(3) 调用messageListener 处理数据:
invokeListener(records)
; 会依据messageListener是否是批量处理器来分为批量处理和单条处理: /** Routes the polled records to the batch path when {@code isBatchListener} is set, otherwise to the per-record path. */ private void invokeListener(final ConsumerRecords<K, V> records) { if (this.isBatchListener) { invokeBatchListener(records); } else { invokeRecordListener(records); } }
此处我们以单条处理为例详解:
private void doInvokeWithRecords(final ConsumerRecords<K, V> records) throws Error { Iterator<ConsumerRecord<K, V>> iterator = records.iterator(); while (iterator.hasNext()) { final ConsumerRecord<K, V> record = iterator.next(); if (this.logger.isTraceEnabled()) { this.logger.trace(“Processing ” + record); } doInvokeRecordListener(record, null, iterator); } }
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record, @SuppressWarnings(“rawtypes”) Producer producer, Iterator<ConsumerRecord<K, V>> iterator) throws Error { try { switch (this.listenerType) { case ACKNOWLEDGING_CONSUMER_AWARE: this.listener.onMessage(record, this.isAnyManualAck ? new ConsumerAcknowledgment(record) : null, this.consumer); break; case CONSUMER_AWARE: this.listener.onMessage(record, this.consumer); break; case ACKNOWLEDGING: this.listener.onMessage(record, this.isAnyManualAck ? new ConsumerAcknowledgment(record) : null); break; case SIMPLE: this.listener.onMessage(record); break; } if (this.isRecordAck) { Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
if (this.containerProperties.isSyncCommits()) { this.consumer.commitSync(offsetsToCommit); } else { this.consumer.commitAsync(offsetsToCommit, this.commitCallback); } } else if (!this.isAnyManualAck && !this.autoCommit) { this.acks.add(record); } if (producer != null) { sendOffsetsToTransaction(producer); } } catch (RuntimeException e) { if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) { this.acks.add(record); } if (this.errorHandler == null) { throw e; } try { if (this.errorHandler instanceof RemainingRecordsErrorHandler) { processCommits(); List<ConsumerRecord<?, ?>> records = new ArrayList<>(); records.add(record); while (iterator.hasNext()) { records.add(iterator.next()); } ((RemainingRecordsErrorHandler) this.errorHandler).handle(e, records, this.consumer); } else { this.errorHandler.handle(e, record, this.consumer); if (producer != null) { try { sendOffsetsToTransaction(producer); } catch (Exception e1) { this.logger.error(“Send offsets to transaction failed”, e1); } } } } catch (RuntimeException ee) { this.logger.error(“Error handler threw an exception”, ee); return ee; } catch (Error er) { //NOSONAR this.logger.error(“Error handler threw an error”, er); throw er; } } return null; }
解读: (1)最终调用messageListener的onMessage(…)方法处理数据。会依据监听类型listenerType为messageListener的onMessage(…)传入不同的参数。至于messageListener后面详解。 (2)对于ackMode == record的消费者,表示每处理完一条数据,就马上提交offset; (3)对于enable.auto.commit = true的消费者,不用手动提交offset,kafka会自动提交。 (4)对于ackMode = manual 或者是 manual_immediate,则是会传参数 new ConsumerAcknowledgment(record) 到 messageListener.onMessage()方法,由最终的调用方法(即 @KafkaListener 注解的方法或者是endPoint中指定的bean的method)中控制提交。
(4) KafkaListenerEndpoint 详解
KafkaListenerEndpoint 接口代表kafka监听目标,即topic/分区。其实就是@KafkaListener 注解代表的位置,所以该接口中有代表topic和分区的字段,以及消费者组groupId等。KafkaListenerEndpoint 接口代码:
/**
 * Model of a Kafka listener endpoint: exposes the endpoint's id, group id, group,
 * destination (topics, topic partitions or topic pattern) and a hook for setting up a
 * listener container.
 */
public interface KafkaListenerEndpoint {
/** * Return the id of this endpoint. * @return the id of this endpoint. The id can be further qualified * when the endpoint is resolved against its actual listener * container. * @see KafkaListenerContainerFactory#createListenerContainer */ String getId();
/** * Return the groupId of this endpoint - if present, overrides the * {@code group.id} property of the consumer factory. * @return the group id; may be null. * @since 1.3 */ String getGroupId();
/** * Return the group of this endpoint or null if not in a group. * @return the group of this endpoint or null if not in a group. */ String getGroup();
/** * Return the topics for this endpoint. * @return the topics for this endpoint. */ Collection<String> getTopics();
/** * Return the topicPartitions for this endpoint. * @return the topicPartitions for this endpoint. */ Collection<TopicPartitionInitialOffset> getTopicPartitions();
/** * Return the topicPattern for this endpoint. * @return the topicPattern for this endpoint. */ Pattern getTopicPattern();
/** * Setup the specified message listener container with the model * defined by this endpoint. * <p>This endpoint must provide the requested missing option(s) of * the specified container to make it usable. Usually, this is about * setting the {@code queues} and the {@code messageListener} to * use but an implementation may override any default setting that * was already set. * NOTE(review): "queues" is not a concept exposed by this interface; it presumably * refers to the topics/partitions - confirm. * @param listenerContainer the listener container to configure * @param messageConverter the message converter - can be null */ void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter);
}
该接口有三个实现类:AbstractKafkaListenerEndpoint、MethodKafkaListenerEndpoint 和 MultiMethodKafkaListenerEndpoint, 其中AbstractKafkaListenerEndpoint是抽象类,MethodKafkaListenerEndpoint extends AbstractKafkaListenerEndpoint, MultiMethodKafkaListenerEndpoint extends MethodKafkaListenerEndpoint 。MultiMethodKafkaListenerEndpoint 是代表@KafkaListener 注解在类上的情况。
我们以MethodKafkaListenerEndpoint 详解:
主要功能: (1)指定监听位置: 主题/分区 (2)指定处理数据的类/方法: bean和method 字段。对@KafkaListener 注解而言,bean就是注解所在的类,method就是注解所在的方法。对于手动生成的endpoint实例则需要手动设置。 (3)生成messageListener。
生成messageListener详解:
@Override protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container, MessageConverter messageConverter) { Assert.state(this.messageHandlerMethodFactory != null, “Could not create message listener – MessageHandlerMethodFactory not set”); MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter); messageListener.setHandlerMethod(configureListenerAdapter(messageListener)); String replyTopic = getReplyTopic(); if (replyTopic != null) { Assert.state(getReplyTemplate() != null, “a KafkaTemplate is required to support replies”); messageListener.setReplyTopic(replyTopic); } if (getReplyTemplate() != null) { messageListener.setReplyTemplate(getReplyTemplate()); } return messageListener; }
/** * Create a {@link HandlerAdapter} for this listener adapter. * The adapter wraps an invocable handler method that the message handler method * factory creates from this endpoint's bean and method. * @param messageListener the listener adapter (not read by this implementation). * @return the handler adapter. */ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) { InvocableHandlerMethod invocableHandlerMethod = this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod()); return new HandlerAdapter(invocableHandlerMethod); }
/** * Create an empty {@link MessagingMessageListenerAdapter} instance. * A batch adapter is created when this endpoint is a batch listener, otherwise a * record adapter. The converter is installed only when its type matches the adapter * kind (batch converter for batch, record converter for record); the bean resolver * is applied when one is available. * @param messageConverter the converter (may be null). * @return the {@link MessagingMessageListenerAdapter} instance. */ protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(MessageConverter messageConverter) { MessagingMessageListenerAdapter<K, V> listener; if (isBatchListener()) { BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<K, V>( this.bean, this.method, this.errorHandler); if (messageConverter instanceof BatchMessageConverter) { messageListener.setBatchMessageConverter((BatchMessageConverter) messageConverter); } listener = messageListener; } else { RecordMessagingMessageListenerAdapter<K, V> messageListener = new RecordMessagingMessageListenerAdapter<K, V>( this.bean, this.method, this.errorHandler); if (messageConverter instanceof RecordMessageConverter) { messageListener.setMessageConverter((RecordMessageConverter) messageConverter); } listener = messageListener; } if (getBeanResolver() != null) { listener.setBeanResolver(getBeanResolver()); } return listener; }
解读: (1)生成messageListener时会传入messageConverter,这个参数用来把获取的数据ConsumerRecord 转化成Message。 (2)messageListener 中会生成一个真正执行处理方法的字段:handlerMethod。这个字段类型是HandlerAdapter,该handlerMethod 由bean和method生成,用来执行bean的method方法。
HandlerAdapter 详解:
public class HandlerAdapter {
private final InvocableHandlerMethod invokerHandlerMethod;
private final DelegatingInvocableHandler delegatingHandler;
/** Creates an adapter backed by a single invocable handler method; the delegating handler is left null. */
public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) { this.invokerHandlerMethod = invokerHandlerMethod; this.delegatingHandler = null; }
/** Creates an adapter backed by a delegating handler; the single handler method is left null. */
public HandlerAdapter(DelegatingInvocableHandler delegatingHandler) { this.invokerHandlerMethod = null; this.delegatingHandler = delegatingHandler; }
public Object invoke(Message<?> message, Object… providedArgs) throws Exception { //NOSONAR if (this.invokerHandlerMethod != null) { return this.invokerHandlerMethod.invoke(message, providedArgs); } else { return this.delegatingHandler.invoke(message, providedArgs); } }
……
} 调用接口是invoke(…)方法,也就是说messageListener最后会调用这个invoke(…)方法实现处理数据的逻辑。我们先看下messageListener的onMessage(…)方法。这个onMessage()方法之前有介绍,是消费者在获取到数据之后调用的。
@Override public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) { Message<?> message = toMessagingMessage(record, acknowledgment, consumer); if (logger.isDebugEnabled()) { logger.debug(“Processing [” + message + “]”); } try { Object result = invokeHandler(record, acknowledgment, message, consumer); if (result != null) { handleResult(result, record, message); } } 。。。 }
protected final Object invokeHandler(Object data, Acknowledgment acknowledgment, Message<?> message, Consumer<?, ?> consumer) { try { if (data instanceof List && !this.isConsumerRecordList) { return this.handlerMethod.invoke(message, acknowledgment, consumer); } else { return this.handlerMethod.invoke(message, data, acknowledgment, consumer); } } …. } 解读:
从中可以看出,kafka最后只会传给handler最多四种参数:Message<?> message, Object data(一般是ConsumerRecord),Acknowledgment acknowledgment, Consumer<?, ?> consumer。
而实际处理的bean的method方法中参数可能并没有ConsumerRecord类型的参数,而是需要的最终的Pojo,如自定义的类。那么这是怎么实现呢?首先利用messageConverter转化器将获取的数据转化成message中的payload值。然后利用HandlerAdapter中的解析器argumentResolvers解析成对应的值。对解析成自定义的类pojo而言,需要将messageConverter设为 StringJsonMessageConverter (表示将数据转化成json格式的message的payload),然后需要提供解析器 PayloadArgumentResolver。
第一步先利用messageListener中的messageConverter转化器由consumerRecord 生成Message。 然后最终调用messageListener中的handlerMethod.invoke(…)方法实现最终的数据处理逻辑。
那么handlerMethod.invoke()方法是怎么实现数据处理逻辑的?其实是这样的: 在生成handlerMethod时我们有传入bean和method,这个bean和method就是我们提供的处理逻辑,只不过利用handlerMethod采用java 反射技术进行调用。关键技术是要依据invoke(…)传入的参数解析出bean的method中需要的参数。 下面我们具体讲解下invoke(…)方法:
public Object invoke(Message<?> message, Object… providedArgs) throws Exception { Object[] args = getMethodArgumentValues(message, providedArgs); if (logger.isTraceEnabled()) { logger.trace(“Invoking ‘” + ClassUtils.getQualifiedMethodName(getMethod(), getBeanType()) + “‘ with arguments ” + Arrays.toString(args)); } Object returnValue = doInvoke(args); if (logger.isTraceEnabled()) { logger.trace(“Method [” + ClassUtils.getQualifiedMethodName(getMethod(), getBeanType()) + “] returned [” + returnValue + “]”); } return returnValue; }
/** * Get the method argument values for the current request. */ private Object[] getMethodArgumentValues(Message<?> message, Object… providedArgs) throws Exception { MethodParameter[] parameters = getMethodParameters(); Object[] args = new Object[parameters.length]; for (int i = 0; i < parameters.length; i++) { MethodParameter parameter = parameters[i]; parameter.initParameterNameDiscovery(this.parameterNameDiscoverer); args[i] = resolveProvidedArgument(parameter, providedArgs); if (args[i] != null) { continue; } if (this.argumentResolvers.supportsParameter(parameter)) { try { args[i] = this.argumentResolvers.resolveArgument(parameter, message); continue; } catch (Exception ex) { if (logger.isDebugEnabled()) { logger.debug(getArgumentResolutionErrorMessage(“Failed to resolve”, i), ex); } throw ex; } } if (args[i] == null) { throw new MethodArgumentResolutionException(message, parameter, getArgumentResolutionErrorMessage(“No suitable resolver for”, i)); } } return args; }
解读: 获取method需要的参数的方法是:getMethodArgumentValues(…) (1)第一步先简单地比较method中的参数类型与传入的参数类型,如传入Consumer参数,且method中有Consumer类型的参数,则直接把consumer参数传给method中对应位置的参数。 (2)第二步从Message中解析出对应数据给method中的参数。比如把message中的payload解析到@Payload注解的参数,或者其他没法识别的参数(如自定义的Pojo);把message中的headers解析到各种kafka的header参数。 这种从message中解析数据的方式需要提供对应的解析器。如代码: if (this.argumentResolvers.supportsParameter(parameter)) { try { args[i] = this.argumentResolvers.resolveArgument(parameter, message); continue; } 。。。
那么,argumentResolvers参数从何而来呢? 这些解析器是endpoint在创建messageListener时传入的: protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) { InvocableHandlerMethod invocableHandlerMethod = this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod()); return new HandlerAdapter(invocableHandlerMethod); } 它们是在endpoint的messageHandlerMethodFactory属性中定义的。对springBoot而言,messageHandlerMethodFactory会自动生成(详情见
KafkaListenerAnnotationBeanPostProcessor.java)。若是自定义endpoint则可以自行指定,后文有自定义的代码。
其中HeaderMethodArgumentResolver 用来将message中的headers解析成method中@Header注解的参数。而PayloadArgumentResolver解析器是默认解析器,用来将message中的payload解析到method中的其他参数,如自定义的类。
以下是在springBoot项目中手动定义一个消费者的相关代码:
/** * desc: * Created by going on 2017/11/30. */ @Configuration @EnableKafka @Import(value = {KafkaAutoConfiguration.class, ConsumerInvoker.class}) public class ConsumerConfigure {
private Logger logger = LoggerFactory.getLogger(ConsumerConfigure.class);
@Resource(name = “customListenerContainerFactory”) private ConcurrentKafkaListenerContainerFactory customListenerContainerFactory;
@Bean(name = “customMessageListenerContainer”) // @ConditionalOnMissingBean(name = “customMessageListenerContainer”) @ConditionalOnMissingBean /* public ConcurrentMessageListenerContainer customMessageListenerContainer( ConcurrentKafkaListenerContainerFactory factory, ConsumerInvoker consumerInvoker) throws NoSuchMethodException {*/ public ConcurrentMessageListenerContainer customMessageListenerContainer(ConsumerInvoker consumerInvoker) throws NoSuchMethodException {
logger.info(“create bean customMessageListenerContainer”);
MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint(); endpoint.setMessageHandlerMethodFactory(createDefaultMessageHandlerMethodFactory()); endpoint.setTopics(“testTopic”,”my-replicated-topic”); endpoint.setBean(consumerInvoker); endpoint.setMethod(consumerInvoker.getClass().getMethod(“invoke”, TestEntity.class, Message.class, ConsumerRecord.class, Consumer.class)); endpoint.setGroupId(“customGroup”); ConcurrentMessageListenerContainer<String,String> container = (ConcurrentMessageListenerContainer<String, String>) customListenerContainerFactory.createListenerContainer(endpoint);
return container;
}
private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() { DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
DefaultFormattingConversionService conversionService = new DefaultFormattingConversionService(); defaultFactory.setConversionService(conversionService);
List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
// Annotation-based argument resolution argumentResolvers.add(new HeaderMethodArgumentResolver(conversionService, null)); argumentResolvers.add(new HeadersMethodArgumentResolver());
// Type-based argument resolution final GenericMessageConverter messageConverter = new GenericMessageConverter(conversionService); argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter)); argumentResolvers.add(new PayloadArgumentResolver(messageConverter) {
@Override protected boolean isEmptyPayload(Object payload) { return payload == null || payload instanceof KafkaNull; }
}); defaultFactory.setArgumentResolvers(argumentResolvers);
defaultFactory.afterPropertiesSet(); return defaultFactory; }
@Bean @ConditionalOnMissingBean(name = “customListenerContainerFactory”) public ConcurrentKafkaListenerContainerFactory<?, ?> customListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory);
factory.setBatchListener(false); //每次只能获取一个记录 //设置ConsumerRecord 转化成Message 的转化器 factory.setMessageConverter(new StringJsonMessageConverter());
//消息过滤策略 factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() { @Override public boolean filter(ConsumerRecord<Object, Object> consumerRecord) { logger.info(“filter record……” + consumerRecord.value().toString()); return false; } }); //空闲监测 // factory.getContainerProperties().setIdleEventInterval(60000l);
return factory; }
}
/**
 * Final data-consumption logic: the bean whose {@code invoke} method is
 * registered as the Kafka listener handler.
 *
 * Created by going on 2017/12/4.
 */
@Component
public class ConsumerInvoker {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerInvoker.class);

    /* Alternative handler that only needs the raw record and the consumer:
    public void invoke(ConsumerRecord<String,String> data, Consumer<String,String> consumer) {
        logger.info("invoke…\n" + data.toString());
        if(data.offset() % 5 == 0) {
            consumer.commitSync();
            logger.info("commit…");
        }
    }*/

    /**
     * Handles one record. Intended for use with {@code StringJsonMessageConverter}.
     * Logs the record, the message payload and the entity, and commits
     * synchronously whenever the record offset is a multiple of 5.
     *
     * @param testEntity the entity argument resolved for this record
     * @param message the message whose payload is logged
     * @param data the consumer record; its offset drives the commit decision
     * @param consumer the consumer used to commit offsets
     */
    public void invoke(TestEntity testEntity, Message<?> message, ConsumerRecord<String, String> data,
            Consumer<String, String> consumer) {
        logger.info("invoke…");
        logger.info("data:" + data.toString() + "\n message payLoad:" + message.getPayload());
        logger.info("testEntity:" + JsonUtils.toJson(testEntity));
        if (data.offset() % 5 == 0) {
            consumer.commitSync();
            logger.info("commit…");
        }
    }

}