Spark Streaming 实时统计商户当日累计PV流量

一、问题

对实时流量日志过滤筛选商户流量,对每个商户的流量进行累计,统计商户实时累计流量。

当时间超过24时时,重新统计当日商户的实时累计流量。

二、实现步骤

1、采用Spark Streaming读取Kafka中的实时日志流,生成DStream

2、过滤其中的商户页流量,生成DStream[k,v] (注:k为shopid, v为pv)

3、采用Spark Streaming中DStream[k,v]的mapWithState方法生成商户累计流量MapWithStateDStream

4、通过调用StreamingContext中的awaitTerminationOrTimeout(time) 方法设置当前StreamingContext的终止时间实现在每天24时终止所有上述DStream计算。

5、调用StreamingContext中的stop方法,终止StreamingContext。调用stop方法默认会终止SparkContext,设置stop(stopSparkContext:Boolean = false,stopGracefully:Boolean = true)参数,可以实现不终止SparkContext,同时能够保持StreamingContext已经接受的Batch能够处理完成后再终止StreamingContext

6、重复1~5,即可以再次日0时自动生成新的StreamingContext统计当日商户累计流量

三、案例代码

package com.demo.data

import java.util
import java.util.Date
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.{Logging, SparkConf}
import com.demo.data.kafka.KafkaService
import com.demo.data.util.Constants

/**
  * Created by phycsgy on 17/2/13.
  */

object KafkaToRedis extends App with Logging{

    val conf = new SparkConf().setAppName("SparkStreamingKafka")
    val sc = new SparkContext(conf)

    //过滤商户页PV流量
    def shopTrafficFilter(log:String):Boolean = {
        (log contains "\"element_id\":\"pageview\"") &
        (log contains "\"page_name\":\"shopinfo\"") &
        ("\"shop_id\":\"[0-9]+\"".r findFirstIn log).nonEmpty
    }

    //正则表达式提取shopid
    def shopInfoExtract(log:String) = {
      val parttern = "\"shop_id\":\"([0-9]+)\"".r
      val matchResult = parttern findFirstMatchIn log
      Tuple2(matchResult.get.group(1),1)
    }

    //计算当前时间距离次日凌晨的时长(毫秒数)
    def resetTime = {
      val now = new Date()
      val tomorrowMidnight = new Date(now.getYear,now.getMonth,now.getDate+1)
      tomorrowMidnight.getTime - now.getTime

    }

    //商户实时流量状态更新函数
    val mapFuction = (shopid: String, pv: Option[Int], state: State[Int]) => {
      val accuSum = pv.getOrElse(0) + state.getOption().getOrElse(0)
      val output = (shopid,accuSum)
      state.update(accuSum)
      output
    }

    val stateSpec = StateSpec.function(mapFuction)

    while(true){

      val ssc = new StreamingContext(sc, Seconds(30))
      ssc.checkpoint("./")
      val kafkaService = new KafkaService
      val topicName = "log.traffic_data"
      //从kafka读取日志流
      val kafkaStream = kafkaService.getKafkaStream[String, StringDecoder](ssc, topicName, Constants.KAFKA_LARGEST_OFFSET)
      //过滤商户页实时流量
      val shopTrafficStream = kafkaStream.map(msg => msg._2).filter(shopTrafficFilter).map(shopInfoExtract)
      //生成商户页流量实时累计状态
      val shopTrafficUpdateStateDStream = shopTrafficStream.mapWithState(stateSpec).stateSnapshots()
      //展示商户页实时累计流量TOP10的商户
      shopTrafficUpdateStateDStream.foreachRDD{
        rdd => {
          //取TOP10商户
          rdd.top(10)(/*自定义排序方法*/TopElementOrdering)
            .foreach(item => println(item))
        }
      }

      ssc.start()
      //
      ssc.awaitTerminationOrTimeout(resetTime)
      ssc.stop(false,true)

    }

}

    原文作者:phycsgy
    原文地址: https://www.jianshu.com/p/0a26be8f79bf
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞