spark streaming 实时日志清洗和统计

1、数据处理加工模型

(1)输入:文件,数据库,消息队列

(2)处理:函数,sql,mapreduce,bolt,transform/action

(3)输出:文件,数据库

2、spark简介

spark与hadoop开发wordcount对比

hadoop

(1)主方法

(2)map

(3)reduce

spark

(1)启动

(2)转换transform

(3)动作action

spark常用函数

转换(transformation)

 转换含义

map(func)返回一个新分布式数据集,由每一个输入元素经过func函数转换后组成

filter(func)返回一个新数据集,由经过func函数计算后返回值为true的输入元素组成

flatMap(func)类似于map,但是每一个输入元素可以被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素)

union(otherDataset)返回一个新的数据集,新数据集是由源数据集和参数数据集联合而成

distinct([numTasks]))返回一个包含源数据集中所有不重复元素的新数据集

groupByKey([numTasks])在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集

注意:默认情况下,只有8个并行任务来做操作,但是你可以传入一个可选的numTasks参数来改变它

reduceByKey(func, [numTasks])在一个(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一起。类似groupByKey,reduce任务个数是可以通过第二个可选参数来配置的

join(otherDataset, [numTasks])在类型为(K,V)和(K,W)类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的(K, (V, W))数据集

动作(actions)

 动作含义

reduce(func)通过函数func(接受两个参数,返回一个参数)聚集数据集中的所有元素。这个功能必须可交换且可关联的,从而可以正确的被并行执行。

collect()在驱动程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作并返回一个足够小的数据子集后再使用会比较有用。

count()返回数据集的元素的个数。

first()返回数据集的第一个元素(类似于take(1))

take(n)返回一个由数据集的前n个元素组成的数组。注意,这个操作目前并非并行执行,而是由驱动程序计算所有的元素

saveAsTextFile(path)将数据集的元素,以textfile的形式,保存到本地文件系统,HDFS或者任何其它hadoop支持的文件系统。对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本行

countByKey()对(K,V)类型的RDD有效,返回一个(K,Int)对的Map,表示每一个key对应的元素个数

foreach(func)在数据集的每一个元素上,运行函数func进行更新。这通常用于边缘效果,例如更新一个累加器,或者和外部存储系统进行交互,例如HBase

3、流式处理spark streaming

spark streaming 定时间隔的批处理,与storm的区别

storm启动

spark streaming启动

kafka是个分布式消息队列,与qmq的区别

kafka的topic都要指定分区个数,对应下游的多路并行接收处理

4、qunar实时日志统计流程

以分渠道计算pv/uv为例的流程:

(1)实时接入日志做清洗

(2)解析出渠道名称cid,设备标识uid

(3)按cid汇总,累加pv和uid集合,写入cache(redis)

(4)每日导出cache结果

PS:如果要导出明细,需考虑并发量,一般都是经过筛选后的子集

统计pv/uv的代码示例:

(1)总体和转换

(2)合并动作

(3)汇总动作

    原文作者:空竹翌人
    原文地址: https://www.jianshu.com/p/2a6899e8ac37
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞