浅析Storm性能调优

性能调优是一个永无止境的追求,没有最好,只有最适合,是机器资源和业务需求的一种平衡。Storm性能优化包括硬件层面的优化、代码层面的优化、Topology并行度的优化、Storm集群配置参数和Topology运行参数的优化。如何寻找程序运行的瓶颈?可以先查看整个Topology的耗时,寻找耗时的业务部分,针对每个组成部分进行优化,再对硬件集群参数进行优化。在讲性能调优之前需要先讲Storm基础知识,因为这是进行调优的必备知识。

1 Storm基础

Storm是一个分布式实时流式计算平台,storm集群主要由一个nimbus和多个supervisor组成,它们都把状态存储在zookeeper上,整体架构如下图所示:

《浅析Storm性能调优》
《浅析Storm性能调优》

1.1 各组件的功能

nimbus: 主节点,负责将Topology的worker,根据所在集群的资源情况调度到合适的supervisor上。

supervisor: 在工作节点上,负责启动和运行Topology的进程(worker)。

worker:表示一个进程。所有的数据处理和传输都在worker中完成,nimbus和supervisor均不参与。

executor:一个worker里面可以执行一个或多个线程。

task:一个executor执行一个或多个(一般是一个)。task分为Spout和Bolt两种不同类型。

zookeeper: 负责存储nimbus和supervisor的状态。

1.2 Topology

Storm上运行的业务叫Topology,Topology提交后开始运行,不会自动结束,直到被kill为止,Topology是一个DAG(有向无环图),可以有很多阶段,如下图所示。

《浅析Storm性能调优》
《浅析Storm性能调优》

Tuple: Tuple是Storm里面处理的数据单元,它由多个字段组成(自定义个数和类型)。

Stream: Stream是一个源源不断的Tuple流。

Spout: Spout的角色是从数据源(如Kafka等)获取数据,发送Tuple到进行后续处理的Bolt。Spout代码往往是公共的,不需要业务去实现。

Bolt: Bolt的角色是处理数据,输入是上游(Spout或Bolt)的Tuple,输出是发往下游(Bolt)的Tuple;Bolt可以有多级,一般最后一级Bolt会定期把结果写到外部存储。Bolt代码是业务实现逻辑的地方,只用实现相关的回调函数完成业务逻辑即可,不需要关心数据传输、容错等问题。

Topology: Topology是对一个应用的Spout、Bolt类型、输入输出Tuple/Stream、关联关系的描述。

2 性能优化

硬件层面的优化,就是常规的增加机器的硬件资源,提高机器的硬件配置,因为机器配置太低,或者运行太多的业务,使得硬件的CPU、宽带、内存成为了瓶颈,对于开发者来说如何调优都不起作用。

对于开发者来说,主要是对代码层面、Topology并行度、以及Storm的参数调整进行优化,接下来主要讲这几个方面。

2.1 代码层面的优化

代码层面的优化需要考虑在算法层面、业务层面、存储层、以及Storm本身特性上的优化。

算法层面,需要衡量代码执行效率,需要考虑算法的时间复杂度。选择随着数据量增长,算法的时间复杂度不能呈现指数级增长。

业务层面,需要根据资源消耗类型与性能差异,将业务拆分成多个Bolt,然后根据每个Bolt的特性分配不同的并行度。比如CPU密集型的计数功能和I/O密集型的网络请求功能拆分到不同Bolt里,让各自的Bolt只做自己擅长的事,利于后续优化。

存储层方面,需要考虑连接数据库的时候是否用了连接池来减少程序连接数据库的资源开销,是否合理使用批处理以减少程序和数据库的通信,下节会通过一个业务实例来说明。

在Storm特性优化方面,优先使用localOrShuffleGrouping代替shuffleGrouping。localOrShuffleGrouping是指如果task发送消息给目标task时,发现同一个worker中有目标task,则优先发送到这个task;如果没有,则进行Shuffle,随机选取一个目标task。 这样能够减少网络开销和序列化操作;在使用fieldsGrouping根据某个字段进行分组时,可能会出现数据不均衡的情况,如果数据按照某一个key进行分组,则具有相同的key都会被发送到同一个task上。如果这些key的量特别大,会导致这个task负载特别高,出现数据不均衡的情况。因此要合理设置分组的key和合理使用分组策略。

2.2 并行度的优化

Storm的Topology中并行度的优化就是合理设置Worker、Spout、Bolt的并行度。

2.2.1 Worker并行度的设置

Worker可以分配到不同的 supervisor 节点,这也是 Storm 实现多节点并行计算的主要配置手段。因此, Workers 的数量,可以说是越多越好,当然也要看硬件资源是否足够,是否浪费资源。

默认情况下,一个Worker 分配 768M 的内存,外加 64M 给 logwriter 进程;因此一个Worker 会耗费 832M 内存。

Worker的并行度需要考虑节点数量,及其内存容量,数据量的大小和代码执行时间。如果数据量大,在数据传入后Spout的运行时间明显增加,说明数据有堆积情况产生,应该增大Worker的数量。如果代码执行时间长,则需要通过增加Worker数量来将压力分散到更多的节点上以提升并发能力。

2.2.2 Spout并行度设置

如果 Storm读取的是Kafka 的数据,Spout的并行度最好和Kafka的Partition数量一样(和kafka消费策略有关,如果以前没了解过,则需要先看看Kafka相关的知识)。如果Spout的并行度小于Partition的数量,则存在一个读取数据的线程对应着多个Partition(即一个线程会去消费多个Partition),可能存在数据来不及消费的情况。如果Spout的并行度多于Partition的数量,则会产生多余线程,浪费计算资源。

2.2.3 Bolt并行度设置

Bolt的并行度设置看每个Bolt的Complete Latency(ms)的时间,哪个耗时比较长可以增大平行度,具体见下文业务实例的分钟。

2.3 配置参数优化

topology.max.spout.pending: 最大 Spout 挂起时间。一般Spout 的发射速度会快于下游的 Bolt 的消费速度,当下游的 Bolt 还有 pending中的 Tuple 没有消费完时,Spout 会停下来等待,该配置作用于 Spout 的每个 task。因此这个参数需要合理设置。

topology.message.timeout.secs:元组超时时间设置。确定在完成发送消息之后,在设置的超时时间内如果未收到确认,则将消息处理视为失败。此参数的设置需要考虑是需要数据的及时性还是需要数据的失败率小一些。如果及时性高,元组超时时间可以设置的短些,可能数据的失败率就高些。如果需要降低失败率可以适当延长元组的超时时间。

worker.heap.memory.mb、topology.worker.max.heap.size.mb用来调整分配给每个 Worker的内存。当运行程序的Worker报出内存溢出的情况下,比较管用。

3 实例

在本人实际工作中遇到了一个关于预测模型的计算,业务逻辑模型为从Kafka接入实时数据,然后进行数据解密、预测模型逻辑处理,最终写入Redis库

我们做了如下优化流程:

1. 优化Spout并行度

刚开始接入的Kafka的Partition的数目为10个,在数据高峰时段数据的堆积

明显上升,因此将Kafka的Partition设置为20个,增加并行度。并将Spout的并行度设置为20。Topology中的Bolt分为2个,当时的运行结果如下图。

《浅析Storm性能调优》
《浅析Storm性能调优》

2. 拆分业务

parseBolt其中做了数据解密以及预测模型逻辑的视线,到底是数据解密耗时间还是预测逻辑耗时间无法确定,因此拆分业务逻辑,在Topology中变为3个Bolt如下图所示。

builder.setSpout("kafkaSpout", kafkaSpout, numSpout);
builder.setBolt("parseBolt",new StreamingBolt(new Fields("data","version"),"virpy2.7/virpy2.7/bin/python", "parse.py"),Integer.parseInt(args[4])).shuffleGrouping("kafkaSpout");
 builder.setBolt("infoBolt",new InfoBolt(),Integer.parseInt(args[5])).shuffleGrouping("parseBolt");
 builder.setBolt("writeBolt",new WriteBolt(),Integer.parseInt(args[6])).shuffleGrouping("infoBolt");

在不断调试的过程中发现是数据解密的时候耗时比较长,因此增加了这个Bolt的并行度后,性能提升明显。

3. 优化存储过程

Topology最后一级Bolt是要把分析结果写入Redis, 由于Tuple吞吐量为每分钟数万级,Redis库中还有其他业务,因此给Redis服务器带来很大压力。

以前的代码是每处理一个Tuple就访问一次, 显然对于海量数据的实时处理是不合理,在创建连接、关闭连接一次只是处理一条数据非常的消耗性能。因此,采用连接池的方式减少连接、建立、销毁的性能消耗,同时使用批量更新模式,在本地内存中实现计数功能,每隔100个向Redis中批量更新,减少Redis服务器的压力。采用pipeline进一步改造了Redis代码部分,分批次发送Redis访问请求。这样可以极大的减少访问Redis的网络往返次数,使得整个Topology的延时降低。

对于业务来讲由于Batch的个数的设置,影响数据写入库的时间,因为每个Batch的数据要累积到100个才写入库。再次优化为每10个写入库,使得入库的时间更加实时。参数的设定是性能和业务需要的权衡。最终效果如下图。

《浅析Storm性能调优》
《浅析Storm性能调优》

经过以上的优化使得该工程比之前的快了4倍。

更多精彩内容请关注360智控

    原文作者:奔奔
    原文地址: https://zhuanlan.zhihu.com/p/46816103
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞