This article looks at how to compute PV and UV with Spark. We compute PV and UV all the time, but is the way we usually compute them actually the most efficient?
Let's work through an example.
First, take a look at the data:
{"flag":"sendTemplateMessage","actionType":"success","from":"sendTemplateMessage","openId":"otU065OELPd_cccc","timestamp":1543309410741,"device":null,"ip":null,"bucket":1,"data":{"templateName":"votePost","appType":1,"sendNum":1}}
{"flag":"sendTemplateMessage","actionType":"success","from":"sendTemplateMessage","openId":"otU065OELPd_rvm-lhpa2gPc7QBs","timestamp":1543309410741,"device":null,"ip":null,"bucket":4,"data":{"templateName":"dailySignPush","appType":3,"sendNum":1}}
{"flag":"sendTemplateMessage","actionType":"success","from":"sendTemplateMessage","openId":"otU065OELPd_rvm-eeee","timestamp":1543309410741,"device":null,"ip":null,"bucket":5,"data":{"templateName":"replyPost","appType":2,"sendNum":1}}
{"flag":"sendTemplateMessage","actionType":"success","from":"sendTemplateMessage","openId":"otU065OELPd_rvm-xxxxx","timestamp":1543309410741,"device":null,"ip":null,"bucket":4,"data":{"templateName":"dailySignPush","appType":3,"sendNum":1}}
{"flag":"sendTemplateMessage","actionType":"success","from":"sendTemplateMessage","openId":"otU065OELPd_rvm-pppeeee","timestamp":1543309410741,"device":null,"ip":null,"bucket":4,"data":{"templateName":"dailySignPush","appType":3,"sendNum":1}}
{"flag":"sendTemplateMessage","actionType":"success","from":"sendTemplateMessage","openId":"otU065OELPd_rvm-lhpa2gPc7QBs","timestamp":1543309410741,"device":null,"ip":null,"bucket":4,"data":{"templateName":"dailySignPush","appType":3,"sendNum":1}}
We read this data line by line and then need to compute, for each (appType, templateName, bucket) combination, the PV and UV of the template messages, where PV is the sum of sendNum and UV is the number of distinct openIds.
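Before turning to Spark, a quick sanity check of those two definitions never hurts. The following is just a minimal pure-Python sketch over the sample lines above (it assumes they are saved to a local file called sample.json, which is a made-up name, not the HDFS path used below):

import json
from collections import defaultdict

pv = defaultdict(int)       # (appType, templateName, bucket) -> sum of sendNum
uv_sets = defaultdict(set)  # (appType, templateName, bucket) -> set of distinct openIds

with open("sample.json") as f:  # hypothetical local copy of the six sample lines
    for line in f:
        d = json.loads(line)
        if d["flag"] == "sendTemplateMessage" and d["actionType"] == "success":
            key = (d["data"]["appType"], d["data"]["templateName"], d["bucket"])
            pv[key] += d["data"]["sendNum"]
            uv_sets[key].add(d["openId"])

for key in pv:
    print("%s pv=%d uv=%d" % (key, pv[key], len(uv_sets[key])))

For the six sample lines this should report, among others, pv = 4 and uv = 3 for (3, 'dailySignPush', 4), which is what the Spark job below should reproduce.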
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, SparkConf
import json


def parseJson(log_line):
    """Parse one JSON log line into a flat tuple of the fields we care about."""
    json_dic = json.loads(log_line)
    print(json_dic)  # debug output; under yarn-client this ends up in the executor logs
    return (json_dic["flag"], json_dic["actionType"], json_dic["data"]["appType"],
            json_dic["data"]["templateName"], json_dic["bucket"], json_dic["openId"],
            json_dic["data"]["sendNum"])


def filterRdd(line):
    """Keep only successful sendTemplateMessage records."""
    a, b, c, d, e, f, g = line
    return a == 'sendTemplateMessage' and b == 'success'


def splitRdd(line):
    """Key by (appType, templateName, bucket, openId); the value is sendNum."""
    a, b, c, d, e, f, g = line
    return ((c, d, e, f), g)


def transformRdd(line):
    """Re-key by (appType, templateName, bucket); the value is (openId, pv of this openId, 1)."""
    (c, d, e, f), g = line
    return ((c, d, e), (f, g, 1))


def calculateRes(value1, value2):
    """Merge two values: add up the pv totals and the distinct-openId counters."""
    f, g, k = value1
    f2, g2, k2 = value2
    return (f, g + g2, k + k2)


def main():
    logFile = "/user/root/spark/sparkstudy02.txt"
    master = 'yarn-client'
    appName = 'Simple App spark study02'
    conf = SparkConf().setAppName(appName).setMaster(master)
    sc = SparkContext(conf=conf)

    logData = sc.textFile(logFile)
    logStage1 = logData.map(lambda x: parseJson(x))
    logStage2 = logStage1.filter(lambda x: filterRdd(x))
    logStage3 = logStage2.map(lambda x: splitRdd(x))
    # First reduce: per (appType, templateName, bucket, openId), sum sendNum.
    logStage4 = logStage3.reduceByKey(lambda x, y: x + y)
    logStage5 = logStage4.map(lambda x: transformRdd(x))
    # Second reduce: per (appType, templateName, bucket), sum the pv and count distinct openIds.
    logStage6 = logStage5.reduceByKey(lambda x, y: calculateRes(x, y))

    print("============================")
    for item in logStage1.collect():
        print(item)
    print("============================")
    for item in logStage2.collect():
        print(item)
    print("============================")
    for item in logStage3.collect():
        print(item)
    print("============================")
    for item in logStage4.collect():
        print(item)
    print("============================")
    for item in logStage5.collect():
        print(item)
    print("============================")
    for item in logStage6.collect():
        print(item)


if __name__ == '__main__':
    main()
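One aside about the debug loops above: every collect() re-runs the lineage from sc.textFile onwards, so the input file is read and parsed six times. If you keep all of those intermediate collect() calls, caching the parsed RDD avoids the repeated work. A minimal tweak (hypothetical, not in the original script) would be:

    # Cache the parsed RDD so the six collect() calls do not each re-read and re-parse the file.
    logStage1 = logData.map(lambda x: parseJson(x)).cache()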
The output of running the script is as follows:
[root@cdh1 demo-simple]# bash sparkstudy02.sh
============================
(u'sendTemplateMessage', u'success', 1, u'votePost', 1, u'otU065OELPd_cccc', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs', 1)
(u'sendTemplateMessage', u'success', 2, u'replyPost', 5, u'otU065OELPd_rvm-eeee', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-xxxxx', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-pppeeee', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs', 1)
============================
(u'sendTemplateMessage', u'success', 1, u'votePost', 1, u'otU065OELPd_cccc', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs', 1)
(u'sendTemplateMessage', u'success', 2, u'replyPost', 5, u'otU065OELPd_rvm-eeee', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-xxxxx', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-pppeeee', 1)
(u'sendTemplateMessage', u'success', 3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs', 1)
============================
((1, u'votePost', 1, u'otU065OELPd_cccc'), 1)
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs'), 1)
((2, u'replyPost', 5, u'otU065OELPd_rvm-eeee'), 1)
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-xxxxx'), 1)
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-pppeeee'), 1)
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs'), 1)
============================
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-lhpa2gPc7QBs'), 2)
((1, u'votePost', 1, u'otU065OELPd_cccc'), 1)
((2, u'replyPost', 5, u'otU065OELPd_rvm-eeee'), 1)
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-pppeeee'), 1)
((3, u'dailySignPush', 4, u'otU065OELPd_rvm-xxxxx'), 1)
============================
((3, u'dailySignPush', 4), (u'otU065OELPd_rvm-lhpa2gPc7QBs', 2, 1))
((1, u'votePost', 1), (u'otU065OELPd_cccc', 1, 1))
((2, u'replyPost', 5), (u'otU065OELPd_rvm-eeee', 1, 1))
((3, u'dailySignPush', 4), (u'otU065OELPd_rvm-pppeeee', 1, 1))
((3, u'dailySignPush', 4), (u'otU065OELPd_rvm-xxxxx', 1, 1))
============================
((1, u'votePost', 1), (u'otU065OELPd_cccc', 1, 1))
((3, u'dailySignPush', 4), (u'otU065OELPd_rvm-lhpa2gPc7QBs', 4, 3))
((2, u'replyPost', 5), (u'otU065OELPd_rvm-eeee', 1, 1))
Finally, a brief analysis. Note that the pipeline produces both PV and UV in a single pass: the first reduceByKey collapses the data to one record per (appType, templateName, bucket, openId) holding that openId's sendNum total, so after re-keying, every distinct openId contributes exactly one counter of 1, and the second reduceByKey therefore yields the PV (the summed sendNum) and the UV (the number of distinct openIds) at the same time.
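To make that concrete, the second reduceByKey can be replayed locally for the (3, 'dailySignPush', 4) key, using the three values visible in the logStage5 output above and the calculateRes function from the script (just a small check, not part of the job itself):

from functools import reduce

vals = [('otU065OELPd_rvm-lhpa2gPc7QBs', 2, 1),
        ('otU065OELPd_rvm-pppeeee', 1, 1),
        ('otU065OELPd_rvm-xxxxx', 1, 1)]
print(reduce(calculateRes, vals))  # -> ('otU065OELPd_rvm-lhpa2gPc7QBs', 4, 3), i.e. pv = 4, uv = 3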
Note that there is another way to do this: use groupByKey, and obtain the UV by first removing duplicates with distinct(). We did not take that route here, for two reasons:
1. reduceByKey is generally more efficient than groupByKey. The Spark programming guide puts it this way:
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will provide much better performance.
2. If we compute twice, once over all the data for the PV and once over the distinct() data for the UV, how do we merge the two results at the end? An extra join-like step is needed (see the sketch after this list).
On top of that, the same RDD would be used several times. We could cache() it to keep it in memory for a while, but it is still better to stick with an approach that produces both PV and UV in a single pass, as the reduceByKey pipeline above does.
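For comparison, here is a minimal sketch of that alternative, assuming logStage3 is the ((appType, templateName, bucket, openId), sendNum) RDD built in the script above; it needs two separate aggregations plus a join, and groupByKey shuffles every record:

# Hypothetical alternative, not used in the script above.
# PV: group all records per (appType, templateName, bucket) and sum sendNum.
pv = logStage3.map(lambda kv: (kv[0][:3], kv[1])) \
              .groupByKey() \
              .mapValues(lambda nums: sum(nums))
# UV: keep distinct (key, openId) pairs, then count openIds per key.
uv = logStage3.map(lambda kv: (kv[0][:3], kv[0][3])) \
              .distinct() \
              .groupByKey() \
              .mapValues(lambda ids: len(set(ids)))
# An extra join is then needed to put PV and UV back together.
result = pv.join(uv)  # -> ((appType, templateName, bucket), (pv, uv))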