本文通过一个例子简单介绍下spark的rdd的数据处理。
这是一个网站文章的阅读日志,存放在hdfs://tmp/log/fileread.log中。
时间、语言、标题、次数、总字数
20090505-000000 cn ?71G4Bo1cAdWyg 1 14463
20090505-000000 en Special:Statistics 1 840
20090505-000000 in Special:Whatlinkshere/MediaWiki:Returnto 1 1019
20090505-000000 nr Wikibooks:About 1 15719
20090505-000000 aa ?14mFX1ildVnBc 1 13205
20090505-000000 ab ?53A%2FuYP3FfnKM 1 13207
20090505-000000 en ?93HqrnFc%2EiqRU 1 13199
20090506-000000 dm ?95iZ%2Fjuimv31g 1 13201
20090506-000000 en File:Wikinews-logo.svg 1 8357
20090506-000000 jp Main_Page 2 9980
现在我们的目标是统计日志中语言为“en”的文章每天的阅读次数。结果形式为一个数组[(“20090505”, 2),(“20090506”, 1)…..]。
1、读取日志
val logs=sc.textFile("/tmp/log/fileread.log")
logs就是指向该文件的rdd对象,可以通过logs.take(5).foreach(println)将前5行打印出来。
2、过滤语言类型
把日志中语言为en的记录过滤出来。
val enLogs=logs.filter(_.split(" ")(1) == "en").cache
此处使用cache方法把结果缓存到内存中,加速后面的计算。
3、提取日期、次数
从enLogs记录中提取日期、次数,生成新的rdd。
val enDateReads=enLogs.map((line=>line.split(" ")(0).substring(0,8), line.split(" ")(3).toInt))
4、汇总统计
把日期相同的记录次数进行累计
val result=enDateReads.reduceByKey(_+_, 1).collect
collect方法可以把rdd变为数组,这样我们就得到结果啦。