背景
前两天面试中遇到一个比较基础的计算UV & PV 的问题。思路比较简单,最重要的是 手写代码 ,平常我们都是在IDE 中编写代码,手写代码的时候大多是情况下都是使用IDE 的提示,遇到手写的时候,就算这种简单的代码也不一定写得出来。
那天采取的一个思路是:先把思路写出来,然后,时间够再添代码进去。有时候确实一些函数拼不出来什么的,但是思路在,好过白卷。
由上面的背景引出 使用SparkSQL 实现 统计 UV & PV 的问题。
数据如下:格式 ip,请求方式,路径
192.168.0.112,post,/app2/index.html
192.168.2.11,get,/app1/user?id=3
192.168.2.11,post,/app1/submittoder
192.168.0.122,post,/app1/goods
....
需求: 求出每个APP 的访问访问次数(UV)和独立IP 访问次数(PV)
整个过程大概分为4步:
1、先构建SparkSession 入口
//构建 sparksession
val sparkSession: SparkSession = SparkSession.builder().appName("pv_uv").master("local[3]").getOrCreate()
2、读取文件
val lines: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\Administrator\\Desktop\\testData\\log.txt")
3、整理数据并转换为 DataFrame
第一方式:导入隐式转换,使用 toDF 函数把 RDD 转换成 DataFrame
import sparkSession.implicits._
val ipAndAppDF: DataFrame = lines.map(line => {
//切分一行数据
val fileds: Array[String] = line.split(",")
//提取 ip
val ip: String = fileds(0)
//应用
val app: String = fileds(2).split("/")(1)
(ip, app)
}).toDF("ip", "app")
第二种方式:使用 RDD[Row] + schema , 然后转换为 DataFrame
val ipAndAppRDD: RDD[Row] = lines.map(line => {
//切分一行数据
val fileds: Array[String] = line.split(",")
//提取 ip
val ip: String = fileds(0)
//应用
val app: String = fileds(2).split("/")(1)
// 包装成 Row
Row(ip, app)
})
//准备schema 信息
val schema = StructType(List(
StructField("ip", StringType, true),
StructField("app", StringType, true)
))
//将RowRDD 关联 schema
val bdf: DataFrame = sparkSession.createDataFrame(ipAndAppRDD, schema)
4、计算 PV & UV
第一种方式: 使用 SQL 方式,需要先进行注册临时表信息
//执行 sql 计算 uv & pv
val pv_uv: DataFrame = sparkSession.sql(" select app,count(1) as pv,count(distinct ip) as uv from v_log group by app ")
// 打印数据
pv_uv.show()
第二种方式: 使用调用内置函数的方式计算 pv & uv,需要导入函数
// 导入spark.sql 内置函数 ,去重方法 countDistinct 注意使用方法
import org.apache.spark.sql.functions._
val result: DataFrame = ipAndAppDF.groupBy($"app").agg(count("*") as "pv", countDistinct('ip) as "uv")
// 打印数据
result.show()
5、关闭资源
sparkSession.stop()
最终结果:
建议:平常在写代码的时候,尽量能自己把 api 敲全,能不使用提示最好,这样会更能加深印象。遇到变态的手写代码面试题,可能会有帮助
最后上完整代码:
package com.zhouq.spark.sql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
* 面试题 使用sparksql 查UV PV
*
* 数据格式:ip,请求方式,url
*
* 192.168.0.112,post,/app2/index.html
*
*/
object LogUV_PV {
def main(args: Array[String]): Unit = {
/**
* 第一步:创建 SparkSession
*/
//构建 sparksession
val sparkSession: SparkSession = SparkSession.builder().appName("pv_uv").master("local[3]").getOrCreate()
/**
* 第二步:读取文件
*/
val lines: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\Administrator\\Desktop\\testData\\log.txt")
/**
* 第三步:整理数据并转换为 DataFrame
*/
// 第一种方式:使用toDF 函数,需要导入隐式转换,
import sparkSession.implicits._
val ipAndAppDF: DataFrame = lines.map(line => {
//切分一行数据
val fileds: Array[String] = line.split(",")
//提取 ip
val ip: String = fileds(0)
//应用
val app: String = fileds(2).split("/")(1)
(ip, app)
}).toDF("ip", "app")
//第二种方式:使用RDD[Row] + schema ,然后转换为 DataFrame
// val ipAndAppRDD: RDD[Row] = lines.map(line => {
// //切分一行数据
// val fileds: Array[String] = line.split(",")
// //提取 ip
// val ip: String = fileds(0)
// //应用
// val app: String = fileds(2).split("/")(1)
// Row(ip, app)
// })
//
// //准备schema 信息
// val schema = StructType(List(
// StructField("ip", StringType, true),
// StructField("app", StringType, true)
// ))
//
// //将RowRDD 关联 schema
// val bdf: DataFrame = sparkSession.createDataFrame(ipAndAppRDD, schema)
/**
* 第四步: 使用执行 SQL 的方式 计算 pv & uv
*/
//注册临时表
ipAndAppDF.createTempView("v_log")
//执行 sql 计算 uv & pv
val pv_uv: DataFrame = sparkSession.sql(" select app,count(1) as pv,count(distinct ip) as uv from v_log group by app ")
pv_uv.show()
// /**
// * 第四步: 使用调用内置函数的方式计算 pv & uv
// * 需要导入 函数
// */
// import org.apache.spark.sql.functions._
// val result: DataFrame = ipAndAppDF.groupBy($"app").agg(count("*") as "pv", countDistinct('ip) as "uv")
//
// result.show()
/**
* 第五步:关闭资源
*/
sparkSession.stop()
}
}
有兴趣的欢迎关注,大家一起交流学习。