9.Spark学习(Python版本):SparkStreaming基本操作

Spark Streaming程序基本步骤
编写Spark Streaming程序的基本步骤是:

1.通过创建输入DStream来定义输入源
2.通过对DStream应用转换操作和输出操作来定义流计算。
3.用streamingContext.start()来开始接收数据和处理流程。
4.通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)。
5.可以通过streamingContext.stop()来手动结束流计算进程。

创建StreamingContext对象

请登录Linux系统,启动pyspark。进入pyspark以后,就已经获得了一个默认的SparkConext,也就是sc。因此,可以采用如下方式来创建StreamingContext对象:

>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc, 1)

1表示每隔1秒钟就自动执行一次流计算,这个秒数可以自由设定。
如果是编写一个独立的Spark Streaming程序,而不是在pyspark中运行,则需要通过如下方式创建StreamingContext对象:

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName('TestDStream')
conf.setMaster('local[2]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 1)
setAppName(“TestDStream”)– 设置应用程序名称;
setMaster(“local[2]”) – ”local[2]’表示本地模式,启动2个工作线程。
文件流(DStream) – 命令行中监听

Spark支持从兼容HDFS API的文件系统中读取数据,创建数据流。

为了能够演示文件流的创建,我们需要首先创建一个日志目录(/usr/local/spark/python_code/streaming),并在里面放置两个模拟的日志文件。log1.txt输入:

I love Hadoop
I love Spark
Spark is fast

请另外打开一个终端窗口,启动进入pyspark。

>>> from operator import add
>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc,20)
>>> lines = ssc.textFileStream('file:///usr/local/spark/python_code/streaming/logfile')
>>> words = lines.flatMap(lambda line:line.split(' '))
>>> wordCounts = words.map(lambda word:(word,1)).reduceByKey(add)
>>> wordCounts.pprint()
>>> ssc.start()
>>> ssc.awaitTermination()-------------------------------------------
Time: 2018-08-13 20:40:40
-------------------------------------------

>>> ssc.awaitTermination()-------------------------------------------
Time: 2018-08-13 20:41:00
-------------------------------------------

-------------------------------------------
Time: 2018-08-13 20:41:20
-------------------------------------------

-------------------------------------------
Time: 2018-08-13 20:41:40
-------------------------------------------

输入ssc.start()以后,程序就开始自动进入循环监听状态,屏幕上会显示一堆的信息,下面的ssc.awaitTermination()是无法输入到屏幕上的。

Spark Streaming每隔20秒就监听一次。但是,监听程序只监听”/usr/local/spark/mycode/streaming/logfile”目录下在程序启动后新增的文件,不会去处理历史上已经存在的文件。所以,为了让我们能够看到效果,需要到”/usr/local/spark/mycode/streaming/logfile”目录下再新建一个log3.txt文件。请打开另外一个终端窗口再新建一个log3.txt文件,里面随便输入一些英文单词,保存后再切换回到spark-shell窗口。
现在你会发现屏幕上不断输出新的信息,导致你无法看清。所以必须停止这个监听程序,按键盘Ctrl+D,或者Ctrl+C。
你可以看到屏幕上,在一大堆输出信息中,你可以找到打印出来的单词统计信息。

文件流(DStream) – 编写独立应用程序进行监听

打开一个Linux终端窗口,进入shell命令提示符状态,然后,执行下面命令:

cd /usr/local/spark/mycode/streaming
vim TestStreaming.py

在TestStreaming.py中输入以下代码:

from operator import add
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName('TestDStream')
conf.setMaster('local[1]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 20)
lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
words = lines.flatMap(lambda line: line.split(' '))
wordCounts = words.map(lambda x : (x,1)).reduceByKey(add)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

保存成功后,执行python TestStreaming.py

套接字流(DStream)

Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理。
/usr/local/spark/python_code/streaming目录下新建NetworkWordCount.py,输入如下内容:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

保存后,执行:
sudo nc -lk port #永久监听TCP端口

第二个终端作为监听窗口,执行如下代码:

cd /usr/local/spark/mycode/streaming
python3 NetworkWordCount.py localhost 9999

这样,就可以在nc第一个终端窗口窗口中随意输入一些单词,在监听窗口每隔1秒就会打印出词频统计信息。

《9.Spark学习(Python版本):SparkStreaming基本操作》

RDD队列流(DStream)

我们可以使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream。

打开一个终端,进入Shell命令提示符状态,然后执行下面命令新建代码文件:
/usr/local/spark/mycode/streaming目录下创建TestRDDQueueStream.py
输入以下代码:

import time
 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
 
if __name__ == "__main__":
 
    sc = SparkContext(appName="PythonStreamingQueueStream")
    ssc = StreamingContext(sc, 1)
 
    # Create the queue through which RDDs can be pushed to
    # a QueueInputDStream
    rddQueue = []
    for i in range(5):
        rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
 
    # Create the QueueInputDStream and use it do some processing
    inputStream = ssc.queueStream(rddQueue)
    mappedStream = inputStream.map(lambda x: (x % 10, 1))
    reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
    reducedStream.pprint()
 
    ssc.start()
    time.sleep(6)
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

保存并回到命令行,输入python3 TestRDDQueueStream.py,得到结果如下:

《9.Spark学习(Python版本):SparkStreaming基本操作》

    原文作者:马淑
    原文地址: https://www.jianshu.com/p/66d3914f4cf1
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞