Using Spark SQL for join queries inside Spark Streaming

Implementation:

First, create a direct Kafka DStream from the input topic:

// Set up the streaming context with the configured batch interval
val sparkConf = new SparkConf().setAppName(appParams.appName)
val sc = new SparkContext(sparkConf)
val streamingContext = new StreamingContext(sc, Seconds(appParams.batchProcInterval))

// Create a direct (receiverless) Kafka stream on the input topic
val kafkaParams = Map[String, String]("metadata.broker.list" -> appParams.brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, Set[String](appParams.messageInTopic))
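These snippets appear to target Spark 1.x with the spark-streaming-kafka (Kafka 0.8 direct) integration; the imports they rely on would be roughly:

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils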

Create a time window. Here the window length equals the slide interval, so the windows are non-overlapping (tumbling) and each message lands in exactly one window:

// Keep only the message payload (the value of each (key, value) pair)
val windows = messages.map(_._2).window(Seconds(appParams.windownTime), Seconds(appParams.windownTime))
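For overlapping (sliding) windows, window() also accepts a slide interval shorter than the window length; both durations must be multiples of the batch interval. An illustrative example (values not from the original):

val sliding = messages.map(_._2).window(Seconds(60), Seconds(30))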

Then run the computation once per window:

windows.foreachRDD { rdd =>
  // ... per-window processing, described below ...
}

Processing inside each window:

Define a case class that gives the incoming records a schema:

case class Record(channelid: String, starttime: Long)
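Note: in Spark 1.x, toDF derives the schema from the case class via reflection, so the case class should be defined at the top level (outside the method that uses it); with Scala 2.10, case classes are also limited to 22 fields.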

Get the SQLContext. getOrCreate returns a lazily instantiated singleton, so it is safe and cheap to call on every batch:

val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

import sqlContext.implicits._

Convert the current batch of messages into a DataFrame:

val df = rdd.map(_.split("\\|")).map(line => Record(line(5), line(2).toLong)).toDF()
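The original does not show the message layout; for illustration, assume a pipe-delimited record in which field 2 (0-based) is the play start time and field 5 is the channel id, e.g.:

// Hypothetical layout: userid|deviceid|starttime|region|sessionid|channelid
// "u1001|d42|1467302400|bj|s77|ch09"  =>  Record("ch09", 1467302400L)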

Register it as a temporary table:

df.registerTempTable("UserPlayChannel")

Read the parquet data and register it as a second table:

val parquetFile = sqlContext.read.parquet(filePath)
parquetFile.registerTempTable("ProgramMetaData")
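Judging from the SQL below, ProgramMetaData holds the program guide, with at least these columns (a sketch inferred from the query; the real schema is not shown in the original):

// channelid: String  - channel the program airs on
// programid: String  - program identifier
// starttime: Long    - program start time
// endtime:   Long    - program end time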

With both tables registered, the join is just a matter of writing the SQL. For example, to rank programs by play count, matching each play event to the program airing on that channel at that moment:

select b.programid, count(b.programid) as count
from UserPlayChannel a
join ProgramMetaData b
  on a.channelid = b.channelid
 and a.starttime >= b.starttime
 and a.starttime <= b.endtime
group by b.programid
order by count desc

Executing it in code:

val hotProgramList = sqlContext.sql(
  """select b.programid, count(b.programid) as count
    |from UserPlayChannel a
    |join ProgramMetaData b
    |  on a.channelid = b.channelid
    | and a.starttime >= b.starttime
    | and a.starttime <= b.endtime
    |group by b.programid
    |order by count desc""".stripMargin)

hotProgramList now holds the (lazily evaluated) result of the join query.
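The original stops here; nothing actually triggers the query, and the streaming context is never started. A minimal sketch of the remaining steps (standard Spark APIs, not part of the original):

// Inside foreachRDD: materialize the query and print the top rows of each batch
hotProgramList.show(20)

// After the pipeline is wired up, outside foreachRDD:
streamingContext.start()
streamingContext.awaitTermination()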

Original author: lsnl8480
Original article: https://www.jianshu.com/p/b490672dac11