spark从入门到放弃五十一:Spark Streaming(11)window 滑动窗口

文章地址:http://www.haha174.top/article/details/251614

  1. 简介

Spark Streaming 提供了滑动窗口的操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次落在窗口里面的RDD 数据,会被集合起来,然后生成新的RDD 会作为windows DStream 的一个RDD ,例如对每三秒钟执行一次滑动窗口计算。所以每个滑动窗口的操作,都必须只当两个参数,窗口的长度,以及滑动间隔,而且这两个参数都必须是batch 间隔的整数倍

  1. 基本操作

《spark从入门到放弃五十一:Spark Streaming(11)window 滑动窗口》 这里写图片描述

3.案例

热点搜索词滑动统计,每隔10秒种,统计最近60秒钟的搜索词的搜索频次,打印出词频最高的前三个搜索词 一次出现次数
下面给出java 示例和注释:

public class WindowWord {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf=new SparkConf().setMaster("local[2]").setAppName("TransFormBlackList");
        JavaSparkContext sc=new JavaSparkContext(conf);
        JavaStreamingContext jssc=new JavaStreamingContext(sc, Durations.seconds(5));
        JavaReceiverInputDStream<String> searchLog=jssc.socketTextStream("www.codeguoj.cn",9999);
        // 将搜索词转换成只有一个搜索词
        JavaDStream<String> searchWordDStream=searchLog.map(new Function<String, String>() {
            @Override
            public String call(String v1) throws Exception {
                return v1.split(" ")[0];
            }
        });
        JavaPairDStream<String,Integer> searchWordsPairsDSTream=searchWordDStream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s,1);
            }
        });
        //  第二个参数窗口长度
        //第三个参数  滑动间隔
        //就是说 每个10秒将最近60秒的数据作为一个窗口
       JavaPairDStream<String,Integer> searchWorldCountDStream= searchWordsPairsDSTream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
           @Override
           public Integer call(Integer v1, Integer v2) throws Exception {
               return v1+v2;
           }
       },Durations.seconds(60),Durations.seconds(10));
       //执行transform  操作  根据搜索词进行排序  然后获取排名前三的搜索词

        JavaPairDStream<String,Integer>  finalRDD=  searchWorldCountDStream.transformToPair(new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
            @Override
            public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> v1) throws Exception {

                JavaPairRDD<Integer,String> countSearchRDD=v1.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                    @Override
                    public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {

                        return new Tuple2<>(stringIntegerTuple2._2,stringIntegerTuple2._1);
                    }
                });
                //然后进行降序排序
                JavaPairRDD<Integer,String> softedRDD=countSearchRDD.sortByKey(false);
                //再一次进行反转
                JavaPairRDD<String,Integer> softedRDDCount=softedRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                        return new Tuple2<>(integerStringTuple2._2,integerStringTuple2._1);
                    }
                });


             List<Tuple2<String,Integer>> listResult=   softedRDDCount.take(3);
                for(Tuple2<String,Integer> v89:listResult){
                    System.out.println(v89._1+" "  +v89._2);
                }
               return  softedRDDCount;
           }
        });
        finalRDD.print();
        jssc.start();
        jssc.awaitTermination();
        jssc.stop();
        jssc.close();
    }
}
欢迎关注,更多福利
----
![这里写图片描述](http://upload-images.jianshu.io/upload_images/7822142-0684cf17d774d75a.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    原文作者:意浅离殇
    原文地址: https://www.jianshu.com/p/ba8b5ed8c9ef
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞