需求:
从App启动日志中读取所需数据,用来得到用户的常用启动时间点,常活动的地理位置,并更新到用户画像表中。
背景:
device_id 设备id
start_time App启动时间
latitude 上传纬度
longitude 上传精度
实现
原始数据
DataFrame数据形式是结构化数据,如下
device_id start_time latitude longitude
0 ad3bb7b86f9ad9ff4786f767094f31f4835cf167 2017-07-26 00:18:55.985 39.69992988280924 116.5083292779666
1 7e98a8401070c6bcb46a9ef32efd0e6a7066a220 2017-07-26 00:04:37.532 39.82559758010746 116.4472136767569
2 865675021801835 2017-07-26 00:19:28.821 0.000000 0.000000
通过
schemeRdd.rdd变成普通rdd
结构变为
[(device_id,start_time,latitude,longitude),(device_id,start_time,latitude,longitude),...]
1. 经纬度处理成geohash6 合并到一起生成的RDD数据结构
rdd.map(lambda x: (x['device_id'], x['start_time'],geohash.encode(x[lat],x[lng]))
得到数据结构是:
[('device_id',start_time,geohash6)), (('device_id',start_time,geohash6))]
2. 拆分开成2个rdd ,分别是时间和GeoHash的,分别处理
timerdd = rdd.map(lambda x: (x[0],x[1]))
timerdd_group=timerdd.groupByKey()
得到数据结构
[('device_id1',['2017-09-01 12:34:00','2017-06-30 01:01:12']),('device_id2',['2017-09-05 11:15:32','2017-07-30 07:01:08'])]
geohashrdd=rdd.map(lambda x: (x[0],x[2]))
geohashrdd_group=geohashrdd.groupByKey()
得到数据结构
[('device_id1',['EXXEQ6','OKWQE9','SDEDD1','SDQGW2']),('device_id2',['LLSJW0','YWQDX7'])]
2.1 得到用0-24来表示的常使用的时间点 , 得到最近使用时间到日期 (注意时间list也许没数据,则设置默认时间)
result_time_rdd= timerdd_group.mapValues(lambda x: use_time(x)) #该函数生成一个元祖 ('often_use_time','last_use_time')
得到结构为
[('device_id1',(12,'2017-09-01')),('device_id2',(9,'2017-06-05')),('device_id3',(0,'1970-01-01'))]
2.2 得到最常去的2个geohash的值
result_geohash_rdd = geohashrdd_group.mapValues(lambda x: activie_geohash(x) )
得到结构为
[('device_id1',('EWEQ1','SDWQ3'),('device_id2',('UYWIE8','']),('device_id3',['','']]
- 根据结果数据结构,组拼更新SQL,更新数据
结果2个rdd
result_time_rdd = [('device_id1',(12,'2017-09-01')),('device_id2',(9,'2017-06-05')),('device_id3',(0,'1970-01-01'))]
result_geohash_rdd = [('device_id1',('EWEQ1','SDWQ3'),('device_id2',('UYWIE8','']),('device_id3',['','']]
通过cogroup作得到
[('device_id1',([12,'2017-09-01'],['EWEQ1','SDWQ3']),('device_id2',['UYWIE8','POKE5']),('device_id3',['MVBD5','ZCXV4']]
更新到表中
UPDATE recommend.user_tag SET open_app_period=x[1],open_app_last=x[2],active_loc_1=x[3],active_loc_2=x[4] WHERE device_id=%s
结束语
全文使用RDD的API来实现,RDD的API十分强大
目前建议使用DataFrame来实现