SparkStreaming入门教程(二)基础输入源:TCP+HDFS 实时读取文件数据并处理

本文全部手写原创,请勿复制粘贴、转载请注明出处,谢谢配合!

初始化SparkStreaming

前面我们架构原理上讲到,SparkStreaming依赖于StreamingContext和SparkContext
因此首先是要初始化它们。

  • 创建方式一:
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
  • 创建方式二:
import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

Context被创建之后,你就可以通过创建DStream定义输入源,并对DStreams定义一系列计算操作如转化和输出。

StreamingContext API

以下是我们开发spark streaming程序时常用的API方法:

  • def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
    从TCP Source中创建输入流,必须定义接受文件的主机和端口
  • def textFileStream(directory: String): DStream[String]
    创建输入流,为Hadoop兼容的文件系统监控新文件并读取它们(使用LongWritable作key,Text作为value,输入格式为TextInputFormat)
  • def start(): Unit
    开始流处理
  • def awaitTermination(): Unit
    等待进程停止(当有错误发生时)
  • def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
    停止流,并选择是否立即停止

TCP source实战

《SparkStreaming入门教程(二)基础输入源:TCP+HDFS 实时读取文件数据并处理》 Tcp source

  • local[2]设置多个线程,在本地运行Spark Streaming程序时,请勿使用“local”或“local [1]”。这两者中的任何一个都意味着只有一个线程将用于本地运行任务。如果您使用的是基于接收器的DStream(例如sockets,Kafka,Flume等),那么一个线程将被用于运行接收器,这样就没有线程来处理接收到的数据了。因此,在本地运行时,请始终使用“local [ n ]”作为主URL,其中n >要运行的接收器的数量。
    总结:使用spark-shell运行sparkstreaming程序时,要不线程数大于1,要不基于集群。bin/spark-shell --master local[2]
    bin/spark-shell --master spark://master:7077
  • Seconds(5)是设置时间间隔,即上篇博客架构上所讲到的,sparkstreaming是分批的,批次之间有时间间隔。
scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@7540160e

scala> val ssc = new StreamingContext(conf, Seconds(5))

scala> val lines = ssc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@465d1345

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@7577589

scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@7c2e88b9

scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@ad6448e

scala> wordCounts.print()

scala> ssc.start()
scala> ssc.awaitTermination()
scala> -------------------------------------------
Time: 1519188012000 ms
-------------------------------------------

-------------------------------------------
Time: 1519188013000 ms
-------------------------------------------

-------------------------------------------

当在nc上实时输入数据时:

[root@master ~]# nc -lk 9999
hello spark spatk
hello spark spark spark

SparkStreaming实时进行单词统计

Time: 1519188022000 ms
-------------------------------------------
(hello,1)
(spark,1)
(spatk,1)

-------------------------------------------
Time: 1519188023000 ms
-------------------------------------------

-------------------------------------------

Time: 1519188034000 ms
-------------------------------------------
(hello,1)
(spark,3)

-------------------------------------------
Time: 1519188035000 ms
-------------------------------------------

File source实战

TCP source我们是在Spark-shell中编写,那么File source换做IDEA开发工具编写。通过两种不同方式让大家更深的掌握。

《SparkStreaming入门教程(二)基础输入源:TCP+HDFS 实时读取文件数据并处理》 hdfs source.gif

  • maven配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.spark2.demo</groupId>
    <artifactId>SparkStudy</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>
</project>

程序代码HdfsFileInput.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HdfsFileInput {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("HdfsInput")
    val ssc = new StreamingContext(conf,Seconds(5))
    val lines = ssc.textFileStream("hdfs://master/streaming/hdfs/")
    val wordcount = lines.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_)
    wordcount.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

关于SparkStreaming需要注意的地方

  • StreamingContext启动后,增加新的操作将不起作用。也就是说在StreamingContext启动之前,要定义好所有的计算逻辑
  • StreamingContext停止后,不能重新启动。也就是说要重新计算的话,需要重新运行整个程序。
  • 在单个JVM中,一段时间内不能出现两个active状态的StreamingContext
  • 调用StreamingContext的stop方法时,SparkContext也将被stop掉,如果希望StreamingContext关闭时,保留SparkContext,则需要在stop方法中传入参数stopSparkContext=false
    原文作者:Seven_Ki
    原文地址: https://www.jianshu.com/p/d9c9de5b719f
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞