Hive的性能优化以及数据倾斜

hive性能优化

一、Map阶段的优化:

(控制hive任务中的map数,确定合适的map数,以及每个map处理合适的数据量)

map个数影响因子:

  1. input目录中文件总个数;
  2. input目录中每个文件大小;
  3. 集群设置的文件块大小(默认为128M, 可在hive中通过set dfs.block.size;命令查看,不能在hive中自定义修改);
举例:
input目录中有1个文件(300M),会产生3个块(2个128M,1个44M)即3个Map数。
input目录中有3个文件(5M,10M,200M),会产生4个块(5M,10M,128M,72M)即4个Map数。
适当减少Map数:

当一个任务有很多小文件(远远小于块大小128m),会产生很多Map,而一个Map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费,而且同时可执行的map数是受限的。

set mapred.max.split.size=100000000;//(100M)
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;//表示执行前进行小文件合并。
//大于128:按照128M分割;100~128按照100分;小于100的进行合并。
适当增加Map数:

当有一个小于128M的文件(其中有上千万的数据,字段少并且数据单位小),如果map处理的逻辑比较复杂,用一个map任务去做,耗时比较大。

set mapred.reduce.tasks=10;
create table a_1 as 
select * from a distribute by rand();
//表示通过设置Map任务数来中加Map,把a表中的数据均匀的放到a_1目录下10个文件中。
Map端聚合:
set  hive.map.aggr=true ;(默认为true)

二、Reduce阶段的优化:

2.1 指定reduce数量
 set mapred.reduce.tasks=10
2.2未指定reduce数量
param1:hive.exec.reducers.bytes.per.reducer(默认为1000^3)
param2:hive.exec.reducers.max(默认为999)
reduceNum = min(param2,总输入数据量/param1(reduceNum = InputFileSize / bytes per reducer))

通常情况下,有必要手动指定reducer个数。考虑到map阶段的输出数据量通常会比输入有大幅减少,因此即使不设定reducer个数,重设参数2还是必要的。依据Hadoop的经验,可以将参数2设定为0.95*(集群中TaskTracker个数)。

三、其他优化:

Multi-insert & multi-group by

从一份基础表中按照不同的维度,一次组合出不同的数据

FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1)] select_statement1 group by key1
INSERT OVERWRITE TABLE tablename2 [PARTITION(partcol2=val2 )] select_statement2 group by key2
#具体实例
FROM pv_users 
INSERT OVERWRITE TABLE pv_gender_sum
SELECT pv_users.gender, count(DISTINCT pv_users.userid) 
GROUP BY pv_users.gender 
INSERT OVERWRITE DIRECTORY '/opt/data/users/pv_age_sum'
SELECT pv_users.age, count(DISTINCT pv_users.userid) 
GROUP BY pv_users.age; 
生成MR Job 个数
生成一个MR Job

多表连接,如果多个表中每个表都使用同一个列进行连接(出现在JOIN子句中),则只会生成一个MR Job。

SELECT a.val, b.val, c.val FROM a 
JOIN b ON (a.key = b.key1) 
JOIN c ON (c.key = b.key1)

三个表a、b、c都分别使用了同一个字段进行连接,亦即同一个字段同时出现在两个JOIN子句中,从而只生成一个MR Job。

生成多个MR Job

多表连接,如果多表中,其中存在一个表使用了至少2个字段进行连接(同一个表的至少2个列出现在JOIN子句中),则会至少生成2个MR Job。

SELECT a.val, b.val, c.val FROM a 
JOIN b ON (a.key = b.key1) 
JOIN c ON (c.key = b.key2)

三个表基于2个字段进行连接,这两个字段b.key1和b.key2同时出现在b表中。连接的过程是这样的:首先a和b表基于a.key和b.key1进行连接,对应着第一个MR Job;表a和b连接的结果,再和c进行连接,对应着第二个MR Job。

数据倾斜:

倾斜原因:

map输出数据按key Hash的分配到reduce中,由于key分布不均匀业务数据本身的特性建表时考虑不周某些SQL语句本身就有数据倾斜等原因造成的reduce上的数据量差异过大,所以如何将数据均匀的分配到各个reduce中,就是解决数据倾斜的根本所在。

解决方案:

1. 空值数据倾斜

join的key值发生倾斜,key值包含很多空值或是异常值,这种情况可以对异常值赋一个随机值来分散key。
案例:在日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的user_id 关联,会碰到数据倾斜的问题。

select * from log l
left outer join user u on 
case when (l.user_id is null or I.user_id='-' or I.user_id='0') 
then concat(‘sql_hive’,rand() ) else l.user_id end = u.user_id;
2. Join操作产生数据倾斜
2.1 大表和小表Join

产生原因:Hive在进行join时,按照join的key进行分发,而在join左边的表的数据会首先读入内存,如果左边表的key相对分散,读入内存的数据会比较小,join任务执行会比较快;而如果左边的表key比较集中,而这张表的数据量很大,那么数据倾斜就会比较严重,而如果这张表是小表,则还是应该把这张表放在join左边。
解决方式:使用map join让小的维度表先进内存。在map端完成reduce。
在0.7.0版本之前:需要在sql中使用 /*+ MAPJOIN(smallTable) */ ;

SELECT /*+ MAPJOIN(b) */ a.key, a.value
FROM a
JOIN b ON a.key = b.key;

在0.7.0版本之后:可以配置hive.auto.convert.join

配置项缺省值配置说明
hive.auto.convert.join(0.7.0-0.10.0)false; (0.11.0-)true注意:hive-default.xml模板中错误地将默认设置为false,在Hive 0.11.0到0.13.1
hive.smalltable.filesize(0.7.0) or hive.mapjoin.smalltable.filesize(0.8.1)25000000默认值为2500000(25M),通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中

注意:使用默认启动该优化的方式如果出现默名奇妙的BUG(比如MAPJOIN并不起作用),就将以下两个属性置为fase手动使用MAPJOIN标记来启动该优化

hive.auto.convert.join=false(关闭自动MAPJOIN转换操作)
hive.ignore.mapjoin.hint=false(不忽略MAPJOIN标记)

对于以下查询是不支持使用方法二(MAPJOIN标记)来启动该优化的

select /*+MAPJOIN(smallTableTwo)*/ idOne, idTwo, value FROM
  ( select /*+MAPJOIN(smallTableOne)*/ idOne, idTwo, value FROM
    bigTable JOIN smallTableOne on (bigTable.idOne = smallTableOne.idOne)                                                  
  ) firstjoin                                                            
  JOIN                                                                 
  smallTableTwo ON (firstjoin.idTwo = smallTableTwo.idTwo)  

但是,如果使用的是方法一即没有MAPJOIN标记则以上查询语句将会被作为两个MJ执行,进一步的,如果预先知道表大小是能够被加载进内存的,则可以通过以下属性来将两个MJ合并成一个MJ

hive.auto.convert.join.noconditionaltask:Hive在基于输入文件大小的前提下将普通JOIN转换成MapJoin,
并是否将多个MJ合并成一个
hive.auto.convert.join.noconditionaltask.size:
多个MJ合并成一个MJ时,其表的总的大小须小于该值,同时hive.auto.convert.join.noconditionaltask必须为true
2.2 大表和大表Join

产生原因:业务数据本身的特性,导致两个表都是大表。
解决方式:业务削减。
案例:user 表有 500w+ 的记录,把 user 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。

select * from log l left outer join user u
 on l.user_id = u.user_id;

解决方法:当天登陆的用户其实很少,先只查询当天登录的用户,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。

select /*+mapjoin(u2)*/* from log l2
left outer join
 (
select  /*+mapjoin(l1)*/u1.*
from ( select distinct user_id from log ) l1
join user u1 on l1.user_id = u1.user_id
) u2
on l2.user_id = u2.user_id;
3. count distinct 聚 合 时 存 在 大 量 特 殊 值

产生原因: 做count distinct时,该字段存在大量值为NULL或空的记录。
解决方式: 做count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。
案例
1.只计算count distinct

select cast(count(distinct user_id)+1 as bigint) as user_cnt
from user
where user_id is not null and user_id <> '';

2.计算完count distinct 后面还有 group by。同一个reduce上进行distinct操作时压力很大,先将值为空的记录单独处理,再和其他计算结果进行union。
在Hive中,经常遇到count(distinct)操作,这样会导致最终只有一个reduce,我们可以先group 再在外面包一层count,就可以了。

select day,
count(case when type='session' then 1 else null end) as session_cnt,
count(case when type='user' then 1 else null end) as user_cnt
from (
  select day,type
  from (
    select day,session_id,'session' as type from log
    union all
    select day user_id,'user' as type from log
  )
  group by day,type
)t1 
group by day;
4. group by 产生倾斜的问题
set hive.map.aggr=true

开启map端combiner:在Map端做combine,若map各条数据基本上不一样, 聚合无意义,通过如下参数设置。

hive.groupby.mapaggr.checkinterval = 100000 (默认)
hive.map.aggr.hash.min.reduction=0.5(默认)

解释:预先取100000条数据聚合,如果聚合后的条数小于100000*0.5,则不再聚合。

set hive.groupby.skewindata=true;//决定  group by 操作是否支持倾斜数据。

注意:只能对单个字段聚合。
控制生成两个MR Job,第一个MR Job Map的输出结果随机分配到reduce中减少某些key值条数过多某些key条数过小造成的数据倾斜问题。
在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个reduce 做部分聚合操作,并输出结果。这样处理的结果是,相同的 Group By Key 有可能分发到不同的reduce中,从而达到负载均衡的目的;
第二个 MapReduce 任务再根据预处理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。

    原文作者:Mervyn_2014
    原文地址: https://www.jianshu.com/p/daa4e7c86925
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞