2018年第17周-RabbitMQ的模式与Kafka的设计

理解概念的一个方法

之前说过学习一个新的东西,最核心的就是掌握概念。而如何掌握概念呢?我的其中一个方法就是对比,把相似且模糊不清的两个概念进行对比,这样就理解更快。

RabbitMQ模式

RabbitMQ有以下模式:
1.工作队列(Worke Queues)
发消息和收消息都是直接通过队列。在耗时比较多的任务,我们把任务放入队列里,然后每个工作者去获取任务然后处理。所以这个工作队列,也称为任务队列(Task Queues)。这样就将耗资源的任务从产生任务的应用上解耦出来。
这个模式最主要的特征是:每个任务只会分发到一个工作者中。

《2018年第17周-RabbitMQ的模式与Kafka的设计》

2.发布/订阅(Publish/Subscribe)
这个发布/订阅和观察者模式很像,但不是同一个东西。具体可看看发布/订阅和观察者区别。
在这里,RabbitMQ引入了交换器(Exchange)的概念,生产者不直接与队列交互,而是通过交换器去与队列进行交互(或者叫绑定)。也就说生产者只和交换器交互。引入交换器这概念后,这消息中间件可以玩的花样就多了。发布/订阅(Publish/Subscribe)就是其中的一个。这里使用到的就是fanout的交换器。
这个模式最主要的特征是:类似于广播(broadcast),同个消息可以发送到不同的队列中去,而且这fanout交换器也不关系队列有哪些,只要队列和fanout交换器有绑定就发送,这样就可以将消息重复发送到不同的队列上。
与工作队列模式的区别是:发布/订阅的概念叫消息,而不是任务。所以消息可以重复的放入不同的队列中。

《2018年第17周-RabbitMQ的模式与Kafka的设计》

3.路由(Routing)
路由模式也是引入交换器概念后,消息中间件玩的一个花样。这里用到的交换器叫direct。
在这模式里,得新增两个概念,分别是binding key和routing key, binding key是对于队列来说的,在其与direct交换器绑定时指定binding key。而routing key是对于消息来说的,在其发送消息到direct交换器时,需指定routing key。这样routing key能够和binding key匹配得上的(就是值相等),direct交换器就会将消息发送到对应binding key的队列上。
这个模式最主要的特征是:控制消息的精度更高,可以指定哪些消息发送到哪些队列里。
与发布/订阅模式的区别是:区别是发布/订阅是广播,将消息发送到任何绑定交换器的队列上,所以没能力选择消息,而路由是需binding key和routing key匹配上,消息才能发送到对应binding key的队列上,从而有能力去选择消息。
与发布/订阅模式的相同点是:可以将消息重复发送。
注:队列可以绑定多个routing key

4.主题(Topics)
当然,主题模式也是引入交换器概念后,消息中间件玩的一个花样。这里用到的交换器叫topic。
这里用到的也是binding key和routing key,但不一样的是,routing_key不能指定明确的key。而是这个key需要带有点“.”,如 “stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。而在这模式下,binding key的指定可以更广泛些,其结构是这样的”.orange.” 、 “..rabbit” 和”lazy.#”。其中*(星号)是可以代表一个单词,#(井号)是可以代表零个或多个单词。也跟路由类似的,只要这样routing key能够和binding key匹配得上的(这里可以不用值相等,模式匹配上即可),topic交换器就会将消息发送到对应binding key的队列上。

如Q1队列的binding key是”
.orange.“,而 Q2是”
..rabbit”和”lazy.#”。如果消息的routing key是 “quick.orange.rabbit” 则此消息会被发送到Q1和Q2队列上。routing key是”quick.orange.fox”的消息只会发送到Q1队列上。routing key是”lazy.pink.rabbit” 的消息只会发送到Q2队列一次,routing key是 “quick.brown.fox” 的消息没有匹配任何的binding key则此消息丢弃。

注:队列可以绑定多个routing key

5.远程过程调用RPC(Remote Procedure Call)
RPC可以远程调用函数,等待服务器返回结果。

RPC的一个备注:RPC虽然用得很广泛,然而它也有不足之处,就是开发人员无法清晰的知道自己调用的这个函数到底是本地函数还是很慢的RPC。这种困惑很容易导致出一个不可预测的系统和增加没必要的复杂性导致难以定位问题。如果不用简单的程序,误用RPC还可能写出很维护的意大利面条式的代码。。
对于这个问题,有三个建议

  • 保证函数是很容易被辨别出是本地函数还是远程函数。
  • 文档化,清晰地记录组件间的依赖。
  • 处理网络带来的异常,如超时等。

当出现用RPC是否必要时,如果可以的话,你最好用异步管道(asynchronous pipeline)的形式,而不是使用阻塞形式的RPC。

RabbitMQ可以用于构建RPC系统。一个客户端和一个可扩展的RPC服务器。不过此功能不太常用,所以就不留篇幅来讲解。大概原理就是可以新增消息的属性,从而将请求和响应的消息给匹配上。

观察者模式和发布/订阅模式的区别

观察者模式
观察者模式的定义:对象间的一种一对多的组合关系,以便一个对象的状态发生变化时,所有依赖于它的对象都得到通知。
举个例子

假设你正在找一份软件工程师的工作,对“香蕉公司”很感兴趣。所以你联系了他们的HR,给了他你的联系电话。他保证如果有任何职位空缺都会通知你。这里还有几个候选人也你一样很感兴趣。所以职位空缺大家都会知道,如果你回应了他们的通知,他们就会联系你面试。

该模式必须包含两个角色:观察者和观察对象,香蕉公司就是被观察者Subject,你就是Observers(还有和你一样的候选人),当被观察者状态发送变化(比如职位空缺)就会通知(notify)观察者,前提是Observers注册到Subject里,也就是香蕉公司的HR得有你的电话号码。

发布/订阅模式
在观察者模式中的Subject就像一个发布者(Publisher),而观察者(Observer)完全可以看作一个订阅者(Subscriber)。subject通知观察者时,就像一个发布者通知他的订阅者。这也就是为什么很多书和文章使用“发布-订阅”概念来解释观察者设计模式。但是这里还有另外一个流行的模式叫做发布-订阅设计模式。它的概念和观察者模式非常类似。最大的区别是:
在发布-订阅模式,消息的发送方,叫做发布者(publishers),消息不会直接发送给特定的接收者(订阅者)。
意思就是发布者和订阅者不知道对方的存在。需要一个第三方组件,叫做消息中间件,它将订阅者和发布者串联起来,它过滤和分配所有输入的消息。换句话说,发布/订阅模式用来处理不同系统组件的信息交流,即使这些组件不知道对方的存在。

Kafka设计(DESIGN)

动机

我们设计kafka,是希望它能成为统一的平台来处理大公司可能拥有的所有实时数据流。要做到这一点,我们必须考虑相当广的用例(use case)。

  • 它需要拥有高吞吐量来支持大容量事件流,如实时日志聚合(real-time log aggregation)。
  • 它需要优雅地处理大量的数据备份,用于支持离线系统的周期性数据负载。
  • 它需要处理低延迟的传递,用于支持传统的消息传递系统用例。

我们想它是分区、分布式、实时处理信息流,以创建新的信息流和传输信息流。这些动机造就了kafka的分区和消费者模型。
最后有可能数据流被输入到其他数据系统中,而这些系统需要对外提供服务,所以kafka需要有能力保证容错性,哪怕存在有机器宕机。

为了支持上述这些,我们设计了一些独特元素,更类似于数据库日志,而不是传统的消息传递系统。

我们将在下面部分中概述设计中的一些元素。

持久化(Persistence)

别害怕文件系统

kafka重度依赖文件系统,用文件系统来存储和缓存消息。人们都由这感觉“硬盘很慢”,以致于大家怀疑一个持久化架构是否能具有竞争力的性能。实际上硬盘它很快也很慢,这取决于我们怎么去使用它。一个合理的硬盘架构通常可以和网络一样快。(看来作者的网速都很快)。
硬盘性能的关键是,磁盘驱动器的吞吐量与过去十年的硬盘搜索的延迟有所不同。因此在6×7200rpm SATA RAID-5阵列的JBOD配置上的线性写的性能大约为600MB/秒,但随机写入的性能仅为100k/秒,即超过6000倍的差别。这些线性读写是所有使用模式中最可预测的,并且由操作系统进行了大量优化。现代操作系统都提供了预读取(read-ahead)和后写(write-behind)操作的技术,这些支持多次读取到一个大块中和合并小的逻辑写形成一个大的物理写。这问题更深入的讨论可以在这找到 ACM Queue article,他们确实发现顺序硬盘读写在某些情况下比随机内存访问还快

为了弥补这些性能差异,现代操作系统越来越着重使用主存来做磁盘缓存。现代操作系统很乐意将空余内存转移到磁盘缓存中,但这需要承受在内存被回收时带来的一点点的性能损失。所有硬盘读写都通过这统一的缓存(磁盘缓存)。如果没有直接IO,这特性并没有那么容易被抛弃。因此即使一个进场维护自己数据缓存时,这些数据将会在OS的页缓存里复制两份,两次高效地存储所有东西。

此外,我们是在JVM基础上建立的,任何一位有花时间去研究Java内存的使用,都会知道以下两件事情:
1.对象的内存开销非常高,通常会使要存储的数据的大小增大一倍(甚至更多)。
2.随着堆内存的增加,Java垃圾收集会变得越来越繁琐和缓慢。

也正是使用文件系统和依赖页缓存(pagecache)带来的结果优于维护一个内存中的缓存(in-memory cache)或是其他结构,通过对所有空闲内存进行自动访问,我们至少可以将可用缓存加倍,并且还可以继续加倍,通过存储紧凑的字节结构而不是单个对象。这样做的话可以在32GB的机器上使用28-30GB缓存,而不用担心GC问题。而且,即使服务重启,这些数据也保持热度,对比起来,进程内存中的缓存在重启后需要重建(对于10GB的缓存可能需要10分钟),否则它需要从一个完全冷的缓存开始(这可能意味更糟糕的初始化性能)。这也极大地简化了代码,因为在缓存和文件系统之间保持一致性的所有逻辑现在都在操作系统中,这比一次性在进程内尝试更有效、更正确。如果您的磁盘使用倾向于线性读取,那么预读取将有效地预操作这些缓存。

这表明了一个非常简单的设计:在我们耗尽空间的时候,与其保持尽可能多的内存并将其全部清空到文件系统,不如反过来,数据都是被立即写入到文件系统上的持久日志中,而不必刷新到磁盘。实际上,这仅仅意味着它被转移到内核的页缓存中。
以页缓存为核心的设计,在这里文章里有被描述,此文章是Varnish的设计。

只需要常量时间(Constant Time Suffices)

在消息传递系统里的持久化数据结构通常是一个消费者队列关联着一棵BTree或者其他通用的随机访问数据结构来维护消息的元数据。BTree是一个万能的数据结构,可以在消息传递系统中支持各种事务和非事务性的语义。但它带来相当高的成本:BTree操作是O(log N)。通常O(log N)本质上被认为是等于常量时间,但对于硬盘操作则并不是这样。磁盘寻轨达到10ms,并且每个磁盘一次只能执行一次寻轨,所以并行性是有限的。因此,即使是少量的磁盘寻轨也会导致很高的开销。由于存储系统将非常快的缓存操作与非常慢的物理磁盘操作混合在一起,因此当在缓存固定时,数据增加时,树结构的性能通常是超线性的。数据加倍则会使速度慢两倍以上。
直观上,一个持久的队列可以建立在简单的读取和追加的形式,这通常也是日志解决方案使用的。这结构有这样的好处,所有操作都是O(1),并且读操作不会阻塞写和读的操作。这是具有明显的优势,是因为性能完全与数据量大小解耦了,一个服务现在可以充分利用那些大量的,且便宜,低转速的SATA驱动器。虽然硬盘的寻轨性能差,但它们的大型读和写的性能还是可以接受的,而且还是三分之一的价格就有三倍的容量。
在没有任何性能惩罚的情况下访问几乎无限的磁盘空间意味着我们可以提供一些在消息传递系统中不常见的特性。例如,在kafka中,我们可以在相对较长的时间内保留消息(比如一个星期),而不是每次消费完就删除消息。这将给消费者带来很大的灵活性。

效率(Efficiency)

我们在效率方面付出大量的努力。我们最初用例中的一个是处理网站活动数据,这可以是非常大量的数据:每个页面的访问都会产生许多写操作。此外,我们假设每条消息至少被一个消费者读取(通常是很多消费者),因此我们努力让消费尽可能的便宜。
我们还发现,经历过构建和运行多个类似的系统,有效的多租户业务的关键是效率。
我们在前面章节讨论过硬盘的效率。一旦消除了糟糕的磁盘访问模式,在这种类型的系统中有两个常见的低效原因:太多小的I/O操作和过度的字节复制。
这小IO问题发生在客户端和服务器之间,和服务器自身的持久化操作中。
为了避免这种情况,我们的协议是围绕一个“消息集(message set)”抽象构建的,该抽象可以自然地将消息分组在一起。这允许网络请求将消息分组,并分摊网络往返的开销,而不是一次发送一条消息。服务器依次将大量的消息追加到其日志中,而消费者一次获取大量的线性块。
这个简单的优化产生数量级的加速。批处理导致了更大的网络数据包、更大的顺序磁盘操作、连续的内存块等等,所有这些都使得Kafka可以将随机消息写入的流变成 线性的写 流给消费者。
另一个低效率的是字节复制。在低消息率下,这不是一个问题,但在负载下的影响是显著的。为了避免这种情况,我们采用了一种标准化的二进制消息格式,由生产者、代理和消费者共享(因此数据块可以在不进行修改的情况下传输)。
broker维护的消息日志本身就是一个文件目录,每个文件都由一个以生产者和消费者使用的相同格式写入磁盘的消息集的序列填充。保持这种通用格式可以优化最重要的操作:持久日志块的网络传输。现代unix操作系统为将数据从页缓存传输到套接字提供了高度优化的代码路径;在Linux中,这是通过sendfile的系统调用完成的。
要了解sendfile的作用,首先最重要先理解将数据从文件传输到套接字的公共数据路径:
1.操作系统从磁盘读取数据到内核空间的页缓存。
2.应用程序将数据从内核空间读取到用户空间缓冲区中。
3.应用程序将数据返回到内核空间,并将其写入套接字缓冲区。
4.操作系统将数据从套接字缓冲区复制到通过网络发送的NIC缓冲区。
有4次复制,两次系统内核调用,这样的效率当然就低下。使用sendfile,通过允许操作系统直接将数据从页缓存发送到网络,避免了重复复制。因此在这个优化的路径中,只需要最后的复制,一次从磁盘复制到NIC缓冲区即可。——零拷贝(zero-copy)
我们期望一个常见的用例是在一个主题上有多个使用者。使用上述的零拷贝优化,数据被完全复制到页缓存中,并在每次读取时重复使用,而不是存储在内存中并在每次读取时将其复制到用户空间。这就允许以接近网络连接的极限的速率来读取消息。
页缓存和sendfile的组合意味着,在一个Kafka集群上,在有消费者的机子上,您将看到磁盘上没有任何读取活动,因为它们将完全从缓存中提供数据。
更多Java支持的sendfile和零拷贝,请点击这里

端到端的批量压缩

在某性情况下,事实上真正的瓶颈不是CPU也不是硬盘,而是网络带宽。对于需要在广域网上的数据中心之间发送消息的数据管道来说,尤其如此。当然,用户自己可以压缩消息而不需要kafka的支持。但这可能导致非常差的压缩比,特别是当消息的冗余字段很多(如JSON里的字段名和网站日志里的user agent或公共字符串)。高效的压缩需要多个消息压缩在一起,而不是每个消息独立压缩。
Kafka用高效的批处理格式支持这一点。可以将一批消息聚合到一起压缩,并以这种形式发送到服务器。这批消息将以压缩的形式写入,并且将在日志中保持压缩,并且只会被使用者解压。
Kafka支持GZIP、Snappy和LZ4压缩协议。关于压缩的更多细节可以在这里找到。

生产者(The Producer)

负载均衡(Load balancing)

生产者直接发送数据到broker,不需要任何的中间路由层,而接受的broker是该分区的leader。为了帮助生产者实现这一点,所有Kafka节点都可以回答关于哪些是可用服务器的元数据的请求,以及在任何给定的时间内,某个主题的分区的leader是否允许生产者适当地发送它的请求。
由客户端控制它想往哪个分区生产消息。这可以随机地进行,实现一种随机的负载平衡,或者可以通过一些语义分区函数来实现。我们提供了语义分区的接口,允许用户指定一个分区的key,并使用这个key来做hash到一个分区(如果需要的话,也是可以复写这分区功能的)。例如,我们选择user的id作为可用,则所以该用户的信息都会发送到同样的分区。这反过来又会让消费者对他们的消费产生局部性的假设。这种明确设计的分区,允许消费者自己本地的处理。

异步发送(Asynchronus send)

批处理是效率的主要驱动因素之一,为了能够批处理,kafka的生产者会尝试在内存中积累数据,然后在一起在一个请求中以大批量的形式发送出去。批处理这个可以设置按固定的消息数量或按特定的延迟(64k或10ms)。这允许累积更多字节的发送出去,这样只是在服务器上做少量的大IO操作。这种缓冲是可配置的,这样提供了一种机制来以额外的延迟来提高吞吐量。
具体的配置)和生产者的api可以在这文档中找到。

消费者(The Consumer)

kafka消费者的工作方式是,向其想消费的分区的leader发送“fetch”请求。在每个请求中消费者指定日志的偏移量,然后接受回一大块从偏移量开始的日志。因此,消费者对position有重要的控制权,如果需要,可以重置position来重新消费数据。

Push和pull

我们首先考虑的一个问题是,消费者应该是从broker拉取消息,还是应该是broker把消息推送给消费者。在这方面,kafka遵循了一种更传统的设计,大多数消息传递系统也会用的,那就是数据是从生产者push到broker,消费者是从broker拉取数据。一些日志集中系统,如Scribe和Apache Flume,遵循一个非常不同的,基于推送的路径,将数据被推到下游。这两种方法都由利弊,在基于推送的系统,由于是broker得控制数据传输的速率,不同消费者可能要不同的速率。然而消费者一般的目的都是让消费者自己能够以最大的速度进行消费,但在基于push的系统,当消费速率低于生产效率时,消费者就不知道该怎么办好了(本质上就是一种拒绝服务攻击(DOS))。一个基于pull的系统就拥有很好的熟悉,消费者可以简单的调控速率。

基于pull的系统的另一个优点是,它可以对发送给消费者的数据进行聚合的批处理。基于推送的系统必须选择立即发送请求或积累更多数据,然后在不知道下游用户是否能够立即处理它的情况下发送它。如果对低延迟进行调优,这将导致仅在传输结束时发送一条消息,最终将被缓冲,这是浪费。基于pull的设计解决了这个问题,因为用户总是在日志的当前位置(或者是一些可配置的最大大小)之后提取所有可用的消息。因此,我们可以在不引入不必要的延迟的情况下获得最佳的批处理。

基于pull的系统的缺点是,如果broker没数据,则消费者可能会不停的轮训。为了避免这一点,我们在pull请求上提供了参数,允许消费者在“长轮训”中阻塞,直到数据达到(并且可以选择等待,直到一定数量的自己可以,确保传输的大小)。

你可能详细其他可能的设计,如只有pull,点到点。生产者会将本地的日志写到本地日志中,而broker则会从这些日志中拉取数据。通常还会提出类似的“存储转发(store-and-forward)”生产者。这很有趣,但是我们觉得不太适合我们的目标用例:它有成千上万的生产者。我们在大规模上运行持久数据系统的经验使我们觉得,在许多应用程序中涉及到数千个磁盘,实际上并不会使事情变得更可靠,而且操作起来也会是一场噩梦。在实践中,我们发现,我们可以在不需要生产者持久化的情况下,以大规模的SLAs来运行管道。

消费者的Position(Consumer Position)

令人惊讶的是,跟踪所使用的内容是消息传递系统的关键性能点之一。
很多消息传递系统在broker中保存了关于什么消息是被消费了的元数据。也就是说,当消息传递给消费者时,broker要么立即记录信息到本地,要么就是等待消费者的确认。这是一个相当直观的选择,而且对于一台机器服务器来说,很清楚地知道这些消息的状态。由于许多消息传递系统中用于存储的数据结构都很糟糕,因此这(记录消息状态)也是一个实用的选择——因为broker知道什么是已经被消费的,所以可以立即删除它,保持数据的大小。
让broker和消费者就已经消费的东西达成一致,这可不是小问题。如果一条消息发送到网络上,broker就把它置为已消费,但消费者可能处理这条消息失败了(或许是消费者挂了,也或许是请求超时等),这条消息就会丢失了。为了解决这个问题,很多消息传递系统增加了确认机制。当消息被发送时,是被标志为已发送,而不是已消费;这是broker等待消费者发来特定的确认信息,则将消息置为已消费。这个策略虽然解决了消息丢失的问题,但却带来了新的问题。第一,如果消费者在发送确认信息之前,在处理完消息之后,消费者挂了,则会导致此消息会被处理两次。第二个问题是关于性能,broker必须保存每个消息的不同状态(首先先锁住消息以致于不会让它发送第二次,其次标志位已消费从而可以删除它)。还有些棘手的问题要处理。如消息被发送出去,但其确认信息一直没返回。

kafka处理则不一样。我们的主题被分为一个有序分区的集合,且每个分区在任何给定的时间内只会被订阅它的消费者组中的一个消费者给使用。这意味着每个分区中的消费者的position仅仅是一个整数,这是下一次消费时,消息的偏移量。这使状态(记录是否被消费)非常小,每个分区只有一个数字。这个状态可以被定期检查。这样确认一条消息是否被消费的成本就很低。

这样还附加了一个好处。消费者可以重置其最先的position从而重新消费数据。这虽然违反了队列的公共契约,但它却变成关键功能给许多消费者。例如,如果消费者代码有一个bug,并且在一些消息被消费后才被发现,那么当bug被修复后,消费者就可以重新使用这些消息。

离线数据加载(Offline Data Load)

可扩展持久化允许只有周期性地使用批量数据的消费者的可能性,比如定期将批量数据加载到离线系统(如Hadoop或关系数据仓库)。

消息传递语义(Message Delivery Semantics)

现在我们已经了解了些生产者和消费者是怎么工作的,接下来我们说下kafka提供给生产者和消费者的语义保证。很明显这里提供了以下几种消息传递保证机制:

  • 至多一次(At most once),这样消息可能会丢失,但永远不会重新传递。
  • 至少一次(At least once),这样消息不可能会丢失,但可能会重新传递。
  • 有且仅有一次(Exactly once),这是大家想要的,每个消息会被传递一次,而且也仅仅只有一次。

值得注意的是,这可以归结为两个问题:发布消息的持久化保证,以及在消费消息时的保证。
很多系统声称提供“有且仅有一次”的传递语义,但阅读这些细节时,会发现其中大部分都是误导(他们不理解消费者或生产者可能挂掉的情况,那些有多个消费者处理的情况,或者是那些被写入磁盘的数据可能丢失的情况)。
kafka的语义很直接。在发布消息时,我们将消息“提交”到log中。一旦发布的消息被提交,只要有一个broker复制这个消息被写入活动分区,它就不会丢失。提交的消息的定义、活动分区以及我们试图处理的失败的类型的描述将在下一节(副本)中详细描述。现在我们假设在完美的情况下,现在让我们假设一个完美的、无损的broker,和尝试理解对生产者和消费者的保证。如果一个生产者试图发布消息并经历一个网络错误,那么就不能确定该错误发生在消息提交之前还是之后。这类似于插入到一个数据库表的自动生成的主键的语义。
在0.11.0.0版本之前,如果一个生产者没有收到一个消息已经提交的响应,那么它几乎没有选择,只能重新发送消息。这提供了“至少一次”的传递语义,因为如果原始请求实际上成功了,那么在重新发送期间,消息可能再次被写入到日志中。从0.11.0.0开始,Kafka生产者也支持一个幂传递的选项,该选项保证重新发送不会导致日志中有这重复的消息。为了实现这一目标,broker为每个生产者分配一个ID,并使用由生产者发送消息时一起把序列号发送到broker,这样broker就可以根据序列和id来处理重复的消息。同样,从0.11.0.0开始,生产者支持使用类似于事务的语义向多个主题分区发送消息:即所有消息都已成功写入或都失败写入。这种情况的主要应用场景是在Kafka主题之间进行“有且仅有一次”的处理(如下所述)。
并非所有的用例都需要这样强的保证。对于延迟敏感的使用,我们允许生产者指定它需要的持久化级别。如果生产者指定要等待消息被提交要在10ms完成。则生产者可以指定它异步地执行发送,或者等待直到leader(但不一定是follower)得到消息。
现在我们描述下消费者视角下的语义。所有的副本都有相同的日志和相同的偏移量。消费者控制它在这个日志中的position。如果消费者从未崩溃,它可以将这个position存储在内存中,但是如果消费者崩溃了,我们希望这个主题的分区来接替这个position的处理,那么新的进程将需要选择一个合适的position来开始处理。
消费者读取消息时,有几个处理消息和更新其位置的选项。

  1. 第二种是它先读取消息,然后将position保存到日志中,最后是处理消息。在这种情况下,在保存其position之后,在保存处理消息产生的输出之前,消费者进程可能会崩溃。在这种情况下,接手处理的过程将从保存的position开始,即使在此position之前的一些消息未被处理。这是对应着“至多一次”的语义,失败的消息可能不被处理。
  2. 第二种是它先读取消息,然后处理消息,最后保存position到日志中。在这种情况下,在处理消息后,消费者进程可能会崩溃,但是在它保存它的position之前崩溃的。在这种情况下,当新进程接手了它接收到的最初几条消息时,或许这几条消息就已经被处理过了。在消费者崩溃的情况下,这相当于“至少一次”的语义。在许多情况下,消息有主键,因此更新是幂等的(接收相同的消息两次,只是用另一个副本重写了一个记录)。

那“有且仅有一次”的语义怎样(或者是说你到底想要什么)?从kafka主题中获取消息处理后发布到其他主题(如一个Kafka Streams应用),我们可以利用上面提到的版本0.11.0.0里的新事务生产者的功能。消费者的position被当做一个消息存储在一个主题,因此我们可以在与接收处理数据的输出主题相同的事务中写入kafka的偏移量。 如果事务被中止,消费者的position将恢复到原来的值,而输出主题的生成数据将不会被其他消费者看到,这取决于他们的“隔离级别”。在默认的“read_uncommitted”隔离级别中,所有消息对消费者都是可见的,即使它们是被中止的事务的一部分,但是在“read_committed”中,使用者只会从提交的事务中返回消息(以及任何不属于事务的消息)。

当写入外部系统时,限制是在需要协调消费者的position和实际存储的输出。实现这一目标的经典方法是在存储消费者position和存储消费者输出之间引入两阶段提交。但这可以更简单地处理,并且通常通过让消费者将其偏移量存储在与输出相同的位置。这样做比较好,因为消费者可能想要写入的输出系统都不支持两阶段提交。作为一个例子,考虑一个Kafka Connect连接器,它在HDFS中填充数据,以及它读取的数据的偏移量,从而保证数据和偏移量都得到了更新,或者两者都不更新。对于需要这些更强语义的其他许多数据系统,我们遵循类似的模式是为了那些需要强一致性语义的系统,还为了这些消息没有主键来允许删除重复数据。

因此kafka为了kafka Streams,高效地支持“有且仅有一次”的传递,并且在Kafka主题之间传输和处理数据时,通常可以使用事务生产者/消费者提供“有且仅有一次”的传递。对于其他目标系统的“有且仅有一次”的传递一般需要协调,但kafka提供了偏移量,它可以实现这要求(参见Kafka Connect)。否则,缺省情况下Kafka保证“至少一次”传递,并且允许用户禁止生产者的重试或消费者在处理数据之前提交position,从而实现“至多一次”的专递。

副本(Replication)

Kafka通过一个可配置的服务器数量对每个主题的分区进行复制日志(你您可以按主题设置此副本因子(replication factor))。这允许在集群中的服务器发生故障时自动恢复,因此当在出现故障时仍然可以使用消息。
其他消息传递系统提供了副本相关的特性,但,我们认为,这似乎是一种策略而已,并没有大量的使用,而且还有个很大的缺点:slave是未被用上的,吞吐量受到严重的影响,恢复还需要繁琐的人工配置,等等。kafka默认是使用了副本功能,实际上那些副本因子设置为1的主题,我们也会当做是使用副本功能的主题。
副本的最小单元是主题的分区。在没有失败的情况下,kafka的每个分区都是有一个leader,其follower可以为零个或多个。包括leader在内的副本数量就是副本因子。所有读和写都是通过leader分区。通常情况,分区的数据量是多个broker,leader的数量时平均分配当每个broker。follower的日志和leader的日志是完全相同的——它们都具有相同的偏移量和相同顺序的消息(当然,在任何给定的时刻,在日志的末尾可能会有一些还未同步到的消息)。
follower也跟kafka的普通消费者一样从leader消费消息。follower从leader拉消息时,有个很好的特性,那就时可以让follower很容易地批量把日志应用到其(follower)日志中。
跟很多 分布式系统处理自动恢复 一样,对于节点是否“存活(alive)”需要有一个明确的定义。对于kafka,节点存活有以下两个条件:
1.节点必须维护它与ZooKeeper的session(通过ZooKeeper的心跳机制)
2.如果是slave,就必须复制leader,而且不能落后太远。
满足上述两个条件的节点,我们更愿意叫“已同步(in sync)”而不是模糊不清的“存活”或“失败”。leader保持跟踪这些“已同步”的节点。如果follower挂了,或者卡住了,或者落后太远了,leader会讲起从已同步的副本名单中移除。是有e replica.lag.time.max.ms这配置去控制卡住多长时间和落后多少副本数量。
在分布式系统术语中,我们只尝试处理一个“失败/恢复”模型,即节点突然停止工作,然后恢复(可能不知道它们已经死亡)。kafka没有处理所谓的“拜占庭式”的失败,即节点产生任意或恶意的响应(可能是由于某些错误)。

现在,我们可以更精确地定义一个消息的提交,当所有副本都同步到分区,分区并且应用到其日志中时,就会被认为是提交的。只有提交的消息才会分发给消费者。这就意味着消费者不用担心当leader崩溃时,消息会丢失。另一方面,生产者可以选择等待消息提交或不提交,这取决与它们对延迟和持久化之间的权衡。生产者可以使用acks这配置来控制这权衡。注意,这“最小数据量(minimun number)”同步副本的数量设置,是指当消息都同步到所有副本后,kafka再去检查时,检查的最小数量。如果生产者对确认要求不太严格,则消息一发布就可以被使用了,即使同步副本数量还没达到最小值。(这最小值可以低到只有一个,那就是leader)。

kafka保证消息不会丢,只要任何时候至少有一个已同步的副本存在。
kafka可以在节点故障的情况下可用。但存在网络分区时,就可能无法使用了。

副本日志:法定人数,ISR和状态机 (Replicated Logs: Quorums, ISRs, and State Machines (Oh my!))

分区就是一个副本日志。副本日志是分布式数据系统的最基本的原语(primitvie)之一,而且有很多种实现方式。其他系统可以使用副本日志作为一种原语,用于在状态机形式的分布式系统。
对于一系列值的顺序达成一致的过程(通常编号为0、1、2、…),副本日志就是将其模型化。有很多方法可以实现这一点,但最简单和最快的是leader来选择序值。只有leader还存活,所哟follower都只需要复制值即可,顺序由leader决定。
当然,如果leader不挂,那我们没必要要follower。当leader崩溃时,我从follower中选择出新的leader。但follower自己可能落后或崩溃,所以我们必须保证我们选择的是最新的follower。日志复制算法必须这最基本的保证时,如果我们告诉客户端消息已经提交了,而此时leader挂了,我们选择的新leader也必须包含刚刚那个已经提交了的消息。这就产生了一个权衡:如果leader等待过多的follower确认消息,This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.

如果你指定确认的数量和日志(与leader对比过的)的数量,这样就保证有重叠性,那么这就叫法定人数(Quorums)。

这种权衡最常见的方法是,在提交决策和leader选举中使用大多数投票。这不是kafka做的,但让我们去探索它,了解它的利弊。假设我们有2f+1个副本。如果f+1节点收到消息,没有超过f个节点失败,则leader就保证所有消息都被提交,我们选择新leader时也一样。这是因为我们在任意节点上选择f+1个节点,这f+1里必须至少有一个节点包含所有已提交消息的副本。副本最完整的结点将会被选中为新leader。这里还有很多算法细节需要处理(如明确定义日志的完整性,leader崩溃时怎么保证一致性,修改集群中的服务器),这些我们先暂时忽略。
多数投票方法有个非常好的特性:延迟仅仅取决于多台最快的服务器。也就是说,如果副本因子时3,那么延迟由最快的一个slave决定,而不是最慢的slave(leader一个、最快的slave一个,这就达到法定人数了)。
这个家族有很多算法,包括ZooKeeper的Zab,Raft和Viewstamped副本算法。我们知道的,更接近kafka的用的算法的学术出版是来自微软的PacificA。
多数投票的不足之处就是,它不需要很多失败的节点,就可以让你选择不到leader。为了容忍一个节点失败,则需要3个节点,容忍2个,则需要5个节点。在我们经验里,以为只要刚刚好够冗余的副本,就能容忍一个节点的失败,但这是不实际的,在5倍硬盘空间(5个硬盘,每个硬盘占1/5吞吐)情况下,每次都要写5次,这对于大量数据的问题时不切实际的。这也是为什么法定人数算法比较常用在集群的配置文件如ZooKeeper,而很少用在原数据存储上。例如在HDFS的namenode的高可用是建立在多数人投票,但这成本很高的算法不会用在它的数据存储上。
Kafka使用了一个稍微不太一样的方法去选择法定人数。kafka动态的维护一个ISR(in-sync replicas)集合,集合里面的节点都是已同步。只有这集合里面的人才适合选举为leader。只有所有ISR都收到写入分区,则这分区的写入就会被认为已提交。这ISR保存在ZooKeeper。对于kafka的使用模型来说,这是一个重要的因素,那里有许多分区,并且确保leader的平衡很重要。ISR模型和f+1副本,一个kafka主题可以容忍f个失败(总共就f+1个节点)。
我们想处理更多的用例,所以这个权衡我们觉得是合理的。在实际情况,对于容忍f个节点失败,多数投票和ISR方法都是需要通用数量的副本确认(比如,容忍1个节点失败,多数投票方法则需要3个副本和1个确认,ISR方法需要2个副本和1个确认)。确认提交而不需要由最慢的节点来确认这是多数投票方法的好处。但我们觉得这是可以通过由客户端选择是否阻塞消息提交,以及控制副本因子(降低)而增加吞吐量和磁盘空间来优化这个问题(这问题就是与多数投票对比)。
另一个重要的设计是,kafka不要求崩溃节点在所有数据完整的情况下恢复。在这个空间中,副本算法依赖于“稳定存储”的存在并不少见,这种“稳定存储”在任何故障恢复场景中都不能丢失,要保证一致性。这有两个主要问题。首先,硬盘故障是我们在持久化数据系统的实际操作中最常见的问题,问题发生后,通常也不会完整地保留数据。其次,即使这不是一个问题,我们也不希望在每次写入时都需要使用fsync,因为这样会减少两到三个数量级的性能。我们允许一个副本重新加入ISR的协议,这协议确保在重新加入之前,它必须完全重新同步,即使它在崩溃中丢失了未刷新的数据。

 不清晰的Leader选举:如果它们都挂了呢?(Unclean leader election: What if they all die?)    

注意,Kafka对数据不丢失的保证是基于至少一个保持同步的副本。如果一个分区的副本都丢失了,则无法保证数据不丢失。
然而在实际情况下的系统当所有副本挂之后必须做一些合理的事情。如果很不辛遇到这种情况,意识到后面会发生什么这是很重要。可能会出现以下两种情况:
1.等待ISR里的所有节点恢复,并选择出新的leader(希望这leader还保存着所有的数据)。
2.选择第一个副本(不需要是ISR里面的)恢复,作为leader。

以下是可用性和持久化的权衡。一、如果我们等待所有ISR副本恢复,则我们会等很长的时间。。二、如果副本的数据都丢了,则永远无法恢复。最后一个就是,如果一个没有同步的副本恢复,我们允许它为leader,则认为它的日志是最新的,哪怕它没有包含所有已提交的消息。在0.11.0.0版本里默认的选择第一个权衡,用等待来换取数据的一致性。这个是可以配置的,如果启动时间比一致性重要,则修改这个 unclean.leader.election.enable。
这个困惑不仅仅kafka有。它存在与任何基于法定人数算法的场景。例如,在多数投票的场景,如果你是去大多数服务器,在剩余的服务器,你就必须在两者选其中一个,不是失去100%的数据就是丢失数据的一致性。

可用性和持久化的保证(Availability and Durability Guarantees)

生产者生成消息时,可以选择0个,1个或者全部副本确认。注意这里的“全部副本确认”不能保证所有被分配副本的结点都能收到消息。默认的,当acks=all时,只要所有当前所有ISR都收到消息,则可以确认消息。例如,一个主题被设置为两个副本和一个失败(只有剩下一个ISR),然后所有acks=all的写入都会是成功的。如果剩余的副本也失败,这样消息就会被丢失。尽管这确保了分区的最大可用性,但是这种行为可能不适合某些喜欢持久化而不是可用性的用户。因此,我们提供了两种顶级的配置,可用于更倾向于消息持久化而不是可用性:
1.关闭不清晰的leader选举——如果所有副本变得不可用,直到最近的leader变得可用,所有分区才可以变得可以用。这有效地避免了消息丢失的风险。请参阅上一节不清晰的Leader选举。
2.指定最小的ISR数量——只有高过这最小数量,消息才会被确认,这是为了避免在写入一个副本时,而且副本挂了,导致消息丢失的风险。这个设置仅仅在生产者使用acks=all生效或保证消息在这数量以上的ISR确认。这个设置提供了一致性和高可用的权衡。ISR最小数量设置高一点,这样更好的保证一致性。然而这样会减少可用性,因为在ISR没满足这数量时,分区是不可用的。

副本管理(Replica Management)

上诉讨论副本,也仅仅是一份日志,也就是主题的一个分区。然而kafka是管理成千上万的分区。我们试图以循环(round-robin)方式在集群中平衡分区,以避免在大数据量的主题的所有分区都在少量节点上。同样地,我们试图平衡leader,使每个节点都是其一定份额分区的leader。
对ledaer选举过程进行优化也很重要,因为这是服务不可用的窗口期。一个简单的leader选举会在一个节点失败后,在该节点内所有分区,每个分区都会举行一次选举。相反,我们选择一个broker作为“controller”。这controller检测broker层次的失败,负责修改受故障影响的分区的leader。其结果是,我们能够将许多需要的leadr变更批量处理,这使得选举过程在大量的分区上变得更加便宜和快速。如果controller失败了,其中一个存活的节点会变成新的controller。

日志压缩(Log Compaction)

日志压缩保证kafka在每个分区,对于每个key,至少保存其最近的一条消息。这解决了那些需要当应用或系统崩溃后,重启时需重新加载数据的场景。
到目前位置,我们只讨论了简单的数据保存方法,那就是当旧日志数据超过一定时间或达到一定大小的时候会被删除。这个适用于每条相对独立的消息,如临时事件。然而,还有一类很重要的数据,那就是根据key修改数据,一种可变的数据(例如在数据库表数据的变更那样)。
我们讨论一个具体的例子。一个主题包含了用户emial信息,每次用户更新他们的email信息,我们都会发送消息到topic,是根据他们的userid做主键。以下是我们发送的消息,userid是123,每条信息都对应着一次的email信息修改(省略号是省略其他userid的消息)。


123 => bill@microsoft.com
        .
        .
        .
123 => bill@gatesfoundation.org
        .
        .
        .
123 => bill@gmail.com

日志压缩给了我们更细颗粒度保留数据机制,这样我们就可以保证只保留每一个key最后的一次变更(如123 => bill@gmail.com)。这样我们保证了日志里都包含了所有key的最后一个值的快照。这就意味着下游的消费者可以重建状态而不需要保存所有的更变日志。
让我们一些日志压缩有用的场景,然后我们在看看是怎么被使用上。
1.数据库变更订阅(Database chagne subscription)。我们很常见到一份数据集会存在多种数据系统里,而且这系统里有一个类似数据库那样的(如RDBMS或新潮的key-value系统)。举个例子,你有一个数据库、一个缓存、一个搜索集群和一个Hadoop集群。这样每次数据库的修改,都得映射到那缓存、那搜索集群和最后在Hadoop里。在这个场景里,你只是需要实时最新更新的日志。但如果需要重新加载进缓存或恢复宕机的搜索节点,就可能需要完整的数据集。
2.事件源(Event sourcing)。这是一种应用设计风格,它将查询和应用设计结合在一起,并使用日志作为程序的主要存储。
3.高可用日志(Journaling for high-availability)。一个本地计算的进程可以通过变更日志来做到容错,这样另一个进程就能重新加载这些变更继续处理。一个具体的例子就是流式查询系统,如计数、汇总和其他“分组”操作。实时流式处理框架Samza就是使用这功能达到目的的。
在上述场景中,主要处理实时的变更,偶尔需要重新加载或重新处理时,能做的就只有重新加载所有数据。日志压缩提供了这两个功能,处理实时数据变更,和重新加载数据。这种使用日志的风格,详情可参看点击

这思路很简单。如果我们保存无穷无尽的日志,保存上述场景中每个变更日志,而且还是一开始就获取每个系统的状态。使用这个完整的日志,我们就可以恢复到任何一个时间点的状态。但这种完整日志的假设时不切实际的,因为对于那些每一行记录都在变更多次的系统,即使数据很小,日志也会无限的增长下去。那我们就简单的丢弃旧日志,虽然可以限制空间的增长,但也无法重建状态——因为旧日志被丢弃,可能一部分记录的状态无法重建。
相对于粗粒度的基于时间的数据保留策略,日志压缩的策略是一种更细颗粒度,基于每一条记录保存。这个想法是,有选择性的删除那些有多个变更记录的同样的key。这样的日志就保证每个key都至少有一个最新的状态。
数据保留策略可以为每个主题设置,所以一个集群里有些主题的保存策略可以设置为大小和时间来保存数据,有主题也可以通过压缩保留。
这个功能的灵感是来自于LinkedIn里最古老且最成功的基础架构——一个被称为Databus的数据库变更日志缓存系统。
跟大多数日志结构存储系统不一样的时,Kafka是为了订阅而设计的,组织数据的形式也是为了更快的线性读取和写入。跟Databus不一样之处是,kafka作为真实源(source-of-truth)存储,即使上游数据源不具备可重用性的情况下,它还是挺有用的。

不管是传统的RDBMS还是分布式的NoSQL存储在数据库中的数据总是会更新的,相同key的新记录更新数据的方式简单来说有两种:
1.直接更新(找到数据库中的已有位置以最新的值替换旧的值)。
2.追加记录(保留旧的值,查询时再合并,或者也有一个后台线程会定期合并)。
采用追加记录的做法可以在节点崩溃时用于恢复数据,还有一个好处是写性能很高,因为是线性写。
以下是各个数据系统的更新数据方式:

数据系统更新数据追加到哪里数据文件是否需要压缩
ZooKeeperlogsnapshot不要,因为数据量不大
Redisaofrdb不需要,因为是内存数据库
Cassandracommit logdata.db需要,数据存在本地文件
HBasecommit logHFile需要,数据存在HDFS
Kafkacommit logcommit log需要,数据存在分区中的Segment里

日志压缩基础(Log Compaction Basics)

这里有个更高层次的图,展示kafka日志的逻辑存储结构,框框的每个数字都是一条消息的偏移量(offset):

日志的头部(Log Head)就是传统的kafka日志。日志的尾部(Log Tail)则是被压缩过的日志。Log Head是很密集的,偏移量时连续的,保留了所有的消息。值得注意的是在Log Tail的消息虽然被压缩,但依然保留它一开始被写入时的偏移量,这个偏移量是永远不会被改变。而且这压缩日志里的偏移量,在日志里依然时有效的。所以,时无法区分下一个更高的偏移量是什么,比如说,上面的例子,36、 37、 38都是属于同一个位置。
以上说的都是数据更新时的日志压缩,当然日志压缩也支持删除。当发送某个Key的最新版本的消息的内容为null,这个Key将被删除(某种程度上也算是更新,如上面的例子就是把email信息置为null)。这个消息也称删除标志(delete marker),这个删除标志会把之前跟这key相同的消息删掉。但这删除标志比较特殊,特殊之处是它是过一段时间才被删除,从而腾出磁盘空间。而数据删除的时间点会被标志为“删除保留点(delte retention point)”,也就是如上图所示,这个图展示也很特别,你看看两个是point而不是pointer,也不是指向某个消息,而是消息与消息之间。说明它是个时间点,而不是指向某个消息的指针pointer。

压缩时通过后台定期复制日志段(log segment)完成的。清除时并不会阻塞读操作,而且还可以配置不超过一定的IO,从而避免影响消费者和生产者。压缩日志段的过程如下:

日志压缩提供了什么保证?(What guarantees does log compaction provide?)

日志压缩保证:
1.任何消费者只要是读取日志的头部的,都可以看到所有消息,头部的消息不会被删除。这些消息都是有连续的偏移量。Topic的min.compaction.lag.ms参数可用于保证在指定时间内该消息的存在,而不会被压缩。这提供了消息呆在头部(未被压缩)的时间的底线。
2.依然保持则消息的有序性。压缩永远不会重新给消息排序,而仅仅是删除其部分而已。
3.消息的偏移量永远不会改变。它永远标志着消息所在的位置。
4.任何从日志最开始的地方开始处理都会至少看到每个key的最终状态。另外,只要消费者在delete.retention.ms(默认是24小时)这时间内达到日志的头部,则将会看到所有删除记录的删除标志。也就是说:由于删除标志的移除和读取是同时发生,所以如果错过delete.retention.ms这时间,消费者会错过删除标志。

 日志压缩细节(Log Compaction Details)

日志压缩通过日志清除器(log cleaner)执行,后台线程池复制日志段,移除那些存在于Log Head中的记录。每个压缩线程工作如下:
1.选择Log Head中相对比Log Tail的比例高的日志。
2.创建Log Head中每个Key对应的最后一个偏移量的日志摘要。
3.从头到尾的开始复制,在复制过程中删除相同key的日志。新的、干净的日志段将立刻被交换(swap)到日志里,所以只需一个额外的日志段大小的硬盘空间就可以(不需要全部日志的空间)。
4.Log Head的日志摘要实际上是一个空间紧凑的哈希表。每个实体只需要24个字节空间。所以8G的cleaner空间,可以处理大概366G的Log Head(假设每个消息大小为1k)。

 日志清除器的配置(Configuring The Log Cleaner)

Kafka是默认启用日志清除器,是个线程池。如果要开启指定主题的清理功能,你可以在日志里添加以下属性:

log.cleanup.policy=compact

这个可以在创建主题时指定或修改主题时指定。

日志清除器可以设置多少消息在Log Head而不被删除。这个启用是通过设置压缩时间段:

log.cleaner.min.compaction.lag.ms

如果不设置,则默认是除了最后一个segment之外,其余日志段都会被压缩,即最后一个日志段不会被压缩。任何已激活的日志段都不会被压缩,就算消息的时间已经超过了上面配置的时间,这里的激活,是指有在消费。

配额(Quotas)   

Kafka集群有能力强制性地要求控制broker中客户端使用的资源。以下是两类客户的quotas:
1.网络带宽quotas,具体到字节(从0.9版本开始)。
2.请求速率quotas,具体到CPU的利用率(网络和IO的比值)。

为什么配额是必须的?(Why are quotas necessary?)

生产者和消费者有可能生成/消费大量的数据或请求速率非常高,以致于占满了broker的资源,导致网络饱和broker拒绝给其他客户端服务。使用quotas就能避免这个问题,在多租户集群上尤为重要,因为一部分低质量的客户可能会降低高质量客户的用户体验。实际上,可以对API进行这样的限制。

客户组(Client groups)

Kafka客户标识是用户主体(user principal),用于代表用户在这安全的集群上的权限。在无鉴权的时候,broker通过可配置的PrincipalBuilder来提供用户主体,用来分组。由客户端应用选择client-id作为客户的逻辑分组。元组(user,client-id)则定义了一个安全逻辑组,共享user principal和chient-id。
quotas可以被应用到元组(user,client-id),user或client-id组。对于一个连接,匹配上的quota将会应用到此连接上。例如(user=”test-user”,client-id=”test-client”)拥有生产者quota是10MB/s,这个10MB的带宽将会被user是“test-user”并且client-id是”test-client”的生产者进行共用。

配额的配置(Quota Configuration)

quota可以按(user,client-id)配置,也可以按user组配置,也可以按client-id组配置。默认quota可以被任何级别的quota给覆盖。这个机制类似于每个Topic可以覆盖自己的。ZooKeeper的/config/users的quota可以覆盖user和(user,client-id)的quota。/config/clients下的则可以覆盖client-id的quota。这些ZooKeeper的覆盖会即可在所以broker中生效,这样我们就不需要修改配置时重启服务器。详情请点击
quota配置的优先级如下:

1./config/users/<user>/clients/<client-id>  
2./config/users/<user>/clients/<default>  
3./config/users/<user>  
4./config/users/<default>/clients/<client-id>  
5./config/users/<default>/clients/<default>  
6./config/users/<default>  
7./config/clients/<client-id>  
8./config/clients/<default>  

broker的(quota.producer.default, quota.consumer.default)属性来给每个client-id组设置默认的网络带宽。但后面的版本会删除这些属性。
client-id组的默认quota可以在ZooKeeper中配置。

网络带宽配额(Network Bandwidth Quotas)

网络带宽quota,具体到字节,而且是有组里的客户一起共享。默认的,每个独立的客户组都有一个固定的网络带宽的quota。这quota配置在每个broker。

请求速率配额(Request Rate Quotas)

请求速率quota,具体到时间的百分比,时间是在quota窗口里每个broker的处理请求的IO线程和网络线程。 n%的quota代表一个线程的n%,所以quota总数是((num.io.threads+num.network.threads)×100)%。每个客户组在一个quota窗口中最多使用n%的IO线程和网络线程。由于分配给IO和网络的线程数是根据broker主机的cpu个数,则每个请求速率quota代表着CPU的百分比。

实施(Enforcement)

默认情况下,每个唯一的客户组都会有一个集群配置好的固定的quota。这个quota是定义在每个broker上。我们决定由每个broer定义这些quota,而不是由集群为每个client统一设置一个quota的原因,是因为为了方便共享quota的设置。
如果Broker检测到超过quota了,会怎么处理?在我们的解决方案中,我们是选择降低速率,而不是直接返回错误。broker会去计算处理这问题的延迟时间,这段时间则不会立刻响应客户端。这种超过quota的处理,对于客户端来说是透明的。客户端不需要做额外的操作。实际上,客户端额外的动作,如果操作不好,还会加剧超过quota的问题。
字节率和线程利用率都会在多个小窗口中监测(一秒钟有30个窗口),以便快速准确的纠正quota违规行为。
客户端字节率在多个小窗口(例如每个1秒的30个窗口)上进行测量,以便快速检测和纠正配额违规。 通常,大的测量窗口(例如,每30秒10个窗口)会导致大量的流量,然后是长时间的延迟,这对用户体验方面并不好。

参考和翻译:
RabbitMQ官方 https://www.rabbitmq.com
Kafka官方 http://kafka.apache.org/docum…

    原文作者:设计模式
    原文地址: https://segmentfault.com/a/1190000014667476
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞