Kafka官方文档翻译(三)-Consumer API

该客户端透明的处理Kafka代理的失败,并透明的适应它获取的主题分区在集群内的迁移。该客户端还与代理进行交互,允许消费者组用消费者组来负载平衡消费。

消费都维持到必要代理的的TCP连接以获取数据。在使用后关闭消费者失败会泄漏这些连接。消费者是非线程安全的。

跨版本兼容性

该客户端可以与V0.10.0或者更新版本的代理通讯。更旧或者更新版本的代理可能不支持些特性。例如:0.10.0版的代理不支持offsetsForTimes,因为该特性在0.10.1版本才被添加。当调用当前运行的代理的版本中不可用的API时,你将会收到一个UnsupportVersionException的异常。

偏移量和消费位置

Kafka为分区中的每条记录维护着一个数字的偏移量。该偏移量充当着该记录在在分区中的唯一标识符,同时表示消费者在分区中的位置。例如,一个消费者的位置为5,表示已经消费了从偏移量从0到4之间的记录,下一个将要消费的是偏移量为5的记录。实际上有两个与消费者用户相关的位置的概念:

消费者的位置表示的接下来将要读取的记录的偏移量。它交比消费者在分区中已见的最大偏移量大.每次消费调用pool(Duration)方法接收到消费时它都会自动向前推进。

已提交的位置是已被安全的存储的最后一个偏移量。如果该过程失败并重启,该消费将恢复到该偏移量。该消费即可以定期自动的提交偏移量,也可以选择通过调用提交的API(commitSync 和commitAsync)中的一个手动提交偏移量。

这个差异让消费控制何时确认记录已经被消费了。下面会进一步详细讨论。

消费者组和主题订阅

Kafka使用消费者组的概念来允许一个过程池来划分消费和处理记录的工作。这些进程可以运行在同一台机器上,也可以分不在不同的机器上,从而为处理提供扩展性和容错性。所有共享同一个group.id的消都实例将将属于同一个消费者组。

消费者组中的每个消费都可以通过一个订阅的API动态的设置想要订阅的主题列表。Kafka会将被订阅的主题中分发每一条消息给每个消费都组只的一个进程。这是通过在消费都组中的所有成员间平衡分区来实现的,这样在消费者组中每个分区刚好被分配给其中一个消费者。因此,如果一个主题有4个分区,一个消费者组有两个进程,每个进程将会从两个分区进行消费。

消费都中的成员关系是被动态维护的:如果一个进程失败了,该进程被分配的分区将会被分配给消费组内其他的消费者。同样,如果有新的消费者加入该组,已有消费者的分区会被转移到新消费者。这可以被认为是组内再平衡,接下来将会讨论更多细节。组内再平衡同样用于当新的分区被添加到一个被订阅的主题,或者当新创建的主题与订阅的正则表达式相匹配时。该组将会通过定期元数据刷新检查新的分区,同时将它分配给组内的成员。

从概念上讲,你可以认为一个消费者组是一个碰巧由多个进程组成的一个单个逻辑订阅者。作为一个多订阅者系统,Kafka自然支持在不复制数据的情况下为给定主题拥有任意个消者组。(另外,消费都实际上非常便宜)。

这是消息系统中常规功能的略微概括。获取类似传统消息统中队列的语义,所有的进程应该属于同一个消费者组,于是记录的传输将在组上与队列平衡,但是,与传统消息系统不同,你可以拥有多个此类组。传统消息秒统中获取类似中发布-订阅的语文,每个进程都应该有它所属的消费都组,因此,每个进程将订阅所有已发布到该主题的记录。

另外,当组自动发生重新分配,可以通过ConsumerRebalanceListener提醒消费者,这将允许它们去完必要的应用程序级别的逻辑,比如状态清理,手动提交偏移量。

同样有可能使用assign(Collection)方法为消费者手动分配特定的分区(类似较早的“简单”消费者),在这种情况下,动态分区分配和消费者组协作将被禁用。

检测消费者故障

在订阅了一组主题后,当poll(Duration)方法被调用时,该消费都会自动加入组。poll这个API被设计成确保消费者活力。只要你持续的调用poll,该消费者将会保持在组中,持续的从被分配的分区中接收到消息。在底层,该消费会发送定期心跳信号给服务器。如果消费都崩溃或者在持续session.timeout.ms时间内无法发送心跳信号,则该消费者将会被认为已经死亡,并且它的分区将会被重新分配。

消费者可能遭遇”livelock”的情况,即持续发送心跳,但没有取得任何进展。为了避免在这种情况下消费者被分区持续持有,我们用max.poll.interval.ms配置提供了一个活锁发现机制。基本上,如果你在配置的max interval内没有调用poll方法,该客户端会主动的离开组,以便其它的消费者可以接管它的分区。当这种情况发生时,你可能会看到一个偏移量提交故障(通过调用commitSync()方法抛出的CommitFailedException异常提示),这是一个安全机制,即保证只有组内活跃的成员才能提交偏移量。因此,为了保持在组里,你必须持续的调用poll方法。

消费都提供两种配置设置来控制poll循环的行为:

  1. max.poll.interval.ms:通过增加两个预期轮询的间隔,你可以给预消费都更多时间来处理poll(Duration)方法返回的记录批量记录。其缺陷是增加这个值可能延迟组重新平衡,因为消费者只会加入调用poll中的重新平衡。你可以使用这个设置限定完成一次重新平衡的时间,但是你冒着进展缓慢的风险,如果消费者事实现无法足够频繁的调用poll方法。
  2. max.poll.records:使用这个设置限定单次调用poll返回的记录总数。这会使得更容易预测每次轮询间隔内必须处理的最大值。通过调优这个值,你可能可以减少轮询的间隔,这将减少组再平衡的影响。

对于消息处理时间变化不可预测的用例,这些选项可能都不够。推荐将消息处理转换移到其他线程的方式来处理这些用例,即允许消费者持续调用poll,同时处理保持工作。必须采取一些谨慎的措施来保证提交的偏移量不会超过实际位置。通常,只有在线程完成对记录的处理之后,你才必须禁用自动提交,手动提交已处理记录的偏移量。同时要注意,你将必须暂停该分区,以便不会从轮询从接收到新的记录,直到线程处理完上次轮询返回的记录。

应用示例

消费者API提供了覆盖各种消费用例的灵活性。以下是一些示例演示如何使用它们。

自动提交偏移量

这个示例演示了依赖自动提交偏移量的Kafka消都API的简单用法。

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }

通过使用bootstrap.servers指定要联系的一个或多个代理的列表来引导与集群的连接。这个名单只用于发现集中群空闲的代理,并不需要是集群中所有服务器的完整清单(尽管你可能想指定多于一个,以防客户端连接时有服务器宕机)。

设置enable.auto.commit意味着偏移自动提交,其频率由auto.commit.interval.ms控制。

在这个示例中,该消费订阅了主题foo他bar,同时归属于group.id为test的消费者组。

反序列化配置指定了如果将字节转换为对像。例如,通过指定字符串反序列化器,我们说我们的记录的键和值只是简单字符串。

手动控制偏移量

不同于依赖于消费者定期提交已消费的偏移量,用户也可以控制当记录应该被认为已消费了才提交它们的偏移量。当消费的消息与一些逻辑耦合时,这是非常有用的,因此在完成处理之前,不应该将消息视为已消费。

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }

在这个示例中,我们将消费一批记录,并在内存中批处理。当我们批处理足够量的记录时,我们将把它们插入到数据库。如果我们允许偏移量像上个示例一样自动提交,记录在轮询中返回给用户后就会被认为已消费。我们的进程可能在批处理数据后失败,但是在它们被插入到数据库之前就失败了。

为了避免这种情况,我们将仅在相关记录都被插入到数据库后手动提交偏移量。这使我们精确控制什么时候记录被认为是已消费了。这提出了另一种可能性,该进程可能在插入到数据后但是在提交前的间隔中失败(尽管这可以仅仅几毫秒,但是有可能)。在这种情况下,顶替消费的进程将从最后一提交的偏移量开始消费,这将重复插入上个批次的数据。以这种方式使用Kafka提供的“至少一次”的分发保证,因为每条记录可能会被分发一次,但是在失败的情况可能会被复制。

注意:使用自动提交偏移量也可以给你“至少一次”分发,但是前提是你必须在任何随后调用之前或关闭消费者之前消费完所有每次轮询返回返回的数据。如果这些有任何失败,则可能提交的偏移量超过已消费的位置,其结果是丢失记录。使用手动控制偏移量的好处是你可以直接控制在什么时候记录被认为“已消费”。

上面的示例使用commitSync去标记所有接到记录为已提交。在某些情况下,你可能希望通过明确指定偏移量更精细的控制被已被提交的偏移量。在下面的示例中,我们在处理完每个分区中的记录后再提交偏移量。

     try {
         while(running) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }

注意:被提交的偏移量应该总是你的应用程序接下来将要读取的消息的偏移量。从而,当我们调用commitSync(offsets)方法时,你应该在最后处理的消息的偏移量上加1。

手动分配分区

在前面的示例中,我们订阅订阅了我们感兴趣的主题,同时让Kafka基于消费者组中的活跃消费者动态的公平的分配这些主题的分区。然而,某些时候你可能需要更精确的控制已被分配的特定分区。例如:

  • 如果该进程维护某些与分区相关的本地状态(例如一个本地磁盘的键-值存储),那么它应该只获取它在磁盘上维护分的分区的记录。
  • 如果该进程自身就是高可用,且失败时将会重启(可能使用了集群管理框架,比如Yarn,Mesos,或者AWS设备,或者作为一个流式处理框架的一部分)。在这种情况下,Kafka没有必要检测失败和重新分配分区,因为消费进程将会在另一台机器上重启。

使用这种模式替代使用subscribe方法订阅主题,你只需要调用assign(Collection),并传入你想要消费的全部的分区的名单。

     String topic = "foo";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));

一旦分配了,就可以像前面的示例一样,在循环中调用poll来消费记录。消费都指定的组仍然用于提交偏移量,但是现在分区集只会在再一次调用assign时才会改变。手动分区分配不使用组协调,所以消费失败将不会引起已分配的分区再均衡。每个消费都是独立都行动,即使它与其他消费者共享一个组ID.为了避免偏移量提交冲突,你通常应该确保每个消费实例的组ID都是唯一的。

注意,不可能将手动分区分配(即使用assign)与通过主题订阅(即使用subscribe)动态分区分配混合使用。

在Kafka之外存储偏移量

消费者应用程序非必须使用Kafka内置的偏移量存储,即可以在自己选择的一个存储中保存偏移量。这样做的主要用例是允许应用程序在同一个系统中保存偏移量和消费的结果,即以原子方式保存结果和偏移量。这并不总是可行的,但是当它出现时,它将使消费完全原子化,并提供“恰好一次”语义,这比使用Kafka的偏移提交功能得到的默认“至少一次”语义更强。

以下是此类用法的几个示例:

  • 如果消费的结果被保存在一个关系型数据库,在数据库里保存偏移量,也可以允许在同一个事务中提交结果和偏移量。因此,要么事务成功并基于消费的的内容更新偏移量,或者不存储结果也不更新偏移量。
  • 如果结果被存储在一个本地的存储中,则也可以在那里存储偏移量。例如,你可以通过订阅特定分区并将偏移和索引数据一起保存来构建搜索索引。如果这是以原子方式完成的,通常可能会出现这种情况,即使发生崩溃导致未同步的数据丢失,剩下胡任何内容也会存储相应的偏移量。这意味着在这种情况下,返回丢失最近更新的索引过程只会恢复索引,确保没有更新丢失

每条记录都有自己的偏移量,所以,管理自己的偏移量,你只需要执行下面的操作:

  • 配置enable.auto.commit=false
  • 使用每个ConsumerRecord提供的偏移量来保存你的位置
  • 重启时使用seek(TopicPartition,long)方法来恢复消费者的位置

当分区分配也是手动完成时,这种类型的使用最简单(这与上面描述的搜索索引的用例非常相似)。如果分区分配是自动完成,需要特别注意处理分区分配改变的情况。这可以通过在对subscribe(Collection, ConsumerRebalanceListener) and subscribe(Pattern, ConsumerRebalanceListener)的调用中提供一个 ConsumerRebalanceListener实例来完成。例如,当分区取自一个消费者,该消费者将希望通过实现ConsumerRebalanceListener.onPartitionsRevoked(Collection). 来提交这些分区的偏移量。当分区被分配给一个消费者,该消费者将希望查找这些新分区的偏移量,并通过实现ConsumerRebalanceListener.onPartitionsAssigned(Collection).将消费者正确初始化为该位置。

ConsumerRebalanceListener的另一种常见用法是刷新应用程序为移动到其它地方的分区维护的任何缓存。

控制消费者的位置

在大多数用例中,消费者只是从开始到结束消费记录,定期提交它的位置(不论是自动还是手动)。然而,Kafka允许消费者手动控制它的位置,随意在分区内向前或向后移动位置。这意味着消费者可以重复消费较旧的记录,或者跳过到最近的记录而不实际的消费中间的记录。

有几种情况可以手动控制消费者的位置。

一个案例时对时间敏感的记录处理,对于远远落后于不试图赶上处理所有记录的消费者来说可能是有意义的,而只是跳到最近的记录。

另一个案例是维护本地状态的系统,如上一节所述。在这样一个系统中,消费希望在启动时将它的位置初始化到被保存在本地存储的位置。同样,如果本地状态被销毁,该状态可能通过重新消费所有数据并重新创建状态来重新创建状态。

Kafka允许使用seek(TopicPartition)为指定的新分区指定位置。寻找服务器维护的最早和最新偏移的特殊方法也可用(seekToBegining(Collection)和seekToEnd(Collection))。

消费流量控制

如果一个消费者从多个已分配的分区中获取数据,它将会尝试同时从所有分区中消费,从而效地为这些分区提供相同优先级以供消费。然而,在某些情况下消费者可能希望首先全速从其中一些分区中获取数据,只有当这些分区只有很少或者没有数据消费时才开始从其它的分区中获取数据。

流式处理是一个这样的案例,处理器从两个主题获取数据,同时连接这两个流。当其中一个主题长期满后于另一个,该处理器可能会暂停从领先的主题获取数据,以便滞后的流能赶上来。另一个示例是引导有许多历史数据需要处理的消费重启,该应用程序通常希望在考虑获取其它主题之前获取一些主题最新数据。

Kafka支持使用pause(Collection)和resume(Collection)动态控制消费流,以便在未来的轮询(持续时间)调用中分别暂停指定分区上的消费和恢复指定暂停分区上的消费。

读取事务消费

事务是在Kafka0.11.0引进的,其中应用程序可以原子地写入多个主题和分区。为了以这种方式工作,应该将从这些分区读取数据的消费者配置为只读取已提交的数据。这可以通过配置在消费者的配置中设置isolation.level=read_committed来实现。

在read_committed模式下,消费者只读取已成功提交的事务消息。它将像以前一样继续读取非事务性消息,在read_committed模式下没有客户端缓冲。相反,read_committed消费者分区结束的偏移量可能 属于打开事务的分区中的第一个消息的偏移量。这个偏移量称为“最后一个稳定偏移量“(LSO)

一个read_committed消费者将只读取LSO并过滤掉已经中止的任何事务性消息。对于read_committed消费者,LSO也影响着seekToEnd(Collection)和endOffsets(Collection)的行为。最后,还将读取延迟度量调整为相对于read_committed消费者的LSO。

事务性消息分区将包含表明事务结果的提交或中止标记。这些标记没有返回给应用,但是在日志中会有一个偏移量,从事务性消息的主题中读取应用程序将在消息者偏移量中看到缺口。这些丢失的消息将作为事务标记,同时在两个级别上为消费者过滤它们。此外,使用read_committed消费者的应用程序可能还会看到由于事物中止而导到的空白,因为这些消息不会由消费者返回,但是会有有效的偏移量。

多线程处理

Kafka的消费者不是线程安全的。所有网络I/O发生进行调用的应用程序的线程中。确保多线程访问正确同步是用户的责任。非同步的访问将造成ConcurrentModificationException

该规则的唯一异常是wakeup().它可以安全地从外部线程用于中断活动操作。在这种情况下,一个 WakeupException 将会从阻塞在操作的线程中抛出。这可以用于从另一个线程判关闭消费者。下面的代码片段展示了典型的模式:

public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;


     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
                 ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
                 // Handle new records
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }


     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }
 } 

然后在一个单独的线程中,可以通过设置关闭标志并唤醒使用者来关闭使用者。

请注意,虽然可以使用线程中断而不是wakeup()来中止阻塞操作(在这种情况下,会引发InterruptException),但我们不鼓励使用它们,因为它们可能会导致彻底终止使用者。中断主要支持那些不可能使用wakeup()的情况,例如,当使用者线程由不知道Kafka客户机的代码管理时。

    原文作者:错觉
    原文地址: https://zhuanlan.zhihu.com/p/59523838
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞