转: Spark Streaming开发入门——WordCount(Java&Scala)

原文地址:

http://lib.csdn.net/article/scala/26722

 

一、Java方式开发

1、开发前准备

假定您以搭建好了Spark集群。

2、开发环境采用eclipse maven工程,需要添加Spark Streaming依赖。

    <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.6.0</version> </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

3、Spark streaming 基于Spark Core进行计算,需要注意事项:

1.local模式的话,local后必须为大于等于2的数字【即至少2条线程】。因为receiver 占了一个。 
因为,SparkStreaming 在运行的时候,至少需要一条线程不断循环接收数据,而且至少有一条线程处理接收的数据。 
如果只有一条线程的话,接受的数据不能被处理。

温馨提示:

对于集群而言,每隔exccutor一般肯定不只一个Thread,那对于处理Spark Streaming应用程序而言,每个executor一般分配多少core比较合适?根据我们过去的经验,5个左右的core是最佳的(段子:分配为奇数个core的表现最佳,例如:分配3个、5个、7个core等)

接下来,让我们开始动手写写Java代码吧!

第一步:

SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("wordCountOnline");
  • 1

第二步:

我们采用基于配置文件的方式创建SparkStreamingContext对象

    /** * 第二步:创建SparkStreamingContext,这个是SparkStreaming应用程序所有功能的起始点和程序调度的核心 * SparkStreamingContext的构建可以基于SparkConf参数,也可基于持久化的SparkStreamingContext的内容来回复过来 * (典型的场景是Driver崩溃后重新启动,由于Spark Streaming具有连续7*24小时不间断运行的特征, * 所有需要再Drver重新启动后继续上的状态,此时的状态恢复需要基于曾经的Checkpoint) * * * 2.在一个SparkStreaming应用程序中可以有多个SparkStreamingContext对象,使用下一个SparkStreaming程序之前 * 需要把前面正在运行的SparkStreamingContext对象关闭掉,由此,我们获得一个启发: * * SparkStreaming框架也 就是Spark Core上的一个应用程序。 * 只不过需要Spark工程师写业务逻辑代码。 * 所以要掌握好Spark,学好SparkStreaming 就行了。 * * java虚拟机在os角度上看就是一个简单的c。c++应用程序。 * * 这里可以用工厂方法创建ssc * * */ JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

第三步,创建Spark Streaming输入数据来源:

我们将数据来源配置为本地端口9999(注意端口要求没有被占用):

    /** * 第三步:创建Spark Streaming 输入数据来源:input Stream * 1.数据输入来源可以基于File、HDFS、Flume、Kafka、Socket * * 2.在这里我们指定数据来源于网络Socket端口, * Spark Streaming连接上该端口并在运行的时候一直监听该端口的数据(当然该端口服务首先必须存在,并且在后续会根据业务需要不断的有数据产生)。 * * 有数据和没有数据 在处理逻辑上没有影响(在程序看来都是数据) * * 3.如果经常在每间隔5秒钟 没有数据的话,不断的启动空的job其实是会造成调度资源的浪费。因为 并没有数据需要发生计算。 * 所以企业级生产环境的代码在具体提交Job前会判断是否有数据,如果没有的话,就不再提交Job; * * */ JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

第四步:我们就像对RDD编程一样,基于DStream进行编程,原因是DStream是RDD产生的模板,在Spark Streaming发生计算前,其实质是把每个Batch的DStream的操作翻译成为了RDD操作。DStream对RDD进行了一次抽象。如同DataFrame对RDD进行一次抽象。JavaRDD —-> JavaDStream

1、flatMap操作:

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String line) throws Exception { // TODO Auto-generated method stub return Arrays.asList(line.split(" ")); } });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2、 mapToPair操作:

    JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { /** * */ private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { // TODO Auto-generated method stub return new Tuple2<String,Integer>(word,1); } });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3、reduceByKey操作:

    JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { // TODO Auto-generated method stub return v1 + v2; } });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4、print等操作:

    /** * 此处的print并不会直接触发Job的执行,因为现在的一切都是在Spark Streaming框架的控制之下的。 * 具体是否真正触发Job的运行是基于设置的Duration时间间隔的触发的。 * * * Spark应用程序要想执行具体的Job对DStream就必须有output Stream操作。 * ouput Stream 有很多类型的函数触发,类print、savaAsTextFile、saveAsHadoopFiles等,最为重要的是foreachRDD,因为Spark Streamimg处理的结果一般都会 * 放在Redis、DB、DashBoard等上面,foreachRDD主要 * 就是用来 完成这些功能的,而且可以随意的自定义具体数据放在哪里。 * */ wordsCount.print(); jsc.start(); jsc.awaitTermination(); jsc.close();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
温馨提示:

除了print()方法将处理后的数据输出之外,还有其他的方法也非常重要,在开发中需要重点掌握,比如SaveAsTextFile,SaveAsHadoopFile等,最为重要的是foreachRDD方法,这个方法可以将数据写入Redis,DB,DashBoard等,甚至可以随意的定义数据放在哪里,功能非常强大。

二、Scala方式开发

import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds import org.apache.spark.storage.StorageLevel /** * @author lxh */ object WordCountOnline { def main(args: Array[String]): Unit = { /** * 创建SparkConf */ val conf = new SparkConf().setMaster("spark://Master:7077").setAppName("wordcountonline") val ssc = new StreamingContext(conf,Seconds(1)) val lines = ssc.socketTextStream("Master", 9999, StorageLevel.MEMORY_AND_DISK) /** * 按空格分割 */ val words = lines.flatMap { line => line.split(" ") } /** * 把单个的word变成tuple */ val wordCount = words.map { word => (word,1) } /** * (key1,1) (key1,1) * key相同的累加。 */ wordCount.reduceByKey(_+_) wordCount.print() ssc.start() /** * 等待程序结束 */ ssc.awaitTermination() ssc.stop(true) } }
    原文作者:晓之朱雀
    原文地址: https://www.cnblogs.com/xzzq/p/7240736.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞