odps是我们自主研发的一套数据处理服务。使用过hive的同学可以很快入门,语法与hive的sql基本一致。hive的原理是通过语法解析器将sql转化为一个个map reduce任务,然后将这些任务并行化,通过分布到多台机器并发执行大大提升海量数据处理的速度。
odps的主页面
生产odps数据权限申请
数据同步
- 处理过程:在进行数据清洗之前,我们需要将MySQL的数据同步到odps,数据处理完成之后需要将数据从odps同步回MySQL。
- 在同步之前,首先需要在odps里建表。
我们的建表ddl语句以及同步脚本都写在 数据开发 里,临时查询 可以用来写我们的测试sql。
数据开发/临时查询.png
我们在指定文件夹(我们的是在 工程效能->作战地图 里)下右键新增资源,资源类型选择ddl,然后开始写建表语句(建表语句类似MySQL的,主要区别是字段类型种类不同,另外一个重要的区别是odps有数据分区的概念,用于按照一定维度对数据进行区分,比如按天分区)。
例如:
drop table if exists `odps_battlemap_pmo_kpi_code`;
CREATE TABLE `odps_battlemap_pmo_kpi_code` (
`id` bigint COMMENT '主键',
`kpi_code` STRING COMMENT '清洗结果编码',
`title` STRING COMMENT '标题',
`summary` STRING COMMENT '数据解读',
`kpi_type` STRING COMMENT '结果展示类型',
`space_biz_id` STRING COMMENT '空间id',
`is_process` int COMMENT '1 清洗;0 没有',
`is_deleted` int COMMENT '1 删除;0 没有',
`create_date` datetime COMMENT '创建日期',
`modify_date` datetime COMMENT '修改日期',
`kpi_date` STRING COMMENT '清洗周期',
`create_user` STRING COMMENT '创建者',
`modify_user` STRING COMMENT '修改者'
) PARTITIONED BY (dt STRING) LIFECYCLE 365;
保存完之后点击提交,然后点击发布。发布会到一个新页面,这里只有拥有运维权限,才可以进行发布。默认我们都只有开发权限。
- 表建完之后就需要将MySQL的数据同步到odps,这里我们可以在一个文件夹下新建一个工作流节点,然后配置同步相关的选项。这里主要需要配置源类型,源库,宿类型,宿库。然后配置字段的对应关系,包括压缩、并发执行机器数、分区配置等。
同步表的配置.png
在调度里,我们需要配置希望的调度方式,比如每天、每周或者每月什么时候执行,使用哪个资源组的机器来执行任务。一般选alipaytech资源组,因为这个比较稳定。
调度配置页面.png
然后就是配置上游和下游节点。
-上游节点是指当前节点依赖的节点。比如你的脚本需要使用另一张表的数据,那那张表就是你的上游节点。
-下游节点是指你的脚本会输出。比如你的脚本会生成一张新表,那么这个新表就是你的下游节点。
完成配置后我们可以点击下图的按钮将数据同步到开发环境:
同步数据至开发环境.png
如果希望同步到生产环境,则使用如下按钮:
同步数据至生产环境.png
在工作流页面里,勾上我的节点,然后选择希望执行的同步脚本,右键进行补数据。
工作流-补数据.png
- 数据回流
数据回流到MySQL与同步数据到MySQL类似,只是源和宿不一样。这里同步到MySQL时,我们可能会加一些前置任务,比如在同步到MySQL之前需要先清空表,这时可以使用如下数据:
回流的数据预处理.png
这里需要注意的是,数据从odps回流到MySQL是需要管仲审批的,因为会直接操作线上MySQL数据,所以操作需要谨慎。
配好数据之后,数据会在指定时间,将生产MySQL的数据同步至生产odps,而开发环境的odps是没有这些同步数据的,需要采用前面提到的右键补数据的方式操作。
不过右键补数据的方式不可以用于数据回流的,因为我们的同步脚本里配置的是生产MySQL的地址,开发odps是无法同步数据到生产MySQL的。
数据清洗
数据清洗是我们使用odps的最主要的原因。利用其分布式任务执行能力,快速对海量数据完成一系列的清洗任务。
odps的sql与传统的sql语法大致类似,有一些小的区别,具体语法可以参考:odps语法。
我们可以使用一系列的sql进行操作,也可以使用存储过程做一些逻辑性的操作。
这里一个注意点是测试时的分区选择。举个例子
DROP TABLE pmo_t_space_dept;
CREATE TABLE IF NOT EXISTS pmo_t_space_dept
AS
SELECT a.biz_id AS space_biz_id
, c.biz_id AS dept_biz_id
, c.dept_name AS dept_name
, 0 AS user_count
FROM odps_battlemap_pmo_space a
LEFT OUTER JOIN odps_battlemap_pmo_relation b
ON a.biz_id = b.source_id
AND b.dt = a.dt
AND b.source_type = 'SPAC'
AND b.dest_type = 'DEPT'
LEFT OUTER JOIN odps_battlemap_pmo_department c
ON b.dest_id = c.biz_id
AND b.dest_type = 'DEPT'
AND c.dt = a.dt
WHERE a.dt = ${datetime};
SELECT * FROM pmo_t_space_dept;
这个例子是用来清洗空间下的部门的数据。这里的倒数第二行a.dt = ${datetime};
是用于指定分区的。
然后我们需要在调度里配置
分区参数配置.png
说明如下:
以sql类型节点为例,代码里${变量名}, 节点配置参数项 变量名=调度参数:
代码:select xxxxxx dt=${datetime}
调度变量赋值:datetime=$bizdate
调度执行的时候,若今日是2013年07月22日,代码里会替换成 dt=20130722
**调度内置参数列表**
$bizdate :业务日期(格式yyyymmdd)
说明:这个参数是用非常广泛,日常调度默认为前一天的日期。
未完待续…