spark

 

 

 Spark 基础

Spark入门

 

1. 课程目标……………………………………………………………………………………………………… 2

1.1. 目标1:熟悉Spark相关概念………………………………………………………………….. 2

1.2. 目标2:搭建Spark集群………………………………………………………………………… 2

1.3. 目标3:编写简单的Spark应用程序………………………………………………………… 2

2. Spark概述…………………………………………………………………………………………………….. 2

2.1. 什么是Spark(官网:http://spark.apache.org)…………………………………………. 2

2.2. 为什么要学Spark………………………………………………………………………………….. 2

2.3. Spark特点…………………………………………………………………………………………….. 3

2.3.1. 快……………………………………………………………………………………………… 3

2.3.2. 易用………………………………………………………………………………………….. 3

2.3.3. 通用………………………………………………………………………………………….. 4

2.3.4. 兼容性……………………………………………………………………………………….. 4

3. Spark集群安装………………………………………………………………………………………………. 4

3.1. 安装…………………………………………………………………………………………………… 4

3.1.1. 机器部署……………………………………………………………………………………. 4

3.1.2. 下载Spark安装包………………………………………………………………………… 5

3.1.3. 配置Spark…………………………………………………………………………………… 5

4. 执行Spark程序…………………………………………………………………………………………….. 6

4.1. 执行第一个spark程序…………………………………………………………………………… 6

4.2. 启动Spark Shell…………………………………………………………………………………….. 7

4.2.1. 启动spark shell……………………………………………………………………………. 7

4.2.2. 在spark shell中编写WordCount程序………………………………………………. 7

4.3. 在IDEA中编写WordCount程序………………………………………………………………. 8

 

1.  课程目标

1.1. 目标1:熟悉Spark相关概念

1.2. 目标2:搭建Spark集群

1.3. 目标3:编写简单的Spark应用程序

2.  Spark概述

2.1. 什么是Spark(官网:http://spark.apache.org

 《spark》

 

Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。Spark得到了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的Spark已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用GraphX构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯Spark集群达到8000台的规模,是当前已知的世界上最大的Spark集群。

2.2. 为什么要学Spark

中间结果输出:基于MapReduce的计算引擎通常会将中间结果输出到磁盘上,进行存储和容错。出于任务管道承接的,考虑,当一些查询翻译到MapReduce任务时,往往会产生多个Stage,而这些串联的Stage又依赖于底层文件系统(如HDFS)来存储每一个Stage的输出结果

Hadoop

Spark

  

Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。

2.3. Spark特点

2.3.1.   快

与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。

 《spark》

 

 

 

2.3.2.   易用

Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。

 《spark》

 

2.3.3.   通用

Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。

2.3.4.   兼容性

Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。

 《spark》

 

3.  Spark集群安装

3.1. 安装

3.1.1.   机器部署

准备两台以上Linux服务器,安装好JDK1.7

3.1.2.   下载Spark安装包

 

 《spark》

 

http://www.apache.org/dyn/closer.lua/spark/spark-1.5.2/spark-1.5.2-bin-hadoop2.6.tgz

上传解压安装包

上传spark-1.5.2-bin-hadoop2.6.tgz安装包到Linux上

解压安装包到指定位置

tar -zxvf spark-1.5.2-bin-hadoop2.6.tgz -C /usr/local

3.1.3.   配置Spark

进入到Spark安装目录

cd /usr/local/spark-1.5.2-bin-hadoop2.6

进入conf目录并重命名并修改spark-env.sh.template文件

cd conf/

mv spark-env.sh.template spark-env.sh

vi spark-env.sh

在该配置文件中添加如下配置

export JAVA_HOME=/usr/java/jdk1.7.0_45

export SPARK_MASTER_IP=node1.itcast.cn

export SPARK_MASTER_PORT=7077

保存退出

重命名并修改slaves.template文件

mv slaves.template slaves

vi slaves

在该文件中添加子节点所在的位置(Worker节点)

node2.itcast.cn

node3.itcast.cn

node4.itcast.cn

保存退出

将配置好的Spark拷贝到其他节点上

scp -r spark-1.5.2-bin-hadoop2.6/ node2.itcast.cn:/usr/local/

scp -r spark-1.5.2-bin-hadoop2.6/ node3.itcast.cn:/usr/local/

scp -r spark-1.5.2-bin-hadoop2.6/ node4.itcast.cn:/usr/local/

 

Spark集群配置完毕,目前是1个Master,3个Work,在node1.itcast.cn上启动Spark集群

/usr/local/spark-1.5.2-bin-hadoop2.6/sbin/start-all.sh

 

启动后执行jps命令,主节点上有Master进程,其他子节点上有Work进行,登录Spark管理界面查看集群状态(主节点):http://node1.itcast.cn:8080/

《spark》

 

 

到此为止,Spark集群安装完毕,但是有一个很大的问题,那就是Master节点存在单点故障,要解决此问题,就要借助zookeeper,并且启动至少两个Master节点来实现高可靠,配置方式比较简单:

Spark集群规划:node1,node2是Master;node3,node4,node5是Worker

安装配置zk集群,并启动zk集群

停止spark所有服务,修改配置文件spark-env.sh,在该配置文件中删掉SPARK_MASTER_IP并添加如下配置

export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1,zk2,zk3 -Dspark.deploy.zookeeper.dir=/spark”

1.在node1节点上修改slaves配置文件内容指定worker节点

2.在node1上执行sbin/start-all.sh脚本,然后在node2上执行sbin/start-master.sh启动第二个Master

4.  执行Spark程序

4.1. 执行第一个spark程序

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \

–class org.apache.spark.examples.SparkPi \

–master spark://node1.itcast.cn:7077 \

–executor-memory 1G \

–total-executor-cores 2 \

/usr/local/spark-1.5.2-bin-hadoop2.6/lib/spark-examples-1.5.2-hadoop2.6.0.jar \

100

该算法是利用蒙特·卡罗算法求PI

4.2. 启动Spark Shell

spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。

4.2.1.   启动spark shell

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell \

–master spark://node1.itcast.cn:7077 \

–executor-memory 2g \

–total-executor-cores 2

 

参数说明:

–master spark://node1.itcast.cn:7077 指定Master的地址

–executor-memory 2g 指定每个worker可用内存为2G

–total-executor-cores 2 指定整个集群使用的cup核数为2个

 

注意:

如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。

 

Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可

4.2.2.   在spark shell中编写WordCount程序

1.首先启动hdfs

2.向hdfs上传一个文件到hdfs://node1.itcast.cn:9000/words.txt

3.在spark shell中用scala语言编写spark程序

sc.textFile(“hdfs://node1.itcast.cn:9000/words.txt”).flatMap(_.split(” “))

.map((_,1)).reduceByKey(_+_).saveAsTextFile(“hdfs://node1.itcast.cn:9000/out”)

 

4.使用hdfs命令查看结果

hdfs dfs -ls hdfs://node1.itcast.cn:9000/out/p*

 

说明:

sc是SparkContext对象,该对象时提交spark程序的入口

textFile(hdfs://node1.itcast.cn:9000/words.txt)是hdfs中读取数据

flatMap(_.split(” “))先map在压平

map((_,1))将单词和1构成元组

reduceByKey(_+_)按照key进行reduce,并将value累加

saveAsTextFile(“hdfs://node1.itcast.cn:9000/out”)将结果写入到hdfs中

4.3. 在IDEA中编写WordCount程序

spark shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中编制程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。

 

1.创建一个项目

 

 

 

2.选择Maven项目,然后点击next

 

 

3.填写maven的GAV,然后点击next

 

 

4.填写项目名称,然后点击finish

 

 

5.创建好maven项目后,点击Enable Auto-Import

 

 

6.配置Maven的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast.spark</groupId>
    <artifactId>spark-mvn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <scala.compat.version>2.10</scala.compat.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.2</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.itcast.spark.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

7.将src/main/java和src/test/java分别修改成src/main/scala和src/test/scala,与pom.xml中的配置保持一致

 

 

 

 

8.新建一个scala class,类型为Object

 

 

9.编写spark程序

package cn.itcast.spark

import org.apache.spark.{SparkContext, SparkConf}

object WordCount {
  def main(args: Array[String]) {
    //创建SparkConf()并设置App名称
   
val conf = new SparkConf().setAppName("WC")
    //创建SparkContext,该对象是提交spark App的入口
   
val sc = new SparkContext(conf)
    //使用sc创建RDD并执行相应的transformation和action
   
sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_, 1).sortBy(_._2, false).saveAsTextFile(args(1))
    //停止sc,结束该任务
   
sc.stop()
  }
}

 

10.使用Maven打包:首先修改pom.xml中的main class

 

 

点击idea右侧的Maven Project选项

 

 

点击Lifecycle,选择clean和package,然后点击Run Maven Build

 

 

11.选择编译成功的jar包,并将该jar上传到Spark集群中的某个节点上

 

 

12.首先启动hdfs和Spark集群

启动hdfs

/usr/local/hadoop-2.6.1/sbin/start-dfs.sh

启动spark

/usr/local/spark-1.5.2-bin-hadoop2.6/sbin/start-all.sh

 

13.使用spark-submit命令提交Spark应用(注意参数的顺序)

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \

–class cn.itcast.spark.WordCount \

–master spark://node1.itcast.cn:7077 \

–executor-memory 2G \

–total-executor-cores 4 \

/root/spark-mvn-1.0-SNAPSHOT.jar \

hdfs://node1.itcast.cn:9000/words.txt \

hdfs://node1.itcast.cn:9000/out

 

查看程序执行结果

hdfs dfs -cat hdfs://node1.itcast.cn:9000/out/part-00000

(hello,6)

(tom,3)

(kitty,2)

(jerry,1)

 

Spark RDD

Spark计算模型

1.  课程目标

1.1. 熟练使用RDD的算子完成计算

1.2. 掌握RDD的原理

2.  弹性分布式数据集RDD

2.1. RDD概述

2.1.1.   什么是RDD

RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

2.1.2.   RDD的属性

 《spark》

 

1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

 

2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

 

3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

 

4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

 

5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

2.2. 创建RDD

1)由一个已经存在的Scala集合创建。

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

 

2)由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等

val rdd2 = sc.textFile(“hdfs://node1.itcast.cn:9000/words.txt”)

2.3. RDD编程API

2.3.1.   Transformation

RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

 

常用的Transformation:

转换

含义

map(func)

返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

filter(func)

返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

flatMap(func)

类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

mapPartitions(func)

类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是

(Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)

根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子

union(otherDataset)

对源RDD和参数RDD求并集后返回一个新的RDD

intersection(otherDataset)

对源RDD和参数RDD求交集后返回一个新的RDD

distinct([numTasks]))

对源RDD进行去重后返回一个新的RDD

groupByKey([numTasks]) 

在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

 

sortByKey([ascending], [numTasks])

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

sortBy(func,[ascending], [numTasks])

与sortByKey类似,但是更灵活

join(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

cogroup(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD

cartesian(otherDataset)

笛卡尔积

pipe(command, [envVars])

 

coalesce(numPartitions)  

 

repartition(numPartitions)

 

repartitionAndSortWithinPartitions(partitioner)

 

2.3.2.   Action

动作

含义

reduce(func)

通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的

collect()

在驱动程序中,以数组的形式返回数据集的所有元素

count()

返回RDD的元素个数

first()

返回RDD的第一个元素(类似于take(1))

take(n)

返回一个由数据集的前n个元素组成的数组

takeSample(withReplacement,num, [seed])

返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子

takeOrdered(n[ordering])

 

saveAsTextFile(path)

将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

saveAsSequenceFile(path

将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。

saveAsObjectFile(path

 

countByKey()

针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

foreach(func)

在数据集的每一个元素上,运行函数func进行更新。

2.3.3.   WordCount中的RDD

 《spark》

 

2.3.4.   练习

启动spark-shell

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell –master spark://node1.itcast.cn:7077

 

练习1:

//通过并行化生成rdd

val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))

//对rdd1里的每一个元素乘2然后排序

val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)

//过滤出大于等于十的元素

val rdd3 = rdd2.filter(_ >= 10)

//将元素以数组的方式在客户端显示

rdd3.collect

 

练习2:

val rdd1 = sc.parallelize(Array(“a b c”, “d e f”, “h i j”))

//将rdd1里面的每一个元素先切分在压平

val rdd2 = rdd1.flatMap(_.split(‘ ‘))

rdd2.collect

 

练习3:

val rdd1 = sc.parallelize(List(5, 6, 4, 3))

val rdd2 = sc.parallelize(List(1, 2, 3, 4))

//求并集

val rdd3 = rdd1.union(rdd2)

//求交集

val rdd4 = rdd1.intersection(rdd2)

//去重

rdd3.distinct.collect

rdd4.collect

 

练习4:

val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2)))

val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2)))

//求jion

val rdd3 = rdd1.join(rdd2)

rdd3.collect

//求并集

val rdd4 = rdd1 union rdd2

//按key进行分组

rdd4.groupByKey

rdd4.collect

 

练习5:

val rdd1 = sc.parallelize(List((“tom”, 1), (“tom”, 2), (“jerry”, 3), (“kitty”, 2)))

val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2)))

//cogroup

val rdd3 = rdd1.cogroup(rdd2)

//注意cogroup与groupByKey的区别

rdd3.collect

 

练习6:

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))

//reduce聚合

val rdd2 = rdd1.reduce(_ + _)

rdd2.collect

 

练习7:

val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2),  (“shuke”, 1)))

val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5)))

val rdd3 = rdd1.union(rdd2)

//按key进行聚合

val rdd4 = rdd3.reduceByKey(_ + _)

rdd4.collect

//按value的降序排序

val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))

rdd5.collect

 

//想要了解更多,访问下面的地址

http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

2.4. RDD的依赖关系

RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

 《spark》

 

2.4.1.   窄依赖

窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用

总结:窄依赖我们形象的比喻为独生子女

2.4.2.   宽依赖

宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition

总结:窄依赖我们形象的比喻为超生

2.4.3.   Lineage

RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

 

2.5. RDD的缓存

Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存个数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。

2.5.1.   RDD缓存方式

RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

 《spark》

 

通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

 

《spark》

 

 

缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

 

2.6. DAG的生成

DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

 

 《spark》

 

Spark  SQL

Spark SQL and DataFrame

1.  课程目标

1.1. 掌握Spark SQL的原理

1.2. 掌握DataFrame数据结构和使用方式

1.3. 熟练使用Spark SQL完成计算任务

2.  Spark SQL

2.1. Spark SQL概述

2.1.1.   什么是Spark SQL

 《spark》

 

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

2.1.2.   为什么要学习Spark SQL

我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

1.易整合

 《spark》

 

2.统一的数据访问方式

 《spark》

 

3.兼容Hive

 《spark》

 

4.标准的数据连接

 《spark》

 

2.2. DataFrames

2.2.1.   什么是DataFrames

与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上 看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,Spark DataFrame很好地继承了传统单机数据分析的开发体验。

 《spark》

 

2.2.2.   创建DataFrames

 在Spark SQL中SQLContext是创建DataFrames和执行SQL的入口,在spark-1.5.2中已经内置了一个sqlContext

 《spark》

 

1.在本地创建一个文件,有三列,分别是id、name、age,用空格分隔,然后上传到hdfs上

hdfs dfs -put person.txt /

 

2.在spark shell执行下面命令,读取数据,将每一行的数据使用列分隔符分割

val lineRDD = sc.textFile(“hdfs://node1.itcast.cn:9000/person.txt”).map(_.split(” “))

 

3.定义case class(相当于表的schema)

case class Person(id:Int, name:String, age:Int)

 

4.将RDD和case class关联

val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

 

5.将RDD转换成DataFrame

val personDF = personRDD.toDF

 

6.对DataFrame进行处理

personDF.show

 《spark》

 

2.3. DataFrame常用操作

2.3.1.   DSL风格语法

//查看DataFrame中的内容

personDF.show

 

//查看DataFrame部分列中的内容

personDF.select(personDF.col(“name”)).show

personDF.select(col(“name”), col(“age”)).show

personDF.select(“name”).show

 

//打印DataFrame的Schema信息

personDF.printSchema

 

//查询所有的name和age,并将age+1

personDF.select(col(“id”), col(“name”), col(“age”) + 1).show

personDF.select(personDF(“id”), personDF(“name”), personDF(“age”) + 1).show

 《spark》

 

 

//过滤age大于等于18的

personDF.filter(col(“age”) >= 18).show

 《spark》

 

 

//按年龄进行分组并统计相同年龄的人数

personDF.groupBy(“age”).count().show()

 《spark》

 

2.3.2.   SQL风格语法

如果想使用SQL风格的语法,需要将DataFrame注册成表

personDF.registerTempTable(“t_person”)

 

//查询年龄最大的前两名

sqlContext.sql(“select * from t_person order by age desc limit 2”).show

 《spark》

 

 

//显示表的Schema信息

sqlContext.sql(“desc t_person”).show

 《spark》

 

3.  以编程方式执行Spark SQL查询

3.1. 编写Spark SQL查询程序

前面我们学习了如何在Spark Shell中使用SQL完成查询,现在我们来实现在自定义的程序中编写Spark SQL查询程序。首先在maven项目的pom.xml中添加Spark SQL的依赖

 

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

 

3.1.1.   通过反射推断Schema

创建一个object为cn.itcast.spark.sql.InferringSchema

package cn.itcast.spark.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object InferringSchema {
  def main(args: Array[String]) {

    //创建SparkConf()并设置App名称
   
val conf = new SparkConf().setAppName("SQL-1")
    //SQLContext要依赖SparkContext
   
val sc = new SparkContext(conf)
    //创建SQLContext
   
val sqlContext = new SQLContext(sc)

    //从指定的地址创建RDD
   
val lineRDD = sc.textFile(args(0)).map(_.split(" "))

    //创建case class
    //将RDD和case class关联
   
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    //导入隐式转换,如果不到人无法将RDD转换成DataFrame
    //将RDD转换成DataFrame
   
import sqlContext.implicits._
    val personDF = personRDD.toDF
    //注册表
   
personDF.registerTempTable("t_person")
    //传入SQL
   
val df = sqlContext.sql("select * from t_person order by age desc limit 2")
    //将结果以JSON的方式存储到指定位置
   
df.write.json(args(1))
    //停止Spark Context
   
sc.stop()
  }
}
//case class一定要放到外面
case class Person(id: Int, name: String, age: Int)
 

将程序打成jar包,上传到spark集群,提交Spark任务

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \

–class cn.itcast.spark.sql.InferringSchema \

–master spark://node1.itcast.cn:7077 \

/root/spark-mvn-1.0-SNAPSHOT.jar \

hdfs://node1.itcast.cn:9000/person.txt \

hdfs://node1.itcast.cn:9000/out

 

查看运行结果

hdfs dfs -cat  hdfs://node1.itcast.cn:9000/out/part-r-*

 

 《spark》

 

3.1.2.   通过StructType直接指定Schema

创建一个object为cn.itcast.spark.sql.SpecifyingSchema

package cn.itcast.spark.sql

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by ZX on 2015/12/11.
  */
object SpecifyingSchema {
  def main(args: Array[String]) {
    //创建SparkConf()并设置App名称
   
val conf = new SparkConf().setAppName("SQL-2")
    //SQLContext要依赖SparkContext
   
val sc = new SparkContext(conf)
    //创建SQLContext
   
val sqlContext = new SQLContext(sc)
    //从指定的地址创建RDD
   
val personRDD = sc.textFile(args(0)).map(_.split(" "))
    //通过StructType直接指定每个字段的schema
   
val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    //将RDD映射到rowRDD
   
val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    //将schema信息应用到rowRDD上
   
val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    //注册表
   
personDataFrame.registerTempTable("t_person")
    //执行SQL
   
val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    //将结果以JSON的方式存储到指定位置
   
df.write.json(args(1))
    //停止Spark Context
   
sc.stop()
  }
}
 

将程序打成jar包,上传到spark集群,提交Spark任务

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \

–class cn.itcast.spark.sql.InferringSchema \

–master spark://node1.itcast.cn:7077 \

/root/spark-mvn-1.0-SNAPSHOT.jar \

hdfs://node1.itcast.cn:9000/person.txt \

hdfs://node1.itcast.cn:9000/out1

 

查看结果

hdfs dfs -cat  hdfs://node1.itcast.cn:9000/out1/part-r-*

 《spark》

 

4.  数据源

4.1. JDBC

Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。

4.1.1.   从MySQL中加载数据(Spark Shell方式)

1.启动Spark Shell,必须指定mysql连接驱动jar包

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell \

–master spark://node1.itcast.cn:7077 \

–jars /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \

–driver-class-path /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar

 

2.从mysql中加载数据

val jdbcDF = sqlContext.read.format(“jdbc”).options(Map(“url” -> “jdbc:mysql://192.168.10.1:3306/bigdata”, “driver” -> “com.mysql.jdbc.Driver”, “dbtable” -> “person”, “user” -> “root”, “password” -> “123456”)).load()

 

3.执行查询

jdbcDF.show()

 《spark》

 

4.1.2.   将数据写入到MySQL中(打jar包方式)

1.编写Spark SQL程序

package cn.itcast.spark.sql

import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object JdbcRDD {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MySQL-Demo")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //通过并行化创建RDD
   
val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" "))
    //通过StructType直接指定每个字段的schema
   
val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    //将RDD映射到rowRDD
   
val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    //将schema信息应用到rowRDD上
   
val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    //创建Properties存储数据库相关属性
   
val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123456")
    //将数据追加到数据库
   
personDataFrame.write.mode("append").jdbc("jdbc:mysql://192.168.10.1:3306/bigdata", "bigdata.person", prop)
    //停止SparkContext
   
sc.stop()
  }
}
 

 

2.用maven将程序打包

 

3.将Jar包提交到spark集群

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \

–class cn.itcast.spark.sql.JdbcRDD \

–master spark://node1.itcast.cn:7077 \

–jars /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \

–driver-class-path /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \

/root/spark-mvn-1.0-SNAPSHOT.jar

 

Spark  Streaming

Spark Streaming

1.  课程目标

1.1. 掌握Spark Streaming的原理

1.2. 熟练使用Spark Streaming完成流式计算任务

2.  Spark Streaming介绍

2.1. Spark Streaming概述

2.1.1.   什么是Spark Streaming

 《spark》

 

Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。

 《spark》

 

2.1.2.   为什么要学习Spark Streaming

 

1.易用

 《spark》

 

2.容错

 《spark》

 

3.易整合到Spark体系

 《spark》

 

2.1.3.   Spark与Storm的对比

Spark

Storm

 《spark》

 

 《spark》

 

开发语言:Scala

开发语言:Clojure

编程模型:DStream

编程模型:Spout/Bolt

 《spark》

 

 《spark》

 

 

3.  DStream

3.1. 什么是DStream

Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:

 《spark》

 

对数据的操作也是按照RDD为单位来进行的

 《spark》

 

计算过程由Spark engine来完成

 《spark》

 

3.2. DStream相关操作

DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

 

3.2.1.   Transformations on DStreams

Transformation

Meaning

map(func)

Return a new DStream by passing each element of the source DStream through a function func.

flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items.

filter(func)

Return a new DStream by selecting only the records of the source DStream on which func returns true.

repartition(numPartitions)

Changes the level of parallelism in this DStream by creating more or fewer partitions.

union(otherStream)

Return a new DStream that contains the union of the elements in the source DStream and otherDStream.

count()

Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.

reduce(func)

Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel.

countByValue()

When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.

reduceByKey(func, [numTasks])   

When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark’s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.

join(otherStream, [numTasks])

When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.

cogroup(otherStream, [numTasks])

When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.

transform(func)     

Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.

updateStateByKey(func)

Return a new “state” DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.

 

特殊的Transformations

 

1.UpdateStateByKey Operation

UpdateStateByKey原语用于记录历史记录,上文中Word Count示例中就用到了该特性。若不用UpdateStateByKey来更新状态,那么每次数据进来后分析完成后,结果输出后将不在保存

 

2.Transform Operation

Transform原语允许DStream上执行任意的RDD-to-RDD函数。通过该函数可以方便的扩展Spark API。此外,MLlib(机器学习)以及Graphx也是通过本函数来进行结合的。

 

3.Window Operations

Window Operations有点类似于Storm中的State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态

 《spark》

 

3.2.2.   Output Operations on DStreams

Output Operations可以将DStream的数据输出到外部的数据库或文件系统,当某个Output Operations原语被调用时(与RDD的Action相同),streaming程序才会开始真正的计算过程。

Output Operation

Meaning

print()

Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging.

saveAsTextFiles(prefix, [suffix])

Save this DStream’s contents as text files. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]”.

saveAsObjectFiles(prefix, [suffix])

Save this DStream’s contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]”.

saveAsHadoopFiles(prefix, [suffix])

Save this DStream’s contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]”.

foreachRDD(func)

The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

4.  实战

4.1. 用Spark Streaming实现实时WordCount

架构图:

 《spark》

 

1.安装并启动生成者

首先在一台Linux(ip:192.168.10.101)上用YUM安装nc工具

yum install -y nc

 

启动一个服务端并监听9999端口

nc -lk 9999

 

2.编写Spark Streaming程序

package cn.itcast.spark.streaming

import cn.itcast.spark.util.LoggerLevel
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    //设置日志级别
   
LoggerLevel.setStreamingLogLevels()
    //创建SparkConf并设置为本地模式运行
    //注意local[2]代表开两个线程
   
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    //设置DStream批次时间间隔为2秒
   
val ssc = new StreamingContext(conf, Seconds(2))
    //通过网络读取数据
   
val lines = ssc.socketTextStream("192.168.10.101", 9999)
    //将读到的数据用空格切成单词
   
val words = lines.flatMap(_.split(" "))
    //将单词和1组成一个pair
   
val pairs = words.map(word => (word, 1))
    //按单词进行分组求相同单词出现的次数
   
val wordCounts = pairs.reduceByKey(_ + _)
    //打印结果到控制台
   
wordCounts.print()
    //开始计算
   
ssc.start()
    //等待停止
   
ssc.awaitTermination()
  }
}

 

3.启动Spark Streaming程序:由于使用的是本地模式“local[2]”所以可以直接在本地运行该程序

注意:要指定并行度,如在本地运行设置setMaster(“local[2]”),相当于启动两个线程,一个给receiver,一个给computer。如果是在集群中运行,必须要求集群中可用core数大于1

 《spark》

 

 

4.在Linux端命令行中输入单词

 《spark》

 

5.在IDEA控制台中查看结果

 《spark》

 

问题:结果每次在Linux段输入的单词次数都被正确的统计出来,但是结果不能累加!如果需要累加需要使用updateStateByKey(func)来更新状态,下面给出一个例子:

package cn.itcast.spark.streaming

import cn.itcast.spark.util.LoggerLevel
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Seconds}

object NetworkUpdateStateWordCount {
  /**
    * String :
单词 hello
    * Seq[Int] :单词在当前批次出现的次数
    * Option[Int] : 历史结果
    */
 
val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
   
iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}
  }

  def main(args: Array[String]) {
    LoggerLevel.setStreamingLogLevels()
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkUpdateStateWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    //做checkpoint 写入共享存储中
   
ssc.checkpoint("c://aaa")
    val lines = ssc.socketTextStream("192.168.10.100", 9999)
    //reduceByKey 结果不累加
    //val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
    //updateStateByKey结果可以累加但是需要传入一个自定义的累加函数:updateFunc
   
val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
 

4.2. Spark Streaming整合Kafka完成网站点击流实时统计

 《spark》

 

1.安装并配置zk

2.安装并配置Kafka

3.启动zk

4.启动Kafka

5.创建topic

bin/kafka-topics.sh –create –zookeeper node1.itcast.cn:2181,node2.itcast.cn:2181 \

–replication-factor 3 –partitions 3 –topic urlcount

6.编写Spark Streaming应用程序

package cn.itcast.spark.streaming

package cn.itcast.spark

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UrlCount {
  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.flatMap{case(x,y,z)=> Some(y.sum + z.getOrElse(0)).map(n=>(x, n))}
  }

  def main(args: Array[String]) {
    //接收命令行中的参数
   
val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
    //创建SparkConf并设置AppName
   
val conf = new SparkConf().setAppName("UrlCount")
    //创建StreamingContext
   
val ssc = new StreamingContext(conf, Seconds(2))
    //设置检查点
   
ssc.checkpoint(hdfs)
    //设置topic信息
   
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //重Kafka中拉取数据创建DStream
   
val lines = KafkaUtils.createStream(ssc, zkQuorum ,groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    //切分数据,截取用户点击的url
   
val urls = lines.map(x=>(x.split(" ")(6), 1))
    //统计URL点击量
   
val result = urls.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    //将结果打印到控制台
   
result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
    原文作者:spark
    原文地址: https://www.cnblogs.com/skorzeny/p/6686834.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞