SparkSQL 实现UV & PV计算

背景

前两天面试中遇到一个比较基础的计算UV & PV 的问题。思路比较简单,最重要的是 手写代码 ,平常我们都是在IDE 中编写代码,手写代码的时候大多是情况下都是使用IDE 的提示,遇到手写的时候,就算这种简单的代码也不一定写得出来。

那天采取的一个思路是:先把思路写出来,然后,时间够再添代码进去。有时候确实一些函数拼不出来什么的,但是思路在,好过白卷。

由上面的背景引出 使用SparkSQL 实现 统计 UV & PV 的问题。

数据如下:格式 ip,请求方式,路径

192.168.0.112,post,/app2/index.html
192.168.2.11,get,/app1/user?id=3
192.168.2.11,post,/app1/submittoder
192.168.0.122,post,/app1/goods
....

需求: 求出每个APP 的访问访问次数(UV)和独立IP 访问次数(PV)

整个过程大概分为4步:

1、先构建SparkSession 入口

//构建 sparksession
val sparkSession: SparkSession = SparkSession.builder().appName("pv_uv").master("local[3]").getOrCreate()

2、读取文件

val lines: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\Administrator\\Desktop\\testData\\log.txt")

3、整理数据并转换为 DataFrame

第一方式:导入隐式转换,使用 toDF 函数把 RDD 转换成 DataFrame

import sparkSession.implicits._
val ipAndAppDF: DataFrame = lines.map(line => {
  //切分一行数据
  val fileds: Array[String] = line.split(",")
  //提取 ip
  val ip: String = fileds(0)
  //应用
  val app: String = fileds(2).split("/")(1)
  (ip, app)
}).toDF("ip", "app")

第二种方式:使用 RDD[Row] + schema , 然后转换为 DataFrame

val ipAndAppRDD: RDD[Row] = lines.map(line => {
  //切分一行数据
  val fileds: Array[String] = line.split(",")
  //提取 ip
  val ip: String = fileds(0)
  //应用
  val app: String = fileds(2).split("/")(1)
  // 包装成 Row
  Row(ip, app)
})
//准备schema 信息
val schema = StructType(List(
  StructField("ip", StringType, true),
  StructField("app", StringType, true)
))
//将RowRDD 关联 schema
val bdf: DataFrame = sparkSession.createDataFrame(ipAndAppRDD, schema)

4、计算 PV & UV

第一种方式: 使用 SQL 方式,需要先进行注册临时表信息

//执行 sql 计算 uv & pv
val pv_uv: DataFrame = sparkSession.sql(" select app,count(1) as pv,count(distinct ip) as uv from  v_log group by app ")
// 打印数据
pv_uv.show()

第二种方式: 使用调用内置函数的方式计算 pv & uv,需要导入函数

// 导入spark.sql 内置函数 ,去重方法 countDistinct 注意使用方法
import org.apache.spark.sql.functions._
val result: DataFrame = ipAndAppDF.groupBy($"app").agg(count("*") as "pv", countDistinct('ip) as "uv")
// 打印数据
result.show()

5、关闭资源

sparkSession.stop()

最终结果:
《SparkSQL 实现UV & PV计算》

建议:平常在写代码的时候,尽量能自己把 api 敲全,能不使用提示最好,这样会更能加深印象。遇到变态的手写代码面试题,可能会有帮助

最后上完整代码

package com.zhouq.spark.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
  * 面试题 使用sparksql 查UV PV
  *
  * 数据格式:ip,请求方式,url
  *
  * 192.168.0.112,post,/app2/index.html
  *
  */
object LogUV_PV {

  def main(args: Array[String]): Unit = {

    /**
      * 第一步:创建 SparkSession
      */
    //构建 sparksession
    val sparkSession: SparkSession = SparkSession.builder().appName("pv_uv").master("local[3]").getOrCreate()

    /**
      * 第二步:读取文件
      */
    val lines: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\Administrator\\Desktop\\testData\\log.txt")

    /**
      * 第三步:整理数据并转换为 DataFrame
      */
    // 第一种方式:使用toDF 函数,需要导入隐式转换,
    import sparkSession.implicits._

    val ipAndAppDF: DataFrame = lines.map(line => {
      //切分一行数据
      val fileds: Array[String] = line.split(",")
      //提取 ip
      val ip: String = fileds(0)
      //应用
      val app: String = fileds(2).split("/")(1)
      (ip, app)
    }).toDF("ip", "app")

    //第二种方式:使用RDD[Row] + schema ,然后转换为 DataFrame
//    val ipAndAppRDD: RDD[Row] = lines.map(line => {
//      //切分一行数据
//      val fileds: Array[String] = line.split(",")
//      //提取 ip
//      val ip: String = fileds(0)
//      //应用
//      val app: String = fileds(2).split("/")(1)
//      Row(ip, app)
//    })
//
//    //准备schema 信息
//    val schema = StructType(List(
//      StructField("ip", StringType, true),
//      StructField("app", StringType, true)
//    ))
//
//    //将RowRDD 关联 schema
//    val bdf: DataFrame = sparkSession.createDataFrame(ipAndAppRDD, schema)

    /**
      * 第四步: 使用执行 SQL  的方式 计算 pv & uv
      */

    //注册临时表
    ipAndAppDF.createTempView("v_log")

    //执行 sql 计算 uv & pv
    val pv_uv: DataFrame = sparkSession.sql(" select app,count(1) as pv,count(distinct ip) as uv from  v_log group by app ")
    pv_uv.show()

//    /**
//      * 第四步: 使用调用内置函数的方式计算 pv & uv
//      * 需要导入 函数
//      */
//    import org.apache.spark.sql.functions._
//    val result: DataFrame = ipAndAppDF.groupBy($"app").agg(count("*") as "pv", countDistinct('ip) as "uv")
//
//    result.show()

    /**
      * 第五步:关闭资源
      */
    sparkSession.stop()
  }

}

有兴趣的欢迎关注,大家一起交流学习。
《SparkSQL 实现UV & PV计算》

    原文作者:hellozhouq
    原文地址: https://blog.csdn.net/zhouqiaolovemiao/article/details/88608905
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞