通过例子学习spark rdd

本文通过一个例子简单介绍下spark的rdd的数据处理。

这是一个网站文章的阅读日志,存放在hdfs://tmp/log/fileread.log中。

时间、语言、标题、次数、总字数

20090505-000000 cn ?71G4Bo1cAdWyg 1 14463
20090505-000000 en Special:Statistics 1 840
20090505-000000 in Special:Whatlinkshere/MediaWiki:Returnto 1 1019
20090505-000000 nr Wikibooks:About 1 15719
20090505-000000 aa ?14mFX1ildVnBc 1 13205
20090505-000000 ab ?53A%2FuYP3FfnKM 1 13207
20090505-000000 en ?93HqrnFc%2EiqRU 1 13199
20090506-000000 dm ?95iZ%2Fjuimv31g 1 13201
20090506-000000 en File:Wikinews-logo.svg 1 8357
20090506-000000 jp Main_Page 2 9980

现在我们的目标是统计日志中语言为“en”的文章每天的阅读次数。结果形式为一个数组[(“20090505”, 2),(“20090506”, 1)…..]。

1、读取日志

val logs=sc.textFile("/tmp/log/fileread.log")

logs就是指向该文件的rdd对象,可以通过logs.take(5).foreach(println)将前5行打印出来。

2、过滤语言类型

把日志中语言为en的记录过滤出来。

val enLogs=logs.filter(_.split(" ")(1) == "en").cache

此处使用cache方法把结果缓存到内存中,加速后面的计算。

3、提取日期、次数

从enLogs记录中提取日期、次数,生成新的rdd。

val enDateReads=enLogs.map((line=>line.split(" ")(0).substring(0,8), line.split(" ")(3).toInt))

4、汇总统计

把日期相同的记录次数进行累计

val result=enDateReads.reduceByKey(_+_, 1).collect

collect方法可以把rdd变为数组,这样我们就得到结果啦。

    原文作者:匠人的OP
    原文地址: https://zhuanlan.zhihu.com/p/29527475
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞