hive性能优化
一、Map阶段的优化:
(控制hive任务中的map数,确定合适的map数,以及每个map处理合适的数据量)。
map个数影响因子:
- input目录中文件总个数;
- input目录中每个文件大小;
- 集群设置的文件块大小(默认为128M, 可在hive中通过set dfs.block.size;命令查看,不能在hive中自定义修改);
举例:
input目录中有1个文件(300M),会产生3个块(2个128M,1个44M)即3个Map数。
input目录中有3个文件(5M,10M,200M),会产生4个块(5M,10M,128M,72M)即4个Map数。
适当减少Map数:
当一个任务有很多小文件(远远小于块大小128m),会产生很多Map,而一个Map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费,而且同时可执行的map数是受限的。
set mapred.max.split.size=100000000;//(100M)
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;//表示执行前进行小文件合并。
//大于128:按照128M分割;100~128按照100分;小于100的进行合并。
适当增加Map数:
当有一个小于128M的文件(其中有上千万的数据,字段少并且数据单位小),如果map处理的逻辑比较复杂,用一个map任务去做,耗时比较大。
set mapred.reduce.tasks=10;
create table a_1 as
select * from a distribute by rand();
//表示通过设置Map任务数来中加Map,把a表中的数据均匀的放到a_1目录下10个文件中。
Map端聚合:
set hive.map.aggr=true ;(默认为true)
二、Reduce阶段的优化:
2.1 指定reduce数量
set mapred.reduce.tasks=10
2.2未指定reduce数量
param1:hive.exec.reducers.bytes.per.reducer(默认为1000^3)
param2:hive.exec.reducers.max(默认为999)
reduceNum = min(param2,总输入数据量/param1(reduceNum = InputFileSize / bytes per reducer))
通常情况下,有必要手动指定reducer个数。考虑到map阶段的输出数据量通常会比输入有大幅减少,因此即使不设定reducer个数,重设参数2还是必要的。依据Hadoop的经验,可以将参数2设定为0.95*(集群中TaskTracker个数)。
三、其他优化:
Multi-insert & multi-group by
从一份基础表中按照不同的维度,一次组合出不同的数据
FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1)] select_statement1 group by key1
INSERT OVERWRITE TABLE tablename2 [PARTITION(partcol2=val2 )] select_statement2 group by key2
#具体实例
FROM pv_users
INSERT OVERWRITE TABLE pv_gender_sum
SELECT pv_users.gender, count(DISTINCT pv_users.userid)
GROUP BY pv_users.gender
INSERT OVERWRITE DIRECTORY '/opt/data/users/pv_age_sum'
SELECT pv_users.age, count(DISTINCT pv_users.userid)
GROUP BY pv_users.age;
生成MR Job 个数
生成一个MR Job
多表连接,如果多个表中每个表都使用同一个列进行连接(出现在JOIN子句中),则只会生成一个MR Job。
SELECT a.val, b.val, c.val FROM a
JOIN b ON (a.key = b.key1)
JOIN c ON (c.key = b.key1)
三个表a、b、c都分别使用了同一个字段进行连接,亦即同一个字段同时出现在两个JOIN子句中,从而只生成一个MR Job。
生成多个MR Job
多表连接,如果多表中,其中存在一个表使用了至少2个字段进行连接(同一个表的至少2个列出现在JOIN子句中),则会至少生成2个MR Job。
SELECT a.val, b.val, c.val FROM a
JOIN b ON (a.key = b.key1)
JOIN c ON (c.key = b.key2)
三个表基于2个字段进行连接,这两个字段b.key1和b.key2同时出现在b表中。连接的过程是这样的:首先a和b表基于a.key和b.key1进行连接,对应着第一个MR Job;表a和b连接的结果,再和c进行连接,对应着第二个MR Job。
数据倾斜:
倾斜原因:
map输出数据按key Hash的分配到reduce中,由于key分布不均匀、业务数据本身的特性、建表时考虑不周、某些SQL语句本身就有数据倾斜等原因造成的reduce上的数据量差异过大,所以如何将数据均匀的分配到各个reduce中,就是解决数据倾斜的根本所在。
解决方案:
1. 空值数据倾斜
join的key值发生倾斜,key值包含很多空值或是异常值,这种情况可以对异常值赋一个随机值来分散key。
案例:在日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的user_id 关联,会碰到数据倾斜的问题。
select * from log l
left outer join user u on
case when (l.user_id is null or I.user_id='-' or I.user_id='0')
then concat(‘sql_hive’,rand() ) else l.user_id end = u.user_id;
2. Join操作产生数据倾斜
2.1 大表和小表Join
产生原因:Hive在进行join时,按照join的key进行分发,而在join左边的表的数据会首先读入内存,如果左边表的key相对分散,读入内存的数据会比较小,join任务执行会比较快;而如果左边的表key比较集中,而这张表的数据量很大,那么数据倾斜就会比较严重,而如果这张表是小表,则还是应该把这张表放在join左边。
解决方式:使用map join让小的维度表先进内存。在map端完成reduce。
在0.7.0版本之前:需要在sql中使用 /*+ MAPJOIN(smallTable) */ ;
SELECT /*+ MAPJOIN(b) */ a.key, a.value
FROM a
JOIN b ON a.key = b.key;
在0.7.0版本之后:可以配置hive.auto.convert.join。
配置项 | 缺省值 | 配置说明 |
---|---|---|
hive.auto.convert.join | (0.7.0-0.10.0)false; (0.11.0-)true | 注意:hive-default.xml模板中错误地将默认设置为false,在Hive 0.11.0到0.13.1 |
hive.smalltable.filesize(0.7.0) or hive.mapjoin.smalltable.filesize(0.8.1) | 25000000 | 默认值为2500000(25M),通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中 |
注意:使用默认启动该优化的方式如果出现默名奇妙的BUG(比如MAPJOIN并不起作用),就将以下两个属性置为fase手动使用MAPJOIN标记来启动该优化
hive.auto.convert.join=false(关闭自动MAPJOIN转换操作)
hive.ignore.mapjoin.hint=false(不忽略MAPJOIN标记)
对于以下查询是不支持使用方法二(MAPJOIN标记)来启动该优化的
select /*+MAPJOIN(smallTableTwo)*/ idOne, idTwo, value FROM
( select /*+MAPJOIN(smallTableOne)*/ idOne, idTwo, value FROM
bigTable JOIN smallTableOne on (bigTable.idOne = smallTableOne.idOne)
) firstjoin
JOIN
smallTableTwo ON (firstjoin.idTwo = smallTableTwo.idTwo)
但是,如果使用的是方法一即没有MAPJOIN标记则以上查询语句将会被作为两个MJ执行,进一步的,如果预先知道表大小是能够被加载进内存的,则可以通过以下属性来将两个MJ合并成一个MJ
hive.auto.convert.join.noconditionaltask:Hive在基于输入文件大小的前提下将普通JOIN转换成MapJoin,
并是否将多个MJ合并成一个
hive.auto.convert.join.noconditionaltask.size:
多个MJ合并成一个MJ时,其表的总的大小须小于该值,同时hive.auto.convert.join.noconditionaltask必须为true
2.2 大表和大表Join
产生原因:业务数据本身的特性,导致两个表都是大表。
解决方式:业务削减。
案例:user 表有 500w+ 的记录,把 user 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。
select * from log l left outer join user u
on l.user_id = u.user_id;
解决方法:当天登陆的用户其实很少,先只查询当天登录的用户,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。
select /*+mapjoin(u2)*/* from log l2
left outer join
(
select /*+mapjoin(l1)*/u1.*
from ( select distinct user_id from log ) l1
join user u1 on l1.user_id = u1.user_id
) u2
on l2.user_id = u2.user_id;
3. count distinct 聚 合 时 存 在 大 量 特 殊 值
产生原因: 做count distinct时,该字段存在大量值为NULL或空的记录。
解决方式: 做count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。
案例:
1.只计算count distinct
select cast(count(distinct user_id)+1 as bigint) as user_cnt
from user
where user_id is not null and user_id <> '';
2.计算完count distinct 后面还有 group by。同一个reduce上进行distinct操作时压力很大,先将值为空的记录单独处理,再和其他计算结果进行union。
在Hive中,经常遇到count(distinct)操作,这样会导致最终只有一个reduce,我们可以先group 再在外面包一层count,就可以了。
select day,
count(case when type='session' then 1 else null end) as session_cnt,
count(case when type='user' then 1 else null end) as user_cnt
from (
select day,type
from (
select day,session_id,'session' as type from log
union all
select day user_id,'user' as type from log
)
group by day,type
)t1
group by day;
4. group by 产生倾斜的问题
set hive.map.aggr=true
开启map端combiner:在Map端做combine,若map各条数据基本上不一样, 聚合无意义,通过如下参数设置。
hive.groupby.mapaggr.checkinterval = 100000 (默认)
hive.map.aggr.hash.min.reduction=0.5(默认)
解释:预先取100000条数据聚合,如果聚合后的条数小于100000*0.5,则不再聚合。
set hive.groupby.skewindata=true;//决定 group by 操作是否支持倾斜数据。
注意:只能对单个字段聚合。
控制生成两个MR Job,第一个MR Job Map的输出结果随机分配到reduce中减少某些key值条数过多某些key条数过小造成的数据倾斜问题。
在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个reduce 做部分聚合操作,并输出结果。这样处理的结果是,相同的 Group By Key 有可能分发到不同的reduce中,从而达到负载均衡的目的;
第二个 MapReduce 任务再根据预处理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。