Hive中自定义Map/Reduce示例 In Python

Hive支持自定义map与reduce script。接下来我用一个简单的wordcount例子加以说明。使用Python开发(如果使用Java开发,请看这里)。

开发环境:
python:2.7.5
hive:2.3.0
hadoop:2.8.1

一、map与reduce脚本

map脚本(mapper.py)

#!/usr/bin/python
import sys 
import re
while True:
   line = sys.stdin.readline().strip()
   if not line:
     break
   p = re.compile(r'\W+')
   words=p.split(line)
   #write the tuples to stdout
   for word in words:
     print '%s\t%s' % (word, "1")

reduce脚本(reducer.py)

#!/usr/bin/python
import sys 

# maps words to their counts
word2count = {}

while True:
    line=sys.stdin.readline().strip()
    if not line:
      break
    # parse the input we got from mapper.py
    try:
        word,count= line.split('\t', 1)
    except:
        continue

    # convert count (currently a string) to int
    try:
        count = int(filter(str.isdigit,count))
    except ValueError:
        continue

    try:
        word2count[word] = word2count[word]+count
    except:
        word2count[word] = count

# write the tuples to stdout
# Note: they are unsorted
for word in word2count.keys():
    print '%s\t%s' % ( word, word2count[word] )

注意一点的是,不能使用for line in std.in,因为for是一个字节一个字节的读取,而不是一行一行地读。而且在对map输出的word,count进行拆分时,要注意将拆分的count部分非数字部分去掉,以免count转换成int错误。

二、编写hive hql

drop table if exists raw_lines;

-- create table raw_line, and read all the lines in '/user/inputs', this is the path on your local HDFS
create external table if not exists raw_lines(line string)
ROW FORMAT DELIMITED
stored as textfile
location '/user/inputs';

drop table if exists word_count;

-- create table word_count, this is the output table which will be put in '/user/outputs' as a text file, this is the path on your local HDFS

create external table if not exists word_count(word string, count int)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '\t'
 lines terminated by '\n' STORED AS TEXTFILE LOCATION '/user/outputs/';

-- add the mapper&reducer scripts as resources, please change your/local/path
add file /home/yanggy/mapper.py;
add file /home/yanggy/reducer.py;

from (
        from raw_lines
        map raw_lines.line
        --call the mapper here
        using 'mapper.py'
        as word, count
        cluster by word) map_output
insert overwrite table word_count
reduce map_output.word, map_output.count
--call the reducer here
using 'reducer.py'
as word,count;
点赞