Java Spark 简单示例(五)Spark Streaming

本篇开始介绍Spark API的核心扩展功能 Sprak Streaming

官方介绍

Spark Streaming 是Spark API核心的扩展,支持实时数据流的可扩展,高吞吐量,容错流处理。数据可以从Kafka,Flume, Kinesis, 或TCP Socket来源获得,并且可以使用与高级别功能表达复杂的算法来处理map,reduce,join和window。最后,处理后的数据可以推送到文件系统,数据库和实时仪表看板。

工作原理

Spark Streaming接收实时输入数据流并将数据分批,然后由Spark引擎处理,以批量生成最终结果流。

《Java Spark 简单示例(五)Spark Streaming》 Streaming-flow

代码示例

我以本地Socket 为数据源实现了一个简单对字符串分割计数的功能。

Maven 中引入

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
</dependency>

先实现一个Socket Server。在本地启动一个ServerSocket,端口号设为9999,启动后开始监听客户端连接,一旦连接成功,打印客户端地址,然后向客户端推送一串字符串,时间间隔为1秒钟,循环一百次。

package com.yzy.spark;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

public class SparkSocket {
    static ServerSocket serverSocket = null;
    static PrintWriter pw = null;

    public static void main(String[] args) {
        try {
            serverSocket = new ServerSocket(9999);
            System.out.println("服务启动,等待连接");
            Socket socket = serverSocket.accept();
            System.out.println("连接成功,来自:" + socket.getRemoteSocketAddress());
            pw = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
            int j = 0;
            while (j < 100) {
                j++;
                String str = "spark streaming test " + j;
                pw.println(str);
                pw.flush();
                System.out.println(str);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                pw.close();
                serverSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

编写Spark Streaming Demo。创建一个socket类型的JavaStreamingContext,它是Spark Streaming
的入口,host指向localhost,端口号为9999

package com.yzy.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class demo7 {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        //初始化sparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);

        //获得JavaStreamingContext
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));

        //从socket源获取数据
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

        //拆分行成单词
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        //计算每个单词出现的个数
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        //输出结果
        wordCounts.print();

        //开始作业
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}

先运行SparkSocket,控制台输出

服务启动,等待连接

此时主线程会阻塞,再运行demo,观察SparkSocket控制台变化(如果运行demo报错请参考最底部)

服务启动,等待连接
连接成功,来自:/127.0.0.1:5175
spark streaming test 1
spark streaming test 2
spark streaming test 3
......每隔一秒打印一条

再观察demo控制台

-------------------------------------------
Time: 1530089706000 ms
-------------------------------------------
(spark,1)
(1,1)
(streaming,1)
(test,1)


-------------------------------------------
Time: 1530089709000 ms
-------------------------------------------
(4,1)
(spark,3)
(2,1)
(streaming,3)
(test,3)
(3,1)


-------------------------------------------
Time: 1530089712000 ms
-------------------------------------------
(spark,3)
(5,1)
(6,1)
(streaming,3)
(test,3)
(7,1)
......每隔三秒打印一次聚合信息

我设置了3秒钟聚合一次,Spark 刚启动的第一个三秒钟只接收到一条来自socket服务器的数据,因此每个单词计数只有1。然后socket服务器每隔一秒就会有一条数据发送过来,后面的聚合结果也验证了这一点。

关于DStream

点此查看官方文档
1.DStream是Spark Streaming提供的基本抽象。它表示连续的数据流,即从源接收的输入数据流或通过转换输入流生成的已处理数据流。在内部,DStream由连续的RDD系列表示,这是Spark对不可变的分布式数据集的抽象。DStream中的每个RDD都包含来自特定时间间隔的数据。

《Java Spark 简单示例(五)Spark Streaming》 Streaming-dstream

2.在DStream上应用的任何操作都会转化为对基础RDD的操作。在本例中,flatMap操作将应用于linesDStream中的每个RDD 以生成DStream的 wordsRDD。

《Java Spark 简单示例(五)Spark Streaming》 Streaming-dstream-ops

因此,我们还可以将本例转化为JavaRDD数据集来操作。代码改动如下

//.....
        words.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            public void call(JavaRDD<String> rdd) throws Exception {
                JavaPairRDD<String, Integer> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                });

                pairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    public void call(Tuple2<String, Integer> tuple2) throws Exception {
                        System.out.println(String.format("word:%s,count:%d", tuple2._1(), tuple2._2()));
                    }
                });
            }
        });
//删除wordCounts.print();

控制台输出

前面日志被覆盖了...

word:18,count:1
word:spark,count:3
word:17,count:1
word:16,count:1
word:streaming,count:3
word:test,count:3

关于 StreamingContext

点此查看官方文档
1.要初始化Spark Streaming程序,必须创建一个StreamingContext对象,它是所有Spark Streaming功能的主要入口。

 //初始化sparkConf
SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);

//获得JavaStreamingContext
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));

2.appName参数是你的应用程序在集群UI上显示的名称
3.masterSpark,Mesos或YARN群集URL,或者是以本地模式运行的特殊 local[*] 字符串。详情参见文章 Spark启动时的master参数以及Spark的部署方式
4.JavaStreamingContext对象也可以从现有的JavaSparkContext创建

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(3));

5.要点

  • 一旦上下文启动,就不能建立或添加新的streaming computations
  • 一旦上下文停止,就不能重新启动。
  • 同时只能有一个StreamingContext可以在JVM中处于活动状态。
  • StreamingContext上的stop()也会停止SparkContext。要仅停止StreamingContext,请将可选参数stopSparkContext设为false
  • 只要先前的StreamingContext在创建下一个StreamingContext之前停止(不停止SparkContext),就可以重新使用SparkContext来创建多个StreamingContext

遇到的问题

启动demo报错:org.apache.spark.SparkException: A master URL must be set in your configuration.

解决方式设置编译器的 VM Options=-Dspark.master=local

《Java Spark 简单示例(五)Spark Streaming》 如图所示

    原文作者:憨人Zoe
    原文地址: https://www.jianshu.com/p/c72cc55d7af5
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞