一、问题
从实时流量日志中筛选出商户页流量,按商户对流量进行累计,统计每个商户的实时累计流量。
每天到达24时(即次日0时)时,清零并重新开始统计新一天的商户实时累计流量。
二、实现步骤
1、采用Spark Streaming读取Kafka中的实时日志流,生成DStream
2、过滤其中的商户页流量,生成DStream[k,v] (注:k为shopid, v为pv)
3、采用Spark Streaming中DStream[k,v]的mapWithState方法生成商户累计流量MapWithStateDStream
4、调用StreamingContext的awaitTerminationOrTimeout(timeout)方法,将超时时间设为当前时刻距次日0时的毫秒数,从而在每天24时结束上述所有DStream计算。
5、调用StreamingContext的stop方法终止StreamingContext。stop方法默认会同时终止SparkContext;以stop(stopSparkContext = false, stopGracefully = true)的参数调用,可以保留SparkContext不被终止,并在StreamingContext已经接收的Batch全部处理完成后再终止StreamingContext。
6、循环执行步骤1~5,即可在次日0时自动创建新的StreamingContext,重新统计当日商户累计流量。
三、案例代码
package com.demo.data
import java.util
import java.util.Date
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.{Logging, SparkConf}
import com.demo.data.kafka.KafkaService
import com.demo.data.util.Constants
/**
* Created by phycsgy on 17/2/13.
*/
/**
 * Spark Streaming job that keeps a running, per-day page-view count for every shop.
 *
 * Each iteration of the outer loop builds a fresh StreamingContext on the shared
 * SparkContext and reads the "log.traffic_data" Kafka topic. It keeps only shop-page
 * page-view records and accumulates a per-shop_id count with `mapWithState`. After every
 * 30-second batch it prints the top 10 shops as ranked by `TopElementOrdering`.
 *
 * At the next local midnight the context is stopped gracefully. This discards the
 * accumulated state, so the next iteration counts the new day from zero. The
 * SparkContext itself stays alive across days.
 */
object KafkaToRedis extends App with Logging {
  val conf = new SparkConf().setAppName("SparkStreamingKafka")
  val sc = new SparkContext(conf)

  // Compiled once and shared by shopTrafficFilter / shopInfoExtract, instead of being
  // recompiled for every log record. This is a `lazy val` on purpose: the object extends
  // App, so a plain val is only assigned when the App body runs on the driver. It would
  // still be null when executor-side closures call the methods below.
  private lazy val ShopIdPattern = "\"shop_id\":\"([0-9]+)\"".r

  /**
   * Returns true for shop-page ("shopinfo") page-view records that carry a numeric shop_id.
   *
   * `&&` short-circuits, so the regex only runs on lines that already passed the cheap
   * substring checks.
   */
  def shopTrafficFilter(log: String): Boolean =
    log.contains("\"element_id\":\"pageview\"") &&
      log.contains("\"page_name\":\"shopinfo\"") &&
      ShopIdPattern.findFirstIn(log).isDefined

  /**
   * Maps a shop-page log line to `(shopId, 1)`, i.e. one page view for that shop.
   *
   * Intended for lines that passed [[shopTrafficFilter]]. A line without a numeric
   * shop_id is rejected with an IllegalArgumentException.
   */
  def shopInfoExtract(log: String): (String, Int) =
    ShopIdPattern.findFirstMatchIn(log) match {
      case Some(m) => (m.group(1), 1)
      case None =>
        throw new IllegalArgumentException("log line has no numeric \"shop_id\" field")
    }

  /** Milliseconds from now until the next local midnight (the start of tomorrow). */
  def resetTime: Long = {
    import java.util.Calendar
    val nowMillis = System.currentTimeMillis()
    val nextMidnight = Calendar.getInstance()
    nextMidnight.setTimeInMillis(nowMillis)
    nextMidnight.add(Calendar.DAY_OF_MONTH, 1)
    nextMidnight.set(Calendar.HOUR_OF_DAY, 0)
    nextMidnight.set(Calendar.MINUTE, 0)
    nextMidnight.set(Calendar.SECOND, 0)
    nextMidnight.set(Calendar.MILLISECOND, 0)
    nextMidnight.getTimeInMillis - nowMillis
  }

  /**
   * mapWithState update function. It adds this batch's count for `shopid` (0 if absent)
   * to the stored running total, saves the new total in `state`, and emits
   * `(shopid, total)`.
   */
  val mapFuction = (shopid: String, pv: Option[Int], state: State[Int]) => {
    val accuSum = pv.getOrElse(0) + state.getOption().getOrElse(0)
    state.update(accuSum)
    (shopid, accuSum)
  }
  val stateSpec = StateSpec.function(mapFuction)

  // One iteration per day. The StreamingContext, and the mapWithState state it owns,
  // lives until the next midnight. It is then stopped so the next day's totals start at 0.
  while (true) {
    val ssc = new StreamingContext(sc, Seconds(30))
    // Stateful transformations such as mapWithState need a checkpoint directory.
    ssc.checkpoint("./")
    val kafkaService = new KafkaService
    val topicName = "log.traffic_data"
    // Read the raw log stream from Kafka.
    val kafkaStream =
      kafkaService.getKafkaStream[String, StringDecoder](ssc, topicName, Constants.KAFKA_LARGEST_OFFSET)
    // Keep shop-page page views and turn each one into (shopId, 1).
    val shopTrafficStream =
      kafkaStream.map(msg => msg._2).filter(shopTrafficFilter).map(shopInfoExtract)
    // Snapshot of all running per-shop totals after each batch.
    val shopTrafficUpdateStateDStream = shopTrafficStream.mapWithState(stateSpec).stateSnapshots()
    // Print the 10 shops ranked highest by TopElementOrdering (a custom ordering defined elsewhere).
    shopTrafficUpdateStateDStream.foreachRDD { rdd =>
      rdd.top(10)(TopElementOrdering).foreach(item => println(item))
    }
    ssc.start()
    // Block until the next local midnight, or until the context terminates earlier.
    ssc.awaitTerminationOrTimeout(resetTime)
    logInfo("Stopping StreamingContext to reset daily shop traffic totals")
    // Keep the SparkContext for the next day; let already-received batches finish first.
    ssc.stop(stopSparkContext = false, stopGracefully = true)
  }
}