Hive Incremental Extraction Approach

##### 1. Extracting data from MySQL into Hive with Sqoop

Consulting the official Sqoop documentation shows two options for incremental imports:
7.2.9. Incremental Imports
Sqoop provides an incremental import mode which can be used to retrieve only rows newer than some previously-imported set of rows.

The following arguments control incremental imports:

Table 5. Incremental import arguments:

| Argument | Description |
| --- | --- |
| `--check-column (col)` | Specifies the column to be examined when determining which rows to import. (the column should not be of type CHAR/NCHAR/VARCHAR/VARNCHAR/LONGVARCHAR/LONGNVARCHAR) |
| `--incremental (mode)` | Specifies how Sqoop determines which rows are new. Legal values for `mode` include `append` and `lastmodified`. |
| `--last-value (value)` | Specifies the maximum value of the check column from the previous import. |

Sqoop supports two types of incremental imports: append and lastmodified. You can use the --incremental argument to specify the type of incremental import to perform.

You should specify append mode when importing a table where new rows are continually being added with increasing row id values. You specify the column containing the row’s id with --check-column. Sqoop imports rows where the check column has a value greater than the one specified with --last-value.

An alternate table update strategy supported by Sqoop is called lastmodified mode. You should use this when rows of the source table may be updated, and each such update will set the value of a last-modified column to the current timestamp. Rows where the check column holds a timestamp more recent than the timestamp specified with --last-value are imported.

At the end of an incremental import, the value which should be specified as --last-value for a subsequent import is printed to the screen. When running a subsequent import, you should specify --last-value in this way to ensure you import only the new or updated data. This is handled automatically by creating an incremental import as a saved job, which is the preferred mechanism for performing a recurring incremental import. See the section on saved jobs later in this document for more information.
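
As the documentation notes, a saved job persists the updated `--last-value` between runs automatically. A minimal sketch of such a job (the connection string, credentials, table, and job name here are hypothetical, not from the original article):

```
# Create a saved incremental-import job; Sqoop stores --last-value after each run
sqoop job --create daily_orders_import -- import \
  --connect jdbc:mysql://mysql-host:3306/mydb \
  --username etl --password-file /user/etl/.mysql.pwd \
  --table orders \
  --incremental append \
  --check-column id \
  --last-value 0

# Execute it on each scheduled run
sqoop job --exec daily_orders_import
```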

Example:

In short, Sqoop supports two modes for incrementally importing from MySQL into Hive.
One is append, which works off a monotonically increasing column, e.g.:
`--incremental append --check-column id --last-value 0`
imports rows where id > 0.
The other, lastmodified, works off a timestamp column, e.g.:
`--incremental lastmodified --check-column createTime --last-value '2012-02-01 11:00:00'`
imports only rows whose createTime is later than '2012-02-01 11:00:00'.

```
--check-column (col)    -- the column to examine
--incremental (mode)    -- the mode: append or lastmodified
--last-value (value)    -- the check column's value from the last import
```
This article adopts the timestamp approach: import the full data set every day, then extract the latest version of each row in Hive. A sketch of the daily import command follows.
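
A minimal sketch of such a daily full import into a dated staging directory (the host, database, credentials, and paths are assumptions for illustration):

```
# Full import of the MySQL table into one day's staging directory
sqoop import \
  --connect jdbc:mysql://mysql-host:3306/mydb \
  --username etl --password-file /user/etl/.mysql.pwd \
  --table mytable \
  --target-dir /warehouse/staging/mytable/dt=20170626 \
  --fields-terminated-by ',' \
  --num-mappers 1
```

In the walkthrough below, the same data is loaded from a local file instead, to keep the example self-contained.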



##### 2. Hive incremental SQL implementation

Data: data.txt
```
1,mary,18,2017-06-26 10:00:00
2,lucy,29,2017-06-26 10:00:00
3,jack,18,2017-06-26 10:00:00
4,nick,25,2017-06-26 10:00:00
4,nick,18,2017-06-27 10:00:00
5,tom,26,2017-06-26 10:00:00
5,tom,26,2017-06-27 12:00:00
```
1. Create the table:
```
create table mytable(id int, name string, age int, createTime string)
partitioned by (dt string)
row format delimited fields terminated by ',';
```

2. Load the data:
```
load data local inpath '/home/ubuntu/data.txt' into table mytable partition(dt='20170626');
```

3. Inspect the data:
```
hive> select * from mytable where dt='20170626';
OK
1   mary    18  2017-06-26 10:00:00 20170626
2   lucy    29  2017-06-26 10:00:00 20170626
3   jack    18  2017-06-26 10:00:00 20170626
4   nick    25  2017-06-26 10:00:00 20170626
4   nick    18  2017-06-27 10:00:00 20170626
5   tom 26  2017-06-26 10:00:00 20170626
5   tom 26  2017-06-27 12:00:00 20170626
Time taken: 0.364 seconds, Fetched: 7 row(s)
hive> 
```
4. The 20170626 partition contains incremental rows from the 27th, so the data should be merged into the 20170627 partition, keeping only the latest version of each row.
(Note: to drop a partition in Hive, e.g. before a rerun: `alter table mytable drop partition(dt='20170627');`)
##### The query is as follows:
```
SELECT id, name, age, createTime
FROM (SELECT id, name, age, createTime, row_number() OVER (PARTITION BY id ORDER BY createTime DESC) AS rn
    FROM mytable
    ) t
WHERE t.rn = 1;
```
Here, the subquery:
```
select id,name,age,createTime,row_number() over (partition by id order by createTime DESC) AS rn from mytable
```
uses Hive's window function row_number(): it groups the rows by the column(s) named after PARTITION BY, sorts each group by createTime in descending order, and numbers the rows within each group from 1 to n.
This subquery returns:
```
Total MapReduce CPU Time Spent: 2 seconds 250 msec
OK
1   mary    18  2017-06-26 10:00:00 1
2   lucy    29  2017-06-26 10:00:00 1
3   jack    18  2017-06-26 10:00:00 1
4   nick    18  2017-06-27 10:00:00 1
4   nick    25  2017-06-26 10:00:00 2
5   tom 26  2017-06-27 12:00:00 1
5   tom 26  2017-06-26 10:00:00 2
Time taken: 24.823 seconds, Fetched: 7 row(s)
hive> 
```

So the latest valid data as of 20170627 is simply the rows with row number rn = 1.
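
A caveat: if two versions of the same id share an identical createTime, their order within the window is nondeterministic, so which copy gets rn = 1 is arbitrary. Adding the partition column as a secondary sort key (a sketch, not in the original query) makes the choice stable:

```
row_number() OVER (PARTITION BY id ORDER BY createTime DESC, dt DESC) AS rn
```
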
##### 3. Updating the data
Finally, write the merged data into the 20170627 partition with the following SQL:
```
INSERT INTO TABLE mytable PARTITION(dt='20170627') 
SELECT id, name, age, createTime
FROM (SELECT id, name, age, createTime, row_number() OVER (PARTITION BY id ORDER BY createTime DESC) AS rn
    FROM mytable
    ) t
WHERE t.rn = 1;
```
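
For a scheduled daily run, the same merge can be parameterized with a hiveconf variable (a sketch; the variable name ds and the script name merge.hql are assumptions):

```
-- merge.hql; invoke as: hive -hiveconf ds=20170627 -f merge.hql
INSERT INTO TABLE mytable PARTITION(dt='${hiveconf:ds}')
SELECT id, name, age, createTime
FROM (SELECT id, name, age, createTime,
             row_number() OVER (PARTITION BY id ORDER BY createTime DESC) AS rn
      FROM mytable) t
WHERE t.rn = 1;
```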
Check the result:
```
hive> select * from mytable where dt='20170627';
OK
1   mary    18  2017-06-26 10:00:00 20170627
2   lucy    29  2017-06-26 10:00:00 20170627
3   jack    18  2017-06-26 10:00:00 20170627
4   nick    18  2017-06-27 10:00:00 20170627
5   tom 26  2017-06-27 12:00:00 20170627
Time taken: 0.121 seconds, Fetched: 5 row(s)
hive> 

```
Comparing against the source data confirms that the partition indeed holds only the latest rows.
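
As a quick sanity check (a hypothetical verification query, not part of the original walkthrough), every id should now appear exactly once in the new partition, so both counts below should come back as 5:

```
select count(*) as cnt, count(distinct id) as distinct_ids
from mytable
where dt = '20170627';
```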




Original author: nicklbx
Original article: https://www.jianshu.com/p/d70a813c876a