从源码中理解spring cloud kafka stream 是如何分配kafka的partitions给不同的instance的

首先,kafka的topic是由多个partitions物理分隔的。假设topic: testIn,有8个partitions

其次,我们编写的springcloud kafka stream程序,打成jar包后,可以部署多个不同的实例instances,假设部署了3个instance。

那么这3个instance是怎么分配这8个partitions的呢?

spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=true时(默认),多个实例会自动均衡

的分配partitions,由ConsumerCoordinator来自动协调,不用您操心了(跟instanceCount 和instanceIndex 没关系了,但是concurrency还是同理的), 当设置成false后:

主要是通过consumer的4个参数(org.springframework.cloud.stream.binder.ConsumerProperties)来决定的:


spring.cloud.stream.bindings.input.consumer.partitioned=true

准备启动多少个实例

spring.cloud.stream.bindings.input.consumer.instanceCount =3

该实例编号index,从0开始到instanceCount -1

spring.cloud.stream.bindings.input.consumer.instanceIndex =0

每个实例中启动多少个kafka consumer

spring.cloud.stream.bindings.input.consumer.concurrency  =1

如果按照上面的配置的话(3个instance,每个instance的并行度1):

instance0将启动一个consumer0,消费p0,p3,p6; 

instance1将启动一个consumer0,消费p1,p4,p7; 

instance2将启动一个consumer0,消费p2,p5;

主要逻辑在org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder#createConsumerEndpoint这个方法中:

protected MessageProducer createConsumerEndpoint(final ConsumerDestination destination, final String group,
			final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {

		boolean anonymous = !StringUtils.hasText(group);
		Assert.isTrue(!anonymous || !extendedConsumerProperties.getExtension().isEnableDlq(),
				"DLQ support is not available for anonymous subscriptions");
		String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
		final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, consumerGroup,
				extendedConsumerProperties);
		int partitionCount = extendedConsumerProperties.getInstanceCount()
				* extendedConsumerProperties.getConcurrency();
		//如果AutoRebalanceEnabled=false的话,当instanceCount*Concurrency比实际topic的partitions数量还要多的话,将报错,因为会启动空闲的consumer
		Collection<PartitionInfo> allPartitions = provisioningProvider.getPartitionsForTopic(partitionCount,
				extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
				new Callable<Collection<PartitionInfo>>() {

					@Override
					public Collection<PartitionInfo> call() throws Exception {
						Consumer<?, ?> consumer = consumerFactory.createConsumer();
						List<PartitionInfo> partitionsFor = consumer.partitionsFor(destination.getName());
						consumer.close();
						return partitionsFor;
					}

				});

		Collection<PartitionInfo> listenedPartitions;

		if (extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ||
				extendedConsumerProperties.getInstanceCount() == 1) {
			listenedPartitions = allPartitions;
		}
		else {
			listenedPartitions = new ArrayList<>();
			//为该Instance计算应该分派哪些partitions
			for (PartitionInfo partition : allPartitions) {
				// divide partitions across modules
				if ((partition.partition()
						% extendedConsumerProperties.getInstanceCount()) == extendedConsumerProperties
								.getInstanceIndex()) {
					listenedPartitions.add(partition);
				}
			}
		}
		this.topicsInUse.put(destination.getName(), new TopicInformation(group, listenedPartitions));

		Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
		final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
				listenedPartitions);
		final ContainerProperties containerProperties = anonymous
				|| extendedConsumerProperties.getExtension().isAutoRebalanceEnabled()
						? new ContainerProperties(destination.getName())
						: new ContainerProperties(topicPartitionInitialOffsets);
		//并行度取consumer参数中设置的Concurrency和实际处理的partitions子集中较小的一个。
		int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
		@SuppressWarnings("rawtypes")
		//ConcurrentMessageListenerContainer的doStart方法将根据并行度创建consumer
		final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
				new ConcurrentMessageListenerContainer(
						consumerFactory, containerProperties) {

			@Override
			public void stop(Runnable callback) {
				super.stop(callback);
			}

		};
		messageListenerContainer.setConcurrency(concurrency);
		if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
			messageListenerContainer.getContainerProperties()
					.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
			messageListenerContainer.getContainerProperties().setAckOnError(false);
		}
		else {
			messageListenerContainer.getContainerProperties()
					.setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
		}
		if (this.logger.isDebugEnabled()) {
			this.logger.debug(
					"Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
		}
		final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(
				messageListenerContainer);
		kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
		ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, consumerGroup,
				extendedConsumerProperties);
		if (extendedConsumerProperties.getMaxAttempts() > 1) {
			kafkaMessageDrivenChannelAdapter.setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));
			kafkaMessageDrivenChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
		}
		else {
			kafkaMessageDrivenChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
		}
		return kafkaMessageDrivenChannelAdapter;
	}

org.springframework.kafka.listener.ConcurrentMessageListenerContainer#doStart方法:

@Override
	protected void doStart() {
		if (!isRunning()) {
			ContainerProperties containerProperties = getContainerProperties();
			TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
			if (topicPartitions != null
					&& this.concurrency > topicPartitions.length) {
				this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
						+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
						+ topicPartitions.length);
				this.concurrency = topicPartitions.length;
			}
			setRunning(true);
                        //根据并行度生成N个KafkaMessageListenerContainer,
                        //每个KafkaMessageListenerContainer的构造函数中会初始化一个KafkaConsumer
			for (int i = 0; i < this.concurrency; i++) {
				KafkaMessageListenerContainer<K, V> container;
				if (topicPartitions == null) {
					container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
				}
				else {
				//从前面分给Instance的partitions子集中,再指派一些partitions给具体的并行consumer
					container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
							partitionSubset(containerProperties, i));
				}
				if (getBeanName() != null) {
					container.setBeanName(getBeanName() + "-" + i);
				}
				if (getApplicationEventPublisher() != null) {
					container.setApplicationEventPublisher(getApplicationEventPublisher());
				}
				container.setClientIdSuffix("-" + i);
				container.start();
				this.containers.add(container);
			}
		}
	}

  //具体的二级子集生成策略
	private TopicPartitionInitialOffset[] partitionSubset(ContainerProperties containerProperties, int i) {
		TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
		if (this.concurrency == 1) {
			return topicPartitions;
		}
		else {
			int numPartitions = topicPartitions.length;
			if (numPartitions == this.concurrency) {
				return new TopicPartitionInitialOffset[] { topicPartitions[i] };
			}
			else {
				int perContainer = numPartitions / this.concurrency;
				TopicPartitionInitialOffset[] subset;
				if (i == this.concurrency - 1) {
					subset = Arrays.copyOfRange(topicPartitions, i * perContainer, topicPartitions.length);
				}
				else {
					subset = Arrays.copyOfRange(topicPartitions, i * perContainer, (i + 1) * perContainer);
				}
				return subset;
			}
		}
	}

看完以上核心代码,应该就很清楚partitions的分配策略了,下面我们改一下场景,启动2个instance,每个instance的concurrency=3,那么:

instance0将分到[p0,p2,p4,p6]这4个分区,然后会启动3个consumer,consumer0将消费[p0],consumer1消费[p2],consumer2消费[p4,p6]。

instance0将分到[p1,p3,p5,p7]这4个分区,然后会启动3个consumer,consumer0将消费[p1],consumer1消费[p3],consumer2消费[p5,p7]。

可以看到每个instance的consumer2其实会多消费一个分区,不是很均匀,所以应该尽可能能整除,比如启动2个instance,concurrency=4,或者concurrency=2。当concurrency=2时,每个实例的每个consumer将消费两个partition。

看到这里,你应该对这几个参数有了一个比较深刻的理解了,这样在stream程序扩容或者缩容的时候应该能正确的配置参数.

    原文作者:Spring Cloud
    原文地址: https://blog.csdn.net/xiao_jun_0820/article/details/80166131
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞