基于Spark、NoSQL的实时数据处理实践(下)

作者:TalkingData 张学敏

本文由TalkingData原创,转载请获取授权。

本文基于TalkingData 张学敏 在公司内部KOL的分享主题《基于Spark、NoSQL实时数据处理实践》的整理,同时也在DTCC大会上做了同主题的分享。

主要介绍了项目的技术选型、技术架构,重点介绍下项目面临的挑战和解决办法,还介绍了面对多维度、多值、多版本等业务场景时,使用Bitmap与HBase特性解决问题方法。

共分为上下两篇,本次发布下篇,上篇请点击查看

接上篇

四、挑战和方案

《基于Spark、NoSQL的实时数据处理实践(下)》
《基于Spark、NoSQL的实时数据处理实践(下)》

整体服务的稳定性取决于各个组件,大的原则就是故障告警,然后人工介入进行处理。其中Kafka相对还是比较稳定的,如果出了问题,我们可以切换到业务线的集群。无论是离线还是实时计算,资源的分配和管控都是个大难题。我们在17年将Yarn跑在了Docker上,这样可以按团队或者业务优先级分配集群资源,如果Yarn出了不可修复的故障,我们可以基于容器快速重建集群。如前面说到,ScyllaDB相对问题还是比较多的,所以我们所有使用到ScyllaDB地方都有自动降级到HBase的能力。

HBase目前是一个集群,没做集群级别的热备,但我们将所有表的建表语句及HFile文件都做了备份,如果HBase出了不可恢复的故障,或者恢复时间会比较长的话,我们可以通过建表语句以及HFile文件快速的重建集群。

《基于Spark、NoSQL的实时数据处理实践(下)》
《基于Spark、NoSQL的实时数据处理实践(下)》

容量预估也是件令人头疼的事,公司是按月采购服务器,申请量大了会受到大佬们的挑战也不会给批,申请量小了,服务就会受到影响。除了物理机资源受限外,像Kafka Tpoic的分区,HBase、ScyllaDB表的分区以及Spark CPU、内存等也需要考虑容量预估,因为数据流量是不确定的,如果前期设置的值都很大,而流量很小,就会造成物理资源的浪费,如果前期设置的值比较小,就有可能需要在流量变高时做调大的变更。所以,总体来讲, 我们的整体原则就是根据以往数据,凭借历史经验进行预估、设置。

《基于Spark、NoSQL的实时数据处理实践(下)》
《基于Spark、NoSQL的实时数据处理实践(下)》

相信数据正确性、一致性是流处理中最棘手的问题之一了,当然难易程度也跟业务需求有关系,有些业务可以容忍数据丢失或者数据重复,这样的就相对好处理些,比较难的是数据要求不丢其不重复,即精确一次exactly-once,更难的是有些业务还要使用Window等函数进行复杂运算,此时要做到精确一次需要的成本可能会非常大,所以我觉得是没有银弹,需要根据实际业务及成本情况做取舍、定方案。

我们的业务场景相对比较简单,都只是对数据进行解析、转换、补充处理,然后入库。我们可以使用HBase version的特性去重,保证数据没有重复。怎么做到的呢?我们会对每条事件数据抽象出各种实体,每个实体都有唯一ID,然后我们用数据中的事件时间作为HBase version的值,这样,即使是同一条数据被重复处理,结果写入HBase多次,但由于都是同一ID下同样数据的同一version,所以对于HBase来讲还是同一条数据。

这样的话,数据计算阶段只需要保证至少处理数据一次即可。而无论是使用Spark的 receiver还是Direct消费模式都不难做到至少一次处理数据,receiver模式相对成本会高些,另外他的offset是通过Zookeeper管理,可能会因为两者之间的交互不能及时响应等情况而造成数据不一致。还有就是,他的内存问题会比较严重,因为他接收数据和处理数据是分开的,如果处理速度跟不上,就很容易出现数据淤积导致内存问题,所以建议使用direct模式,该模式是直接把kafka的partition映射到RDD里的partition,只有到了action才会去取数据,同时处理数据。

那么这种模式怎么保证数据至少被消费一次呢?checkpoint机制。checkpoint还是非常高效的,没有涉及到实际数据的存储,一般只有几十k大小,主要存储的是Kafka的Offset等信息。前面说到Spark Streaming是微批处理数据,它每个批次的大小是按时间来定的,比如1s、10s或者其他值,每个批次对应Spark Streaming的一个Job,在Job开始、结束异常等情况下都会触发checkpoint事件,如果有问题会根据checkpoint数据再重新执行Job。另外,我们也会周期性的探测并记录kafka offset位置与时间的关系,以及Streaming处理的offset位置和时间的关系,一方面是为了监控数据有没淤积,另一方面,即使checkpoint不能起作用的时候,也可以通过offset与时间的关系,推算出问题时间点offset的位置。

另外,官方也有关于exactly-once的方案,大的思路就是让批次操作幂等。具体做法,比如可以针对每个批次的partation数据产生一个UniqueID,只有当整个UniqueID相关数据都被完全计算才算成功,否则进行回滚。如果执行到重复的UniqueID,直接跳过该批次。

《基于Spark、NoSQL的实时数据处理实践(下)》
《基于Spark、NoSQL的实时数据处理实践(下)》

在我们的项目中,影响Spark Streaming性能稳定性主要有三个方面。前面提到,我们Yarn是运行在Docker上的,另外,我们的计算(Yarn on Docker)和存储(HDFS的DataNode)是公用物理机资源的,因为Docker宿主盘如果挂载多盘,即与DataNode公用盘的话,管理成本会比较高,所以前期我们是挂载到系统盘的,大概600G左右。而Spark shuffle、作业日志等也会写盘,一些情况下会将盘写满,导致docker实例或者物理机故障,从而影响作业性能。我们现在的方案是,为Docker宿主盘采购了1T的SSD盘,一个是可以缓解该问题,一个是提高shffle读写性能。

第二个是慢节点问题,引发原因可能是数据不均衡,也有可能是该节点负责较高或者其他原因。可用的解决方案有,开启推测执行就,需要注意的是如果是非幂等操作,不可以使用,会引起不一致问题。如果是数据不均衡引起,可以将分区个数调大,一定程度上可以缓解。如果数据不均衡是数据key及分区分布算法引起,需要根据数据情况调整或者重写分区算法。针对慢节点问题,还也可以尝试使用spark的reblance算子,但这样会导致每批次整体时间都变长。

第三个问题是数据量突增,比如应用故障恢复后需要追赶数据,或者突然有了新的流量高峰等一些原因会导致Spark处理淤积,甚至故障,解决方案是使用Spark的控量及背压功能。Spark 可以通过参数配置每个批次每个partition消费的数据量,Direct模式使用saprk.streaming.kafka.maxRatePerPartition,Rceiver模式使用spark.streaming.receiver.maxRat。但这是个静态值,而数据流量是动态的,有时低有时高。如果这个值我们设置的高了,依然可能会引起计算延迟(数据不能在批次间隔时间内处理完),如果设置低了,会导致计算资源浪费,并且数据可能会在Kafka侧淤积。

背压机制相对会灵活些,可使用spark.streaming.backpressure.enable配置控制是否开启,他会根据上一批次数据处理的情况确定下一批次数据的摄入量。但会有个特殊情况,作业启动后的第一个批次没有参考数据,Spark提供了另一个参数spark.streaming.backpressure.initialRate来控制背压第一次处理的数据量,但需要注意的是,Spark低版本时候这个参数只在receiver模式起作用。Direct模式就需要结合控量功能使用,比如可以设置控量值是“最优量”(计算批次内数据所用时间尽可能的接近但又一直低于批次调度间隔时间)的1到2倍,这样第一批次摄入的数据量是saprk.streaming.kafka.maxRatePerPartition的值,紧接着几批会因为第一批次处理延迟而依旧采用控量值为摄入量,直到第一个批次运算完,后续批次才会触发背压机制自动优化摄入量,但最大上限还是saprk.streaming.kafka.maxRatePerPartition的值。

虽然,我们期望流式处理是7*24模式运行的,但有时候也有需要去停止应用,比如,期间某个组件故障,或者是需要升级,或者需要变更数据处理逻辑。如果我们强制杀掉应用程序,很大概率会引起上边所说的不一致的问题,所以需要考虑优雅停止的方案。所谓优雅停止,也就是能够完整的执行完一个批次job,然后停止。

有两种方式,一种是向应用Driver发送Sigterm信号,一种是通过事件触发。第一种需要先设置stopGracefull 为true,然后找到drive所在的节点,登陆到节点上,执行kill命令,注意,一般使用流式处理时候都会设置应用重试提交次数,如果是多次,就需要要多次进行如上操作。并且后续每次停止都需要如此操作,所以相对比较麻烦。

第二种相对简单,大的思路是定义触发事件,这个可行的方式比较多,比如使用标识文件,间隔去检测文件是否存在,还可以使用socket监听,或者使用restfull服务都可以,然后在需要的时候触发事件,注意,事件触发后一定要在独立线程调用ssc.stop,否则会产生死锁情况。Stop其中第一个true的含义是停止关联的SparkContext,第二个true,就是确定要执行gracefull stop。

下边是其他一些建议设置或者需要留意的参数,第一个参数是消费Kafka数据连接重试次数,在负责较高或者一些其他情况下,Kafka parttion会出现Lead不存在的异常,但大多时候有会很快恢复,默认是1,建议调高。第二、三的参数需要配合使用,第二个就是前面刚说的引用重试提交次数,默认是1、根据实际情况建议调高,但yarn有个限制,需要对应考虑调高。如果作业长时间运行,无论配置多大都会在某一时间点达到,所以第三个参数是Spark提供的让重试计数重置为0的时间间隔。第四个参数是应用程序可以容忍executor失败的最大数量,默认值是executor数量的2倍,也建议调高,与应用失败次数类似,最后一个是控制executor计数重置的时间间隔。当然,优化相关参数还有很多。

《基于Spark、NoSQL的实时数据处理实践(下)》
《基于Spark、NoSQL的实时数据处理实践(下)》

前面说HBase劣势时候提到他的维护成本较高,其中主要的就是这三个方面,一个是major compact,前面提到HBase是使用LSM模型,写数据是先写内存,然后刷写到磁盘,另一个就是HDFS不支持随机写入,所以HBase对数据的修改和删除都是对数据做标志,然后写入库,并不是真正的修改活删除原数据。这样的数据一直存在的话,一个是会影响查询性能,一个是存在无用存储,HBase的Major compact就是为了清理这些数据,当然也还会清理其他数据,比如如果设置了TTL的话,到期的数据也是在Major compact时候清理,还有就是历史版本数据,比如我们设置保留一个版本,那除了最新版本外的数据都会在执行Major compact的时候被清理。清理的时候就需要加载所有数据,所以会产生很大的磁盘、网络IO,一定程度上就会影响HBase的读写,甚至会引发客户端阻塞。

我们的应对方案是,关闭compact自动执行,并调高触发上限,就是不让HBase自己控制Major compact的执行。我们还做了个工具,可以指定表、每个列族最小执行Major compact的文件个数,每个rs每次处理region个数等维度在集群相对空闲时间执行Major compact,并根据期望执行的时间调整以上维度参数控制整体执行时间。

与compact相关的还有个叫Minor compact,前面说到,内存满了后会刷写磁盘,那么每次刷写磁盘都会产生新的文件,时间久了就可能会有很多小文件,HDFS的元数据就会变的很多,HDFS元数据在内存中管理,即使不会到NameNode内存大小瓶颈,但也会影响性能,所以Minor就是将这些小文件进行合并。

第二个可能会影响性能的就是region split,特别是存在数据热点时候,如果配置的region最大存储值又比较小,可能会产生递归分裂情况,就是刚分裂完的region还没完成正式上线,发现又需要做分裂,这样region就会长时间无法对外提供服务,影响HBase的使用,我们的应对方案是使用预分区建表,并根据实际数据情况设置split keys以及region存储最大值。

第三个就是MemStore flush,HBase的很多调优大多都是围绕内存,特别是在大量写的场景下,flush会严重影响HBase的性能,甚至引发故障,这个也没有银弹,需要大家根据实际数写入情况和物理机内存大小进行调优,调优参考的东西就是各种监控指标,比如flush queue大小、频次,磁盘、网络IO等,所以HBase的mectric监控尤其重要。

《基于Spark、NoSQL的实时数据处理实践(下)》
《基于Spark、NoSQL的实时数据处理实践(下)》

HBase配置相关的建议也都是跟compact 、flush相关的,前两个分别是控制Major compact和Flush线程个数的,需要根据物理CPU和实际使用情况调整。第三个是控制HBase自动执行全局flush的周期时长的参数,默认是1个小时,建议设置为0关闭。第四个是控制HBase触发执行Flush的条件,就是单个region的下所有MemStore的大小的阈值。如果达到该值,就会准备对该region执行flush。第五个参数是当region的memstore达到多少倍时阻塞客户端的写入操作,目的是防止引发RegionServer OOM。

因为一个region下有可能有多个MemStore,即表有很多列族,而当执行flush时,有可能有些MemStore数据量占比很小,比如k级别,刷写的话也不会释放很多内存,却会产生很多小文件,这也是官方建议大家设计表时列族不要太多的原因,也是前边建议第二个参数设置为0的原因。而第六个参数,就可以控制region执行flush时MemStore的最小值,也就是小于这个值的MemStore不执行flush,具体值需要参考刷写的HFile的大小及实际列族个数和写入量调整。

HBase低版本时候读写缓存都使用堆内内存,所以需要根据业务场景配置读写缓存的占比,如果是读多写少场景,可以加大读缓存占比,反之,调高写缓存占比。但两者加和不能超过整个堆的80%。

第9个参数也是为了防止OOM的,括号里的是老版本的叫法,需要注意的是,在新老版本HBase中有不同的含义,假设配置的值是0.3,在老的版本中含义是,当RegionServer所有MemStore占用内存超过Heap的30%时会选择一些占用内存比较大的MemStore阻塞写并进行Flush;在新的版本中的含义是,当占用内存超过hbase.regionserver.global.memstore.size的30%时会选择一些占用内存比较大的MemStore阻塞写并进行Flush。这个差别还是比较大的,我们开始是按照老版本含义去设置,发现flush特别频繁,并且Hfile特别小。然后查官方文档才发现这个不同,这个大家一定要注意下,特别是使用过老版本HBase的同学。

《基于Spark、NoSQL的实时数据处理实践(下)》
《基于Spark、NoSQL的实时数据处理实践(下)》

前面提到,我们有面向实体,多维度、多值、多版本存储、查询数据的需求,那我们在使用HBase存储上是怎么解决的呢?前面说HBase也有表的概念,所以,我们将不同实体的数据存放在不同的表中。还依上边说过的数据举例,数据中有imei和wifi两个维度,我们用HBase 的Column Family来区分。其中wifi维度有wifi1、wifi2两个值,我们将两个值做为HBase的列名,其中共涉及到5个时间戳,将他们做为HBase的version,为了节省空间和后边方便使用bitmap,我们将时间戳转换成了相对时间,以2016年一月一号,0时0分为起点,分钟为间隔。其中HBase的列值我们未进行赋值。

《基于Spark、NoSQL的实时数据处理实践(下)》
《基于Spark、NoSQL的实时数据处理实践(下)》

前面提到,业务有在低成本提取某个实体历史所有信息的需求,如果查询时间窗口很大或者数据很密集等原因很容易会造成服务OOM。由于我们数据中的设备硬件信息、ID信息几乎是不变的,另外行为轨迹也比较固定,所以位置信息、wifi信息重合度也会比较高,所以,我们使用bitmap对数据进行了聚集。如图所以,我们通过离线计算将HBase中的数据进行转换,假如wifi1是我在公司连接的wifi,那么一天时间我都会使用同一个wifi,但可能数据会上报很多条,这样的话,以wifi1作为bitmap的标识key,wifi被捕获的相对时间1,2,3作为bitmap的位值,就会大大节省存储空间。算完后,再将数据转载进HBase共服务使用,表的schema和前边大体一样,只是version不在特殊赋值,并将bitmap转换为字节数组存为列值。

《基于Spark、NoSQL的实时数据处理实践(下)》
《基于Spark、NoSQL的实时数据处理实践(下)》

前面提到,由于docker目前宿主盘使用一个,所以我们期望后期可以调整为多块盘。另外,由于不同项目对资源的需求不同,比如图计算需要较大内存的executor,所以后期会在生产存在多版本的image,目前平台没法进行多版本的管理。

虽然Spark Streaming UI上有一些监控信息,但有时候还是需要查看日志,或者查看JVM相关的监控信息,目前我们正在进行GraphiteSink监控指标梳理及数据接入工作。最后一个是关于HBase的,高版本的HBase 有region级别的replica,由于我们之前使用HBase场景都是离线计算装载数据,所以没法使用该功能,这个项目是实时写入,该功能可以提高HBase的可用性,所以后续我们会进行该功能的测试、上线的工作。

    原文作者:TalkingData
    原文地址: https://zhuanlan.zhihu.com/p/38329964
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞