Java Spark 简单示例(六)Spark Streaming Window

大数据学习交流微信群

前两天分享的Flink 学习笔记中有介绍滚动窗口滑动窗口Spark Streaming也是支持的。
Java Spark 简单示例(五)Spark Streaming 演示了Spark Streaming的常规用法就是滚动窗口。我们设置了批处理的时间长度,Spark 默认每隔一段时间滚动一次窗口,窗口之间不存在重复数据。

//批处理时间,即一个滚动窗口的长度,滚动间隔等于该长度
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));

本篇将结合官方文档基于上一篇demo演示如何实现滑动窗口

官网介绍: window(windowLength, slideInterval) Return a new DStream which is computed based on windowed batches of the source DStream.其中windowLength表示滑动窗口的长度,slideInterval表示滑动间隔。windowLengthslideInterval 必须是批处理时间的整数倍,即上述定义的3s的整数倍.slideInterval不填默认是批处理时间长度即上述定义的3s.

package com.yzy.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class demo7 {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        //初始化sparkConf
        SparkConf sparkConf = SparkConfig.getSparkConf().setMaster(master).setAppName(appName);

        //获得JavaStreamingContext
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));

        //从socket源获取数据
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

        //拆分行成单词
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        //调用window函数,生成新的DStream,每隔3秒聚合过去6秒内的源数据,滑动间隔不填默认3秒
        //等价于words.window(Durations.seconds(6),Durations.seconds(3));
        JavaDStream<String> newWords = words.window(Durations.seconds(6));

        //计算每个单词出现的个数
        JavaPairDStream<String, Integer> wordCounts = newWords.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        //输出结果
        wordCounts.print();

        //开始作业
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}

输出结果:

-------------------------------------------
Time: 1537934565000 ms
-------------------------------------------
(spark,1)
(1,1)
(test,1)
(streaming,1)

-------------------------------------------
Time: 1537934568000 ms
-------------------------------------------
(spark,4)
(1,1)
(2,1)
(3,1)
(4,1)
(test,4)
(streaming,4)

-------------------------------------------
Time: 1537934571000 ms
-------------------------------------------
(spark,6)
(2,1)
(3,1)
(4,1)
(5,1)
(6,1)
(7,1)
(test,6)
(streaming,6)

-------------------------------------------
Time: 1537934574000 ms
-------------------------------------------
(spark,6)
(10,1)
(5,1)
(6,1)
(7,1)
(8,1)
(9,1)
(test,6)
(streaming,6)

-------------------------------------------
Time: 1537934577000 ms
-------------------------------------------
(spark,6)
(10,1)
(11,1)
(12,1)
(13,1)
(8,1)
(9,1)
(test,6)
(streaming,6)
....

除了调用window()来转化Dstream,还可以直接调用reduceByKeyAndWindow()函数,使聚合函数按照滑动窗口来执行。如下:

//.....省略
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s,1);
            }
        });

JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }, Durations.seconds(6), Durations.seconds(3));//用法相同

windowedWordCounts.print();
    原文作者:憨人Zoe
    原文地址: https://www.jianshu.com/p/ae243f903d42
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞