6.1.6 日志压缩

不管是传统的RDBMS还是分布式的NoSQL,存储在数据库中的数据总会更新。更新数据有两种方式:直接更新(找到数据库中的已有位置,以最新的值替换旧的值)、以追加方式更新(保留旧值,查询时再合并;或者会有一个后台线程,对相同键的所有记录进行定期合并操作)。第二种做法因为在写操作时不需要查询,所以写性能会很高。如表6-3所示,很多分布式存储系统都采用这种追加方式。这种方式的缺点是:需要通过后台的压缩操作保证相同键的多条记录,经过合并后只保留最新的一条记录。
《6.1.6 日志压缩》

如图6-35所示,Kafka的消息由键值组成,在日志压缩时,如果相同的键出现了多次,则只会保留最新的那条消息。比如,键为Kl出现了3次,偏移量分别是[日,2,3],最后只会保留偏移盘等于3的记录;键为KZ出现了3次,偏移量分别是[1,5,9],最后只会保留偏移量等于9的记录。

《6.1.6 日志压缩》

下面先介绍日志压缩的两个重要概念一一清理点和删除点,然后分析日志压缩的具体实现。

  1. 清理点(日志头部和尾部)
    基于时间和大小策略的“日志清理”是一种粗粒度的日志保留策略,“日志压缩”则是一种基于每条记录的细粒度日志保留策略。前者的做法是:要么保留一个日志分段,要么删除整个日志分段。后者的做法是:如果相同键有新的记录,则有选择地删除旧的记录,保证每个键至少都保存有最近的一条记录。

不同的系统有不同的清理方式,比如NM的垃圾回收算法将存活的对象复制、整理到指定区域;HBase或Cassandra的压缩策略会将多个数据文件合并、整理成新的数据文件。日志压缩的工作是:删除旧的更新操作,只保留最近的一次更新操作。为了执行日志压缩,我们需要解决下面两个问题:

  • 如何选择参与合并的文件;
  • 选择到文件后,如何压缩。

问题一:除了当前活动的日志分段(actlveSegl’lent),Kafka的日志压缩会选择其他所有的日志分段参与合并操作。之所以要排除活动的日志分段,是为了不影响写操作,因为追加消息总是追加到活动的日志分段的末尾。

注意:其他NoSQL分布式存储系统选择参与合并文件的算法比较复杂,因为它们把所有的数据文件都放在以表级别为粒度的同一个文件目录下。合并文件时,显然不能把表中所有数据文件都一起合并。而Kafka的每个文件目录都是一个分区,不同分区有不同的文件目录,每个分区目录下的日志分段都不会太多。即使一次合并一个分区下的所有日志分段,也不会有太大的问题。

问题二:日志压缩会将所有旧日志分段的消息,复制到新的日志分段上。为了降低复制过程产生的内存开销,Kafka在开始日志压缩操作之前,会将日志按照“清理点”(口eanerPoint)分成日志尾部和头部。如图6-36所示,方框中的数字表示偏移量,下面列举了3次日志压缩的步骤。

(1)第一次日志压缩,清理点就等于0。日志头部的范围从0到活动日志分段的基准偏移量13。
(2)第一次压缩后,清理点更新为13。第二次日志压缩时,日志头部范围从13到活动日志分段的基准偏移量20。日志尾部范围从0到清理点的位置13。
(3)第二次压缩后,清理点更新为20。第三次日志压缩时,日志头部范围从20到活动日志分段的基准偏移韭28。日志尾部范围从2到清理点的位置20。

上面几个步骤中,在开始日志压缩之前,日志头部和传统的Kafka日志类似,它们的偏移量都是顺序递增的,并且保存了所有的消息。日志尾部的偏移量是稀疏的,虽然整体上有序,但不是逐一递增的。比如第二次压缩之前,日志尾部的偏移量是[0,2,5,6,8,9,1El];第三次压缩之前,日志尾部的偏移量是[2,5,9,14,17]。

《6.1.6 日志压缩》

注意:图中每条消息所在的键值、日志分段并没有画出来(除了活动日志分段的基准偏移量,可以比较明显地看出来),这些消息存储在不同的日志分段里。日」志压缩面向整个分区,淡化了日志分段的边界。不过,具体在执行压缩动作时,还是会面向日志分段,因为复制消息时必须要读取原来的日志分段。

除了新引人的“清理点”概念,Kafka的日志压缩操作中与偏移盐、文件位置相关的特点有以下几点。

  • 日志压缩前后,日志分段中每条消息的偏移量和写入时总是保持一致。被保留的消息即使复制到新的日志分段,也不会改变消息的偏移量。即:消息总是有序的,日志压缩不会对消息重新排序。
  • 日志压缩后,消息的物理位置会发生变化。因为生成了新的日志分段,日志分段中每条消息的物理位置会重新按照新文件来组织。
  • 日志压缩后,日志分段的消息偏移量不再是连续的,但并不影响日志的查询。以图6-36为例,假设要读取偏移量分别是[15,16,17]的消息,它们都会从偏移量等于17的文件物理位置开始读取。

在清理日志的同时,客户端也会读取日志。因为日志头部的消息偏移盘是逐一递增的,而日志尾部的消息偏移量是断续的。如果客户端总是能够赶上日志的头部,它就能读到日志的所有消息;反之,就可能不会读到全部的消息。如医16-37所示,不同的日志分段用虚线分隔,下面举例了客户端读取日志分段的步骤。

(1)第一次日志压缩会合并偏移量从0到12的范围,客户端读取第一个日志分段读到偏移量;3。
(2)第一次日志压缩后,偏移:fil:从0到12做了合并。客户端继续从偏移量4开始读取,但是日志分段中没有偏移il:l:4的消息,客户端就只会从偏移盐5开始读取。这种场景下,客户端没有赶上日志的头部,它就可能不会读到全部的消息。比如,从偏移盘5到偏移量13中间被删除的消息,就不会被客户端读取到。
(3)假设客户端在每次日志压缩完成之前,都已经读取到了日志的头部,它就可以读取到所有的消息。比如,在第二次日志压缩后,客户端已经读取到了偏移盘20。那么第二次日志压缩即使完成了,对客户端也没有影响,因为客户端已经在日志头部之后了,偏移盘20之前的消息都已经被客户端读取过了。

《6.1.6 日志压缩》

“客户端有没有赶上日志头部”的依据是:在日志压缩后,客户端有没有读取到日志头部的起始偏移量。这个日志的头部指的是日志压缩后的日志头部,而不是日志压缩前的日志头部。因为每次日志压缩,日志头部的起始位置都会不同。下面对比了日志压缩前后,使用不同日志头部偏移聋的区别。

  • 以日志压缩前的日志头部偏移量作为依据。第一次日志压缩前的日志头部偏移量等于0。日志压缩后,客户端虽然读取过了偏移量0,但仍然不会读取到所有的消息。
  • 以日志压缩后的日志头部偏移量作为依据。第一次日志压缩后的日志头部偏移量等于13。日志压缩后,客户端已经读取过偏移量13,说明已经读取了偏移革0到偏移量13之间的每条消息,这就意味着读取到了所有的消息。

日志压缩后,客户端即使没有读取到每条消息的所有记录,但它总能够读取到每条消息最近的那条记录。对于只关心消息最新状态的客户端而言,它们的消费速度可以放慢点,也不会有影响。对于关心每条消息所有状态的客户端而言,它们要一直保持与主副本的同步;否则一旦发生日志压缩,消息的旧状态被删除后,就对客户端不可见了。

  1. 删除点(墓碑标记)
    日志压缩还要考虑删除消息的场景。如果一条带有键的消息,它的值内容为null,表示这条“删除的消息”所在偏移量之前的所有消息都需要删除。当然,“删除的消息”本身也会被删除。在其他分布式存储系统中,“删除的消息”也叫作“墓碑标记”(tombstone)。

分布式存储系统中的每条消息都有多个副本,而消息的复制可能存在延迟或者失败。为了保证墓碑标记之前的所有消息都删除掉,墓碑标记除了追加到主副本的日志分段上,也需要复制并保存到其他节点的备份副本上。墓碑标记会在日志分段中存储一段时间,最后在指定的超时时间过后会被删除掉。

如图6-38所示,日志压缩会将上一次压缩后的多个小文件合并为一组,压缩成新的文件。图中方框内数字表示日志分段的修改时间,方框上数字表示日志分段的文件大小。灰色背最的区域表示一次完整的日志压缩过程,包括日志压缩前选择日志分段、压缩时复制消息、压缩后生成新文件。日志压缩后,新的日志分段不会更改每条消息的偏移量,也不会更改文件的最近修改时间,具体步骤如下。

(l)第一次日志压缩,清理点等于0,没有日志尾部,日志头部从6:00到7:40。所有日志分段文件都是lGB,不考虑删除的消息。
(2)第一次日志压缩后,清理点改为日志头部末尾即7:40。每个新日志分段的大小都小于lGB。
(3)第二次日志压缩时,清理点为7:40,日志头部从8:00到8:10,日志尾部从6:00到7:40。压缩操作会将多个小文件分成一组,每一组不超过lGB。比如,[6:00,6:20,6:30]这3个主件合成一组,[6:35,7:00]这2个文件合成一组,7:40这l个文件单独一组。

《6.1.6 日志压缩》

在第二次之后的日志压缩,都要考虑“删除的消息”(墓碑标记)是否需要保留。日志分段保留墓碑标记的条件是:日志分段的最近修改时间大于deleteHor1.zonMs。deleteHor”i.zonMs的计算方式:从0到日志头部起始位置前的最后一个日志分段,它的最近修改时间减去保留阔值(delete.retention.111s配置项,默认为24小时,这里假设为l小时),下面举了两个示例。第二次日志ffi缩时,日志头部(从6:00到7:40)前的最后一个日志分段修改时间等于7:40,减去l小时等于6:40。下面开始判断日志头部的每个日志分段是否需要保留墓碑标记。

(l)第一组:删除墓碑标记的分段[6:00,6:2日,6:30]。
(2)第二组:删除墓碑标记的分段[6:35],保留墓碑标记的分段[7:00]。
(3)第式组:保留墓碑标记的分段[7:40]。

第三次日志压缩时,日志头部(从6:30到8:10)前的最后一个日志分段修改时间等于8:10,减去l小时等于7:10。下面开始判断日志头部的每个日志分段是否需要保留墓碑标记。

(1)第一组:删除墓碑标记的分段[6:3日,7:00],保留墓碑标记的分段[7:40,8:00]。
(2)第二组:保留墓碑标记的分段[8:1日]。

注意:只有日志尾部才需要判断是否需要保留墓碑标记,日志头部一定会保留墓碑标记(因为日志头部之后的每个日志分段,它们的修改时间一定大于deleteHor1.zonMs)。日志泊理器会处理日志的压缩操作,结合前面的背景知识,压缩操作的具体步骤如下。

(1)选择日志头部到日志尾部比率最大的日志进行日志压缩。
(2)对日志头部构建一个消息的键到最近偏移量的映射关系。
(3)重新复制每条消息到新文件中。如果消息的键有更高的偏移量,则不会复制这条消息。
(4)新的日志分段完成复制后,会替换掉旧的日志分段。

其中,步骤。)判断消息是杏需要复制到新文件中,它依赖于步骤。)构建的映射表。假设映射表的示例数据是{Kl->100,K2->110},如果日志分段(不管是日志尾部还是头部,日志头音fl中也可能有相同键不同偏移茧的消息)巾消息、健为Kl的消息偏移量还有[15,80,99],它们都不会被复制到新文件中,最终键为阳的消息只会保留偏移盘100的那条。

注意:映射表的数据结构是一个空间紧凑的散列表,每个条目只占用24Byte。假设映射表的缓冲区空间为8GB,它可以存储的消息数量有8GBI24Byte=357,913,941,如果一条消息有lKB,它能缓存的日志头部大小接近356GB。也就是说,对大小为356GB的日志头部构建映射表,只需要占用8GB的内存。缓冲区的配直项为log.cleaner.dedupe.buffer.size,默认值为128MB,可以缓存的日志头部大小为5.6GB。

日志的压缩由日志清理器处理,下面来分析日志压缩的具体实现。

  1. 日志清理的管理器与清理线程
    在分布式系统巾,涉及多线程时,-般会用线程池来实现。下面列举了Kafka用到的一些线程池。
  • 服务端网络层处理器的线程数(nul’l.『1etwork.threads),默认值为3。网络服务端(SocketServer)会创建多个处理器(processor)。
  • 服务端处理网络请求的钱程数(川『11.io.threads),默认值为8。Kafka的请求处理线程池(KafkaRequestHandlerPool)会创建多个请求处理器(KafkaRequestHandler)。
  • 日志清理的线程数(log.cleaner.threads),默认值为1。日志清理器(LogCleaner)会创建多个清理线程(CleanerThread)。

如图6-39所示,日志管理器(LogManager)除了管理日志的常用操作,也管理了一个日志清理器(LogCleaner)。日志清理器通过管理器(LogCleanerManager)选择:+1需要清理的日志(LogToClean),并将具体的清理动作交给清理线程(CleanerThread)完成。

《6.1.6 日志压缩》

日志管理器需要把数据目录(logDirs)和所有的日志(logs)作为参数,传递给日志清理管理器(LogCleanerManager)。每个数据目录都有一个清理点检查点文件(cleaner-offset-checkpoint),它记录了每个日志的最近一次清理点位置。日志清理管理器的9日bFilthiestlog()方法在选择日志时,会读取每个日志的清理点,然后选择最需要清理的日志。并且在这个日志清理完成后,它也会负责在doneCleaning()方法中,将这个日志的最新清理点写入清理点检查点文件。相关代码如下:

《6.1.6 日志压缩》
《6.1.6 日志压缩》

清理线程(CleanerThread)每次运行时,只会让管理器选择一个最需要清理的日志;清理线程对应的清理器(Cleaner)每次也只会清理一个日志。清理日志的选择有一定的策略:选择cleanableRati.o比率最大的那个日志。相关代码如下:

《6.1.6 日志压缩》

每个分区的日志都对应一个LogToClean对象,选择哪个分区优先做合并操作的计算公式是:日志头部的大小(di.rtyBytes)除以日志的大小(日志头部加上日志尾部),选择比率最大的那个分区。相关代码如下:

《6.1.6 日志压缩》

清理点作为日志头部和尾部的分界点,清理点之前的尾部,消息的偏移量是断续的;而清理点之后的头部,每条消息的移量都是递增的。如图6-40所示,日志压缩的具体步骤如下。

(1)消息追加到活动的日志分段,选择活动日志分段之前的所有日志分段参与日志压缩。
(2)为日志头部构建一张消息键到偏移茧的映射表,相同键但偏移盘低于映射表的消息会被删除。
(3)通过复制消息的方式,将:需要保存的消息复制到新的日志分段,每条键都只有一条最新的消息。
(4)复制完成后,新的日志分段会代替所有参与压缩操作的旧日志分段。
(5)更新日志的清理点,为下次的日志压缩做准备。清理点会将日志分成头部和尾部。

《6.1.6 日志压缩》

在图6-40中,步骤(1)到步骤(3)方框之间的虚线间隔表示清理点分隔的日志头部(偏移iiJ从7到12)和尾部(偏移量最高到6),它们可能分别都有多个日志分段。步骤(4)中的灰色方框产生了两个新的日志分段,清理点也更新到了日志头部的下一个偏移量位置130接下来分析’罔6-40巾步骤(2)到步骤(4)的具体实现,即清理线程执行日志压缩操作。

  1. 日志清理
    日志清理的工作主要分成3步:为日志头部构建映射表、对所有日志分段进行分组、分别清理每一组的日志分段。映射表结构是消息键到偏移量的映射关系(OffsetMap),这就不可避免地需要读取日志头部的所有日志分段。为构建映射表而需要读取日志头部的消息,和最后复制旧日志分段的消息,都使用同一个读缓冲区(readBuffer)。从旧日志分段读取出来的消息会先暂存到读缓冲区,然后复制到写缓冲区(wr1.teBuffer),最后才会将写缓冲区的消息写到新日志分段中。

日志以清理点(firstDirtyOffset)作为头部和尾部的边界,目的是为清理点之后的日志头部构建一张映射表。复制消息时需要复制头部和|尾部两部分的消息,即从偏移盘。一直到“当前活动日志分段基准偏移盐”的所有日志分段都参与日志压缩操作。相关代码如下:
《6.1.6 日志压缩》

日志头部和口志尾部的所有日志分段都会参与日志压缩,压缩会将一个大文件变成一个小文件。为了防止州现太多的小文件,日志压缩并不是对每个小文件单独压缩,而是将多个相连的小文件组成一组一起用缩。同一个组的所有小文件加起来不能超过日志分段的阔值,杏则压缩后新生成的日志分段就会超过阔值。分组时,日志尾部的日志分段都是上次日志压缩后产生的小文件,它们需要进行分组。而日志头部之后的每个日志分段,它们的文件大小都等于分段阔值,每个日志分段都单独一组。groupSegfTlentsBySize()方法会为所有参与压缩的日志分段进行分组,这涉及Scala列表的一些复杂操作。比如,获取列表第一个元素(head)、获取剩余元素(tan)、追加元素(::)。相关代码如下:
《6.1.6 日志压缩》
《6.1.6 日志压缩》

当前活动日志分段之前的所有日志分段在加入分组时,顺序不能打乱。因为后面将消息复制到新的日志分段时,是按照顺序复制的。如果日志分段顺序乱了,消息的顺序就不一致了。分组的方法比较复杂,一共有两层循环,内层循环用来确定同一组的所有日志分段,外层循环用来确保分配完所有的日志分段。如图6-41所示,假设有11个日志分段参与分组,图中折线是将[1,2,3,4]这4个分段加入第一个分组的过程,其他分段加入到对应分组的过程与之类似。

《6.1.6 日志压缩》

每一个分组有一个或多个日志分段,每一组都会调用cleanSegl’lents()方法清理这一组内的多个日志分段。同一个组的多个日志分段清理后,只会生成一个新的日志分段,而非多个。新日志分段的文件名会和本组内第一个日志分段的文件名一样。比如,一组内有3个日志分段[OOOO.log,0100.log,0200.log],清理后的日志文件名还是0000.log。当然,因为清理完成之前,原先旧的日志分段还存在,所以要先创建对应的临时文件:[OOOO.log.cleaned,0000.index.cleaned]。相关代码如下:

《6.1.6 日志压缩》
《6.1.6 日志压缩》

清理每个分段时,只有需要保留的消息才会复制到新日志分段中。下列两种场景会把消息删除掉。

  • 如果消息的键在映射表中,但是消息的偏移量比映射表中的偏移量低,删除这条消息。
  • 消息是一个墓碑标记,并且墓碑标记已经过期,不再需要保留,也要删除这条消息。

日志清理过程中与保留消息相关的代码如下:
《6.1.6 日志压缩》

Kafka的日志清理有两种策略:删除策略会直接删除日志分段,确保日志占用的磁盘空间不会一直膨胀;压缩策略则不同,它只会删除相同键,但是偏移量较旧的消息。日志压缩的特点是:消息的键只要有值,它就永远不会被删除。除非生产者发送了值为空的消息,才会将消息删除掉。

总结下本节的主要内容:我们将消息集的格式作为人口,分析了日志、日志分段,以及它们的读写方法。我们主要关注的是物理层(或者叫存储层、文件系统)的实现,下面将分析日志的上层逻辑:即服务端如何处理客户端发送的读写请求,并最终调用到日志的读取方法。

在0.7版本之前,Kafka还没有副本机制,每个消息代理节点的Kafka服务(KafkaServer)都直接管理了一个日志管理器(LogManager)。服务端处理生产请求和拉取请求也都比较简单直接:通过日志管理器获取出分区对应的日志,就可以直接操作日志对象了。相关代码如下:
《6.1.6 日志压缩》

在0.8版本之后,Kafka引入了副本机制。Kafka服务在处理请求时,转而通过基于日志管理器之上、并且与副本机制相关的副本管理器完成操作。

    原文作者:water___Wang
    原文地址: https://blog.csdn.net/WANTAWAY314/article/details/115981850
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞