动态分区插入
前面的示例中,用户必须知道对哪个分区插入数据,并且一条insert
语句只能插入一个分区。如果想要加载到多个分区,需要使用多条insert
语句,如下:
FROM page_view_stg pvs
INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='US')
SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip WHERE pvs.country = 'US'
INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='CA')
SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip WHERE pvs.country = 'CA'
INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='UK')
SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip WHERE pvs.country = 'UK';
为了将数据加载到特定日期的所有国家分区中,需要为每个国家写insert
语句。这样很不方便,你得预先知道国家列表并且创建好分区。如果有一天国家列表改变了,你还得修改insert
语句同时得创建对应的分区。这样做效率还低,每个insert
语句都会变成MapReduce作业。
动态分区插入(或者多分区插入)就是为解决上述问题设计的,动态分区插入会在扫描输入表时动态决定应该创建和填充哪些分区。这是新加的特性,从0.6.0版本可用。使用动态分区插入,会对输入列的值进行评估,然后决定每行数据应该插入哪个分区。如果分区没有创建,会自动创建分区。使用这个特性只需要写一条insert
语句。另外,因为只有一条insert
语句,也只有一个对应的MapReduce作业。相比多条inser
的情况,这个特性会显著提升性能,减少Hadoop集群的工作负载。
下面是一个加载所有国家分区数据的示例,只用到一条insert
语句:
FROM page_view_stg pvs
INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country)
SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip, pvs.country
动态分区插入和多条insert
语句有一些语法差异:
- 在
PARTITION
中添加了country,但是没有相关值。在这种情况下,country就是动态分区列。dt有值,它是静态分区列。如果是动态分区列,它的值将会来自输入列。目前只允许动态分区列是最后一列,因为分区列的顺序说明了其层次顺序(意味着dt是root分区,country是child分区),不能写成(dt, country=’US’)。 - 在
select
语句中添加了pvs.country
列。这是动态分区列对应的输入列。注意,不需要给静态分区列添加输入列,因为PARTITION
语句知道它的值。动态分区值是按照顺序查询的,不是按照名称,并且是select
语句的最后一列。
自定义Map/Reduce脚本
用户可以加入已定义的Map/Reduce,这个特性是Hive语言原生支持的。例如,想要运行自动的脚本map_script和reduce_script,用户可以执行以下命令,使用TRANSFORM
语句嵌入脚本。
注意,在传给脚本之前,列会被转换为字符串并以TAB分隔,用户脚本的标准输出会被当做以TAB分隔的字符串列。用户脚本的标准错误可以在Hadoop的任务详情页面看到。
FROM (
FROM pv_users
MAP pv_users.userid, pv_users.date
USING 'map_script'
AS dt, uid
CLUSTER BY dt) map_output
INSERT OVERWRITE TABLE pv_users_reduced
REDUCE map_output.dt, map_output.uid
USING 'reduce_script'
AS date, count;
示例map脚本(weekday_mapper.py)
import sys
import datetime
for line in sys.stdin:
line = line.strip()
userid, unixtime = line.split('\t')
weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
print ','.join([userid, str(weekday)])
当然,MAP和REDUCE都是语法糖。内部查询可以写为:
SELECT TRANSFORM(pv_users.userid, pv_users.date) USING 'map_script' AS dt, uid CLUSTER BY dt FROM pv_users;
无模式map/reduce:如果”USING map_script”后没有”AS”语句,Hive假定脚本的输出包含2个部分:key,放在tab之前,value,除key之外其余都放在tab之后。这和指定了”AS key, value”是有区别的,因为如果有多个tab的话,value只会包含了第一个tab和第二个tab之间的部分。
用这种方式,用户可以在不知道map输出模式的情况下迁移老的map/reduce脚本。不过用户还是需要知道reduce输出模式,因为必须与我们要插入的表进行匹配。通常分区列是排序列的前缀,但并不强制。
FROM (
FROM pv_users
MAP pv_users.userid, pv_users.date
USING 'map_script'
CLUSTER BY key) map_output
INSERT OVERWRITE TABLE pv_users_reduced
REDUCE map_output.dt, map_output.uid
USING 'reduce_script'
AS date, count;
DISTRIBUTE BY和SORT BY:不指定”cluster by”,用户可以指定”distribute by”和”sort by”,分区列和排序列可以不同。
FROM pv_users
MAP pv_users.userid, pv_users.date
USING 'map_script'
AS c1, c2, c3
DISTRIBUTE BY c2
SORT BY c2, c1) map_output
INSERT OVERWRITE TABLE pv_users_reduced
REDUCE map_output.c1, map_output.c2, map_output.c3
USING 'reduce_script'
AS date, count;
Co-Groups
假设我们要用uid列组合actions_video和action_comments表的行,并且将数据发送到自定义的’reduce_script’,可以使用如下方式:
FROM (
FROM (
FROM action_video av
SELECT av.uid AS uid, av.id AS id, av.date AS date
UNION ALL
FROM action_comment ac
SELECT ac.uid AS uid, ac.id AS id, ac.date AS date
) union_actions
SELECT union_actions.uid, union_actions.id, union_actions.date
CLUSTER BY union_actions.uid) map
INSERT OVERWRITE TABLE actions_reduced
SELECT TRANSFORM(map.uid, map.id, map.date) USING 'reduce_script' AS (uid, id, reduced_val);