Initializing Spark Streaming
As covered in the earlier post on the architecture, Spark Streaming depends on a StreamingContext and a SparkContext,
so the first step is to initialize them.
- Approach 1: create from a SparkConf:
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
- Approach 2: create from an existing SparkContext:
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
Once the context has been created, you can define the input sources by creating DStreams, and then define a series of computations on those DStreams, such as transformations and output operations.
StreamingContext API
The following are the API methods we use most often when developing Spark Streaming programs:
- def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
Creates an input stream from a TCP source; you must specify the hostname and port to receive data from.
- def textFileStream(directory: String): DStream[String]
Creates an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using LongWritable as the key, Text as the value, and TextInputFormat as the input format).
- def start(): Unit
Starts the stream processing.
- def awaitTermination(): Unit
Waits for the execution to stop, rethrowing any error that occurred along the way.
- def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
Stops the streams; you can choose whether to also stop the SparkContext, and whether to stop gracefully (waiting for received data to finish processing) or immediately.
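To see how these lifecycle methods fit together, here is a minimal sketch of a graceful shutdown; the app name, host, port, and timeout are placeholder values for illustration:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("LifecycleDemo")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.socketTextStream("localhost", 9999).print() // define all logic before start()
ssc.start()
// A variant of awaitTermination that gives up after a timeout (in ms):
ssc.awaitTerminationOrTimeout(60000)
// Stop the streams gracefully (drain received data) while keeping
// the underlying SparkContext alive for further use:
ssc.stop(stopSparkContext = false, stopGracefully = true)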
TCP source in practice
- local[2] sets more than one thread. When running a Spark Streaming program locally, do not use "local" or "local[1]" as the master URL. Either of these means only one thread will be used to run tasks locally. If you are using a receiver-based input DStream (e.g. sockets, Kafka, Flume), that single thread will be used to run the receiver, leaving no thread to process the received data. Hence, when running locally, always use "local[n]" as the master URL, where n > the number of receivers to run.
To sum up: when running a Spark Streaming program from spark-shell, either use more than one thread or run against a cluster:
bin/spark-shell --master local[2]
bin/spark-shell --master spark://master:7077
- Seconds(5) sets the batch interval: as described in the previous post on the architecture, Spark Streaming processes data in batches, with a fixed time interval between batches.
scala> import org.apache.spark._
import org.apache.spark._
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._
scala> val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@7540160e
scala> val ssc = new StreamingContext(conf, Seconds(5))
scala> val lines = ssc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@465d1345
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@7577589
scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@7c2e88b9
scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@ad6448e
scala> wordCounts.print()
scala> ssc.start()
scala> ssc.awaitTermination()
scala> -------------------------------------------
Time: 1519188012000 ms
-------------------------------------------
-------------------------------------------
Time: 1519188013000 ms
-------------------------------------------
-------------------------------------------
When data is typed into nc in real time:
[root@master ~]# nc -lk 9999
hello spark spatk
hello spark spark spark
Spark Streaming counts the words in real time:
Time: 1519188022000 ms
-------------------------------------------
(hello,1)
(spark,1)
(spatk,1)
-------------------------------------------
Time: 1519188023000 ms
-------------------------------------------
-------------------------------------------
Time: 1519188034000 ms
-------------------------------------------
(hello,1)
(spark,3)
-------------------------------------------
Time: 1519188035000 ms
-------------------------------------------
File source in practice
We wrote the TCP source example in spark-shell; for the file source we will switch to the IDEA development environment instead. Working through two different workflows should give you a firmer grasp of both.
[GIF: HDFS file source demo]
- Maven configuration:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.spark2.demo</groupId>
    <artifactId>SparkStudy</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>
</project>
Program code, HdfsFileInput.scala:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HdfsFileInput {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("HdfsInput")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Monitor the HDFS directory and read newly created files as text
    val lines = ssc.textFileStream("hdfs://master/streaming/hdfs/")

    // Classic word count: split lines into words, pair each word with 1,
    // then sum the counts per word within each batch
    val wordcount = lines.flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    wordcount.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
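One thing to keep in mind when testing: textFileStream only picks up files created in the monitored directory after the stream starts, and files should appear atomically (write them elsewhere and move them in, rather than appending in place). To trigger a batch, put a new file into the directory, for example (words.txt is a placeholder filename):
hdfs dfs -put words.txt /streaming/hdfs/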
Things to note about Spark Streaming
- Once a StreamingContext has been started, adding new operations has no effect. In other words, all of the computation logic must be defined before the StreamingContext starts.
- Once a StreamingContext has been stopped, it cannot be restarted. To re-run the computation, you must re-run the whole program.
- Only one StreamingContext can be active in a JVM at the same time.
- Calling stop() on a StreamingContext also stops the SparkContext. To stop only the StreamingContext and keep the SparkContext alive, pass stopSparkContext = false to the stop method, as in the sketch below.
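A minimal sketch of the last two points (the app name, host, and port are illustrative): a stopped StreamingContext cannot be restarted, but if the SparkContext was kept alive, a fresh StreamingContext can be created on top of it:
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext("local[2]", "RestartDemo")

val ssc1 = new StreamingContext(sc, Seconds(5))
ssc1.socketTextStream("localhost", 9999).print() // all logic before start()
ssc1.start()
ssc1.stop(stopSparkContext = false) // keep the SparkContext alive

// ssc1.start() would now throw IllegalStateException: a stopped
// context cannot be restarted. Create a fresh one on the same
// SparkContext instead:
val ssc2 = new StreamingContext(sc, Seconds(5))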