The Flink study notes I shared a couple of days ago covered tumbling windows and sliding windows. Spark Streaming supports both as well.
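The standard Spark Streaming usage shown in Java Spark Simple Examples (Part 5): Spark Streaming is in fact a tumbling window. We set the batch interval, and by default Spark rolls the window forward once per interval, so consecutive windows never share any data.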
// Batch interval: the length of one tumbling window; the window rolls forward by exactly this length
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));
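For intuition, here is a plain-Java sketch (it does not use Spark) of how a tumbling window assigns data: every millisecond timestamp falls into exactly one window of fixed length, which is why consecutive windows share nothing. `TumblingWindowSketch` and `assign` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {
    // Hypothetical helper: groups millisecond timestamps into tumbling windows
    // of length windowMs. The map key is the start time of each window.
    public static Map<Long, List<Long>> assign(List<Long> timestampsMs, long windowMs) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long t : timestampsMs) {
            // Each timestamp belongs to exactly one window, so windows never overlap
            long start = Math.floorDiv(t, windowMs) * windowMs;
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(t);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Long> ts = Arrays.asList(500L, 2900L, 3000L, 5999L, 6000L);
        // prints {0=[500, 2900], 3000=[3000, 5999], 6000=[6000]}
        System.out.println(assign(ts, 3000L));
    }
}
```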
Building on the demo from the previous post and the official documentation, this post shows how to implement a sliding window.
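From the official documentation: window(windowLength, slideInterval): "Return a new DStream which is computed based on windowed batches of the source DStream." Here windowLength is the length of the sliding window and slideInterval is how far the window slides each time. Both windowLength and slideInterval must be integer multiples of the batch interval, i.e. of the 3s defined above. If slideInterval is omitted, it defaults to the batch interval, i.e. 3s.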
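The two rules above (both durations must be multiples of the batch interval, and the slide defaults to the batch interval) can be sketched in plain Java. This is an illustration under stated assumptions, not Spark's own validation code: `WindowSpecSketch`, `validate` and `coveredBatchTimes` are hypothetical names, and each batch is assumed to be labelled by the end time of its interval, roughly as the `Time:` headers in Spark's printed output are. With a 3s batch and a 6s window, every window merges two consecutive batches.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowSpecSketch {
    // Hypothetical check mirroring the documented rule: windowLength and
    // slideInterval must both be integer multiples of the batch interval.
    public static void validate(long batchMs, long windowMs, long slideMs) {
        if (windowMs <= 0 || windowMs % batchMs != 0) {
            throw new IllegalArgumentException("windowLength must be a multiple of the batch interval");
        }
        if (slideMs <= 0 || slideMs % batchMs != 0) {
            throw new IllegalArgumentException("slideInterval must be a multiple of the batch interval");
        }
    }

    // One-argument form, like window(windowLength): the slide defaults to the batch interval.
    public static void validate(long batchMs, long windowMs) {
        validate(batchMs, windowMs, batchMs);
    }

    // Batch times that fall into the window ending at windowEndMs,
    // i.e. the half-open range (windowEndMs - windowMs, windowEndMs].
    public static List<Long> coveredBatchTimes(long batchMs, long windowMs, long windowEndMs) {
        List<Long> times = new ArrayList<>();
        for (long t = windowEndMs - windowMs + batchMs; t <= windowEndMs; t += batchMs) {
            times.add(t);
        }
        return times;
    }

    public static void main(String[] args) {
        validate(3000, 6000); // 6s window over 3s batches: accepted
        // prints [1537934565000, 1537934568000]
        System.out.println(coveredBatchTimes(3000, 6000, 1537934568000L));
        try {
            validate(3000, 5000, 3000); // 5s is not a multiple of 3s
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```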
package com.yzy.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class demo7 {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;
    public static void main(String[] args) {
        // Initialize SparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);
        // Create the JavaStreamingContext with a 3s batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));
        // Read data from the socket source
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);
        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        // Call window() to create a new DStream: every 3s, aggregate the source data of the past 6s.
        // When the slide interval is omitted it defaults to the batch interval (3s), so this is
        // equivalent to words.window(Durations.seconds(6), Durations.seconds(3));
        JavaDStream<String> newWords = words.window(Durations.seconds(6));
        // Count the occurrences of each word within the window
        JavaPairDStream<String, Integer> wordCounts = newWords.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        // Print the results
        wordCounts.print();
        // Start the job
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}
Output:
-------------------------------------------
Time: 1537934565000 ms
-------------------------------------------
(spark,1)
(1,1)
(test,1)
(streaming,1)
-------------------------------------------
Time: 1537934568000 ms
-------------------------------------------
(spark,4)
(1,1)
(2,1)
(3,1)
(4,1)
(test,4)
(streaming,4)
-------------------------------------------
Time: 1537934571000 ms
-------------------------------------------
(spark,6)
(2,1)
(3,1)
(4,1)
(5,1)
(6,1)
(7,1)
(test,6)
(streaming,6)
-------------------------------------------
Time: 1537934574000 ms
-------------------------------------------
(spark,6)
(10,1)
(5,1)
(6,1)
(7,1)
(8,1)
(9,1)
(test,6)
(streaming,6)
-------------------------------------------
Time: 1537934577000 ms
-------------------------------------------
(spark,6)
(10,1)
(11,1)
(12,1)
(13,1)
(8,1)
(9,1)
(test,6)
(streaming,6)
....
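The output shows each window spanning two 3s batches: the counts go 1, 4, 6 and then stay at 6, and each number stays visible in exactly two consecutive windows before dropping out. The plain-Java sketch below (no Spark) replays that behaviour by summing word counts over the last two batches. Its input is a hypothetical reconstruction inferred from the printed counts, assuming each line typed into the socket was `spark streaming test N` with 1, 3, 3, 3 and 3 lines arriving in successive batches; `SlidingCountSketch` and its methods are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SlidingCountSketch {
    // Counts words across the last batchesPerWindow batches ending at batch index `end`,
    // i.e. the equivalent of window(...) followed by reduceByKey(...) for one window.
    public static Map<String, Integer> windowCount(List<List<String>> batches, int end, int batchesPerWindow) {
        Map<String, Integer> counts = new TreeMap<>();
        int start = Math.max(0, end - batchesPerWindow + 1);
        for (int i = start; i <= end; i++) {
            for (String line : batches.get(i)) {
                for (String word : line.split(" ")) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    // Hypothetical input reconstructed from the printed output (an assumption, not the author's data).
    public static List<List<String>> hypotheticalInput() {
        int[] linesPerBatch = {1, 3, 3, 3, 3};
        List<List<String>> batches = new ArrayList<>();
        int n = 1;
        for (int k : linesPerBatch) {
            List<String> batch = new ArrayList<>();
            for (int j = 0; j < k; j++) {
                batch.add("spark streaming test " + n++);
            }
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<List<String>> batches = hypotheticalInput();
        // 6s window / 3s batch interval = 2 batches per window, sliding by one batch
        for (int i = 0; i < batches.size(); i++) {
            System.out.println("window " + i + ": " + windowCount(batches, i, 2));
        }
    }
}
```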
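Besides calling window() to transform the DStream, you can also call reduceByKeyAndWindow() directly, which applies the aggregation function over the sliding window, as follows: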
// ..... (omitted)
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
        return new Tuple2<String, Integer>(s, 1);
    }
});
// windowLength (6s) and slideInterval (3s) are used exactly as in window()
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
}, Durations.seconds(6), Durations.seconds(3));
windowedWordCounts.print();
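Spark also offers an overload, reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowLength, slideInterval), which updates each window incrementally from the previous one instead of re-reducing every batch in it; the official guide notes that checkpointing must be enabled to use it. The plain-Java sketch below (no Spark) illustrates only the idea behind that overload, not Spark's implementation: add the counts of the batch entering the window ("reduce") and subtract the counts of the batch leaving it ("inverse reduce"). `IncrementalWindowSketch`, `slide` and `countWords` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class IncrementalWindowSketch {
    private final int batchesPerWindow;
    private final Deque<Map<String, Integer>> batches = new ArrayDeque<>();
    private final Map<String, Integer> windowCounts = new HashMap<>();

    public IncrementalWindowSketch(int batchesPerWindow) {
        this.batchesPerWindow = batchesPerWindow;
    }

    // Slides the window by one batch and returns a sorted snapshot of the window's counts.
    public Map<String, Integer> slide(Map<String, Integer> newBatch) {
        // "reduce": add the counts of the batch entering the window
        newBatch.forEach((w, c) -> windowCounts.merge(w, c, Integer::sum));
        batches.addLast(newBatch);
        if (batches.size() > batchesPerWindow) {
            // "inverse reduce": subtract the counts of the batch leaving the window
            Map<String, Integer> old = batches.removeFirst();
            old.forEach((w, c) -> windowCounts.merge(w, -c, Integer::sum));
            windowCounts.values().removeIf(v -> v == 0); // drop words no longer in the window
        }
        return new TreeMap<>(windowCounts);
    }

    // Hypothetical helper: per-batch word counts, like reduceByKey on a single batch.
    public static Map<String, Integer> countWords(String... words) {
        Map<String, Integer> m = new HashMap<>();
        for (String w : words) {
            m.merge(w, 1, Integer::sum);
        }
        return m;
    }

    public static void main(String[] args) {
        IncrementalWindowSketch win = new IncrementalWindowSketch(2); // 6s window / 3s batch
        System.out.println(win.slide(countWords("spark", "test")));  // {spark=1, test=1}
        System.out.println(win.slide(countWords("spark", "spark"))); // {spark=3, test=1}
        System.out.println(win.slide(countWords("test")));           // {spark=2, test=1}
    }
}
```

The trade-off is that each slide costs work proportional to the two batches entering and leaving, not to the whole window, which matters when the window is much longer than the slide interval.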