spark

SPARK核心编程
一、spark基本工作原理与RDD
1.Spark的基本工作原理
1.分布式(RDD的partition)
2.主要是基于内存(少数情况下数基于磁盘)
3.迭代式计算(RDD->RDD->RDD)

客户端(client):我们在本地编写了spark程序,然后必须在某台能够连接spark的机器上提交spark程序

Spark程序被提交到spark集群上进行运算

spark数据的来源可能是hadoop的hdfs或者hive上,将读取到的数据分散到集群的不同的节点的内存中,对节点上的数据进行操作处理,处理后的数据可能会移动到其他节点的内存中,可能是继续由其他的节点进行操作,所有的计算操作都是针对多个节点上的数据,进行并行计算操作的(因为一个RDD是分区的,每个计算都是在不同节点上并行计算)。

Spark处理完的结果存储的位置可能是hadoop的hdfs、hive或者是MySQL等DB亦或是直接返回到客户端(运行spark程序的机器进程)
 
Spark与mapreduce最大的不同就在于,迭代式计算模型:
Mapreduce ,分为两个阶段,map和reduce,两个节点完了,就结束了,所以我们在一个job中能做得处理很有限,只能在map和reduce里处理
Spark,计算模型可以分为n个阶段,因为他是内存迭代式的,我们在处理完一个阶段以后,可以继续往下处理很多的阶段,而不只是两个阶段,所以,spark相较于mapreduce来说,计算模型可以提供更强大的功能
2.RDD以及其特点
弹性:内存资源不足的情况下,自动将RDD存储到磁盘上,以及切换机制
分布式:被分区的,分布到不同的节点上进行并行的计算
数据集:数据的集合
1、RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。
2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)
3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合的并行化来创建。
4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。
5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性)

一个RDD在逻辑上,抽象的代表一个HDFS文件。但是他实际上是被分区得。分为多个分区,多个分区散落在spark集群中的不同的节点上。比如RDD有90万数据,分为9个partition,9个分区

现在,节点9出了故障,导致parttion 9的数据丢失,RDD具有很强的容错性,当他发现自己的数据丢失了,那么此时会自动从自己来源的数据进行重计算,重新获取自己这份数据,这一切对用户,都是完全透明的。

RDD的每个partition,在spark节点上存储时,默认都是放在内存中的。但是如果说内存放不下这么多数据时,比如每个节点最多放5万数据,结果你每个partition是10万数据。那么就会把partition中的部分数据写入磁盘上,进行保存。

而上述这一切,对于用户来说,都是完全透明的。也就是说,你不用去管RDD的数据存储在哪里,内存,还是磁盘。只要关注,你针对RDD来进行计算,和处理,等等操作即可。

所以说,RDD的这种自动进行内存和磁盘之间权衡和切换的机制,就是RDD的弹性的特点所在。
3.spark核心编程
Spark核心编程是什么?

第一:
定义初始的RDD,就是说你要定义第一个RDD是从哪里,读取数据,hdfs、linux本地文件、程序中的集合。

第二:
定义对RDD的计算操作,这个在spark中称之为算子,map 、reduce、flatMap、groupByKey等,比mapreduce强大太多

第三:
就是循环往复的过程,计算完成之后,数据就可能到了一批新的节点上,也就是变成了一个新的RDD,然后在再次反复,针对新的RDD定义计算操作

第四;
获取最终的数据,将数据保存起来

每一个节点上的每一批数据,实际上就是一个RDD,一个RDD是分布式的,所以数据都散落在一批节点上了,每个节点都存储了RDD的部分partition

4.spark开发
1、核心开发:离线批处理 / 延迟性的交互式数据处理
2、SQL查询:底层都是RDD和计算操作
3、实时计算:底层都是RDD和计算操作

二、使用java、Scala、spark-shell开发wordCount
1、用Java开发wordcount程序
  1.1 配置maven环境
  1.2 如何进行本地测试
【注意】
本地测试的作用,先伪造一些数据,测试代码的正确性之后,提交到集群中,执行
防止在集群中出错,同时减少解决问题的时间,因为数据量比实际的要少的多
  1.3 如何使用spark-submit提交到spark集群进行执行(spark-submit常用参数说明,spark-submit其实就类似于hadoop的hadoop jar命令)
Pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.spark</groupId>
  <artifactId>SparkTest</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>SparkTest</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.3.0</version>
      </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
  </dependencies>
  
  <build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/main/test</testSourceDirectory>

    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <classpathScope>compile</classpathScope>
          <mainClass>cn.spark.sparktest.App</mainClass>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>

    </plugins>
  </build>
</project>

Wordcountlocal
package cn.spark.study.core;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
 * 使用java开发本地测试的wordcount程序
 * @author Administrator
 *
 */
public class WordCountLocal {
    
    public static void main(String[] args) {
        // 编写Spark应用程序
        // 本地执行,是可以执行在eclipse中的main方法中,执行的
        
        // 第一步:创建SparkConf对象,设置Spark应用的配置信息
        // 使用setMaster()可以设置Spark应用程序要连接的Spark集群的master节点的url
        // 但是如果设置为local则代表,在本地运行
        SparkConf conf = new SparkConf()
                .setAppName("WordCountLocal")
                .setMaster("local");  
        
        // 第二步:创建JavaSparkContext对象
        // 在Spark中,SparkContext是Spark所有功能的一个入口,你无论是用java、scala,甚至是python编写
            // 都必须要有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括
            // 调度器(DAGSchedule、TaskScheduler),还会去到Spark Master节点上进行注册,等等
        // 一句话,SparkContext,是Spark应用中,可以说是最最重要的一个对象
        // 但是呢,在Spark中,编写不同类型的Spark应用程序,使用的SparkContext是不同的,如果使用scala,
            // 使用的就是原生的SparkContext对象
            // 但是如果使用Java,那么就是JavaSparkContext对象
            // 如果是开发Spark SQL程序,那么就是SQLContext、HiveContext
            // 如果是开发Spark Streaming程序,那么就是它独有的SparkContext
            // 以此类推
        JavaSparkContext sc = new JavaSparkContext(conf);
    
        // 第三步:要针对输入源(hdfs文件、本地文件,等等),创建一个初始的RDD
        // 输入源中的数据会打散,分配到RDD的每个partition中,从而形成一个初始的分布式的数据集
        // 我们这里呢,因为是本地测试,所以呢,就是针对本地文件
        // SparkContext中,用于根据文件类型的输入源创建RDD的方法,叫做textFile()方法
        // 在Java中,创建的普通RDD,都叫做JavaRDD
        // 在这里呢,RDD中,有元素这种概念,如果是hdfs或者本地文件呢,创建的RDD,每一个元素就相当于
        // 是文件里的一行
        JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt");
    
        // 第四步:对初始RDD进行transformation操作,也就是一些计算操作
        // 通常操作会通过创建function,并配合RDD的map、flatMap等算子来执行
        // function,通常,如果比较简单,则创建指定Function的匿名内部类
        // 但是如果function比较复杂,则会单独创建一个类,作为实现这个function接口的类    
        // 先将每一行拆分成单个的单词
        // FlatMapFunction,有两个泛型参数,分别代表了输入和输出类型
        // 我们这里呢,输入肯定是String,因为是一行一行的文本,输出,其实也是String,因为是每一行的文本
        // 这里先简要介绍flatMap算子的作用,其实就是,将RDD的一个元素,给拆分成一个或多个元素
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {    
            private static final long serialVersionUID = 1L;        
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));  
            }            
        });        
        // 接着,需要将每一个单词,映射为(单词, 1)的这种格式
            // 因为只有这样,后面才能根据单词作为key,来进行每个单词的出现次数的累加
        // mapToPair,其实就是将每个元素,映射为一个(v1,v2)这样的Tuple2类型的元素
            // 如果大家还记得scala里面讲的tuple,那么没错,这里的tuple2就是scala类型,包含了两个值
        // mapToPair这个算子,要求的是与PairFunction配合使用,第一个泛型参数代表了输入类型
            // 第二个和第三个泛型参数,代表的输出的Tuple2的第一个值和第二个值的类型
        // JavaPairRDD的两个泛型参数,分别代表了tuple元素的第一个值和第二个值的类型
        JavaPairRDD<String, Integer> pairs = words.mapToPair(            
                new PairFunction<String, String, Integer>() {
                    private static final long serialVersionUID = 1L;    
                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }                    
                });    
        // 接着,需要以单词作为key,统计每个单词出现的次数
        // 这里要使用reduceByKey这个算子,对每个key对应的value,都进行reduce操作
        // 比如JavaPairRDD中有几个元素,分别为(hello, 1) (hello, 1) (hello, 1) (world, 1)
        // reduce操作,相当于是把第一个值和第二个值进行计算,然后再将结果与第三个值进行计算
        // 比如这里的hello,那么就相当于是,首先是1 + 1 = 2,然后再将2 + 1 = 3
        // 最后返回的JavaPairRDD中的元素,也是tuple,但是第一个值就是每个key,第二个值就是key的value
        // reduce之后的结果,相当于就是每个单词出现的次数
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(            
                new Function2<Integer, Integer, Integer>() {                
                    private static final long serialVersionUID = 1L;    
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }                    
                });    
        // 到这里为止,我们通过几个Spark算子操作,已经统计出了单词的次数
        // 但是,之前我们使用的flatMap、mapToPair、reduceByKey这种操作,都叫做transformation操作
        // 一个Spark应用中,光是有transformation操作,是不行的,是不会执行的,必须要有一种叫做action
        // 接着,最后,可以使用一种叫做action操作的,比如说,foreach,来触发程序的执行
        wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {        
            private static final long serialVersionUID = 1L;    
            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");    
            }        
        });    
        sc.close();
    }    
}
集群模式
package cn.spark.study.core;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
 * 将java开发的wordcount程序部署到spark集群上运行
 * @author Administrator
 */
public class WordCountCluster {
    public static void main(String[] args) {
        // 如果要在spark集群上运行,需要修改的,只有两个地方
        // 第一,将SparkConf的setMaster()方法给删掉,默认它自己会去连接
        // 第二,我们针对的不是本地文件了,修改为hadoop hdfs上的真正的存储大数据的文件    
        // 实际执行步骤:
        // 1、将spark.txt文件上传到hdfs上去
        // 2、使用我们最早在pom.xml里配置的maven插件,对spark工程进行打包
        // 3、将打包后的spark工程jar包,上传到机器上执行
        // 4、编写spark-submit脚本
        // 5、执行spark-submit脚本,提交spark应用到集群执行        
        SparkConf conf = new SparkConf()
                .setAppName("WordCountCluster");      
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("hdfs://spark1:9000/spark.txt");    
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;        
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));  
            }        
        });
        JavaPairRDD<String, Integer> pairs = words.mapToPair(                
                new PairFunction<String, String, Integer>() {
                    private static final long serialVersionUID = 1L;        
                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }                
                });    
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(            
                new Function2<Integer, Integer, Integer>() {                
                    private static final long serialVersionUID = 1L;    
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }                    
                });
        wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {            
            private static final long serialVersionUID = 1L;            
            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");    
            }            
        });        
        sc.close();
    }    
}

2、用Scala开发wordcount程序
  2.1 下载scala ide for eclipse
  2.2 在Java Build Path中,添加spark依赖包(如果与scala ide for eclipse原生的scala版本发生冲突,则移除原生的scala / 重新配置scala compiler)
  2.3 用export导出scala spark工程
Scala_wordcount
package cn.spark.study.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
        .setAppName("WordCount");
    val sc = new SparkContext(conf)
  
    val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1); 
    val words = lines.flatMap { line => line.split(" ") }   
    val pairs = words.map { word => (word, 1) }   
    val wordCounts = pairs.reduceByKey { _ + _ }
    
wordCounts.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times."))  
Sc.stop()
  } 
}

3、用spark-shell开发wordcount程序
  3.1 常用于简单的测试

--master 不写表示的是local 本地模式
--master spark://hadoop01:7077 表示的是spark自生的集群模式 standalone
--master yarn-client
--master yarn-cluster 表示spark应用运行在yarn上 


三、spark的架构原理
1.原理图

2.详解
组件:master worker executor task driver 
1.我们在集群中的一个节点上或者自己提交spark程序的机器上提交一个spark程序后,会启动一个driver进程。
2.Driver 进程启动后会做一些初始化的操作,在这个过程中,就会发送发送请求到master上,进行spark应用程序的注册,说白了就是让master知道,有一个新的spark应用程序要运行。
3.Master是一个进行,主要是负责资源的调度和分配,还有集群的监控等职责。
4.Worker是一个进行,主要是负责两个,一个是用自己的内存存储RDD的某个或者是某些partition,另外一个是启动其他的进程和线程,对RDD上的partition进行处理和计算
5.Worker接收到master的请求之后,会为spark应用起动executor
6.Executor和task其实就是负责执行,对RDD的partition进行并行的计算,也就是执行我们对RDD定义的,比如map flatMap reduce 等算子操作
7. Executor启动之后,会向driver进行反注册,这样driver就知道哪些executor是为他进行服务的了。
8.Driver注册了一些executor之后,就可以开始正式的执行我们的spark应用程序了。首先第一步就是创建RDD,读取数据源的文件
9.HDFS上的文件内容被读取到多个work节点上,形成内存中的分布式的数据集,也就是初始RDD
10.Driver会根据我们对RDD定义的操作,提交一大堆的task去executor上
11.Executor接收到task之后,会启动多个线程来执行task.
12.Task会对RDD的partition数据执行指定的算子操作,形成新的RDD的partition

四、创建RDD
创建的方式
     进行Spark核心编程时,首先要做的第一件事,就是创建一个初始的RDD。该RDD中,通常就代表和包含了Spark应用程序的输入源数据。然后在创建了初始的RDD之后,才可以通过Spark Core提供的transformation算子,对该RDD进行转换,来获取其他的RDD。
Spark core提供三种创建RDD的三种方式:
1.使用程序中的集合创建RDD,并行化集合
如果要通过并行化集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD。相当于是,集合中的部分数据会到一个节点上,而另一部分数据会到其他节点上。然后就可以用并行的方式来操作这个分布式数据集合,即RDD。

调用parallelize()时,有一个重要的参数可以指定,就是要将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark官方的建议是,为集群中的每个CPU创建2~4个partition。Spark默认会根据集群的情况来设置partition的数量。但是也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量。比如parallelize(arr, 10)
// 案例:1到10累加求和
val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _)


2.使用本地文件创建RDD
Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的,比如说HDFS、Cassandra、HBase以及本地文件。通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。

有几个事项是需要注意的:
1、如果是针对本地文件的话,如果是在windows上本地测试,windows上有一份文件即可;如果是在spark集群上针对linux本地文件,那么需要将文件拷贝到所有worker节点上。
2、Spark的textFile()方法支持针对目录、压缩文件以及通配符进行RDD创建。
3、Spark默认会为hdfs文件的每一个block创建一个partition,但是也可以通过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比block数量少。

// 案例:文件字数统计
val rdd = sc.textFile("data.txt")
val wordCount = rdd.map(line => line.length).reduce(_ + _)

3.使用HDFS文件创建RDD

Spark的textFile()除了可以针对上述几种普通的文件创建RDD之外,还有一些特列的方法来创建RDD:

1、SparkContext.wholeTextFiles()方法,可以针对一个目录中的大量小文件,返回<filename, fileContent>组成的pair,作为一个PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每个元素就是文件中的一行文本。
2、SparkContext.sequenceFile[K, V]()方法,可以针对SequenceFile创建RDD,K和V泛型类型就是SequenceFile的key和value的类型。K和V要求必须是Hadoop的序列化类型,比如IntWritable、Text等。
3、SparkContext.hadoopRDD()方法,对于Hadoop的自定义输入类型,可以创建RDD。该方法接收JobConf、InputFormatClass、Key和Value的Class。
4、SparkContext.objectFile()方法,可以针对之前调用RDD.saveAsObjectFile()创建的对象序列化的文件,反序列化文件中的数据,并创建一个RDD。

注意;
1.使用程序中的集合创建RDD,只要用于测试,可以在实际部署到集群运行之前,自己使用集合构造测试数据,来测试后面的spark应用流程
2.使用本地文件创建RDD,主要是用于临时的处理一些存储大量的数据的文件
3.使用HDFS文件创建RDD,应该是最常用的生产环境的处理方式,只要可以针对HDFS上存储的大数据,进行离线的批处理操作
Spark core主要是用来代替mapreduce的批处理操作
五、RDD的操作
RDD的两种操作
Spark支持两种RDD操作:transformation和action。transformation操作会针对已有的RDD创建一个新的RDD;而action则主要是对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并可以返回结果给Driver程序。

例如,map就是一种transformation操作,它用于将已有RDD的每个元素传入一个自定义的函数,并获取一个新的元素,然后将所有的新元素组成一个新的RDD。而reduce就是一种action操作,它用于对RDD中的所有元素进行聚合操作,并获取一个最终的结果,然后返回给Driver程序。

transformation的特点就是lazy特性。lazy特性指的是,如果一个spark应用中只定义了transformation操作,那么即使你执行该应用,这些操作也不会执行。也就是说,transformation是不会触发spark程序的执行的,它们只是记录了对RDD所做的操作,但是不会自发的执行。只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。Spark通过这种lazy特性,来进行底层的spark应用执行的优化,避免产生过多中间结果。

action操作执行,会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行。这是action的特性。

案例一:统计文件的字数
这里通过一个之前学习过的案例,统计文件字数,来讲解transformation和action。

// 这里通过textFile()方法,针对外部文件创建了一个RDD,lines,但是实际上,程序执行到这里为止,spark.txt文件的数据是不会加载到内存中的。lines,只是代表了一个指向spark.txt文件的引用。
val lines = sc.textFile("spark.txt")

// 这里对lines RDD进行了map算子,获取了一个转换后的lineLengths RDD。但是这里连数据都没有,当然也不会做任何操作。lineLengths RDD也只是一个概念上的东西而已。
val lineLengths = lines.map(line => line.length)

// 之列,执行了一个action操作,reduce。此时就会触发之前所有transformation操作的执行,Spark会将操作拆分成多个task到多个机器上并行执行,每个task会在本地执行map操作,并且进行本地的reduce聚合。最后会进行一个全局的reduce聚合,然后将结果返回给Driver程序。
val totalLength = lineLengths.reduce(_ + _)

案例二:统计文件每行出现的次数
Spark有些特殊的算子,也就是特殊的transformation操作。比如groupByKey、sortByKey、reduceByKey等,其实只是针对特殊的RDD的。即包含key-value对的RDD。而这种RDD中的元素,实际上是scala中的一种类型,即Tuple2,也就是包含两个值的Tuple。

在scala中,需要手动导入Spark的相关隐式转换,import org.apache.spark.SparkContext._。然后,对应包含Tuple2的RDD,会自动隐式转换为PairRDDFunction,并提供reduceByKey等方法。

val lines = sc.textFile("hello.txt")
val linePairs = lines.map(line => (line, 1))
val lineCounts = linePairs.reduceByKey(_ + _)
lineCounts.foreach(lineCount => println(lineCount._1 + " appears " + llineCount._2 + " times."))

2.常见的Transformation操作
操作    介绍
map    将RDD中的每个元素传入自定义函数,获取一个新的元素,然后用新的元素组成新的RDD
filter    对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除。
flatMap    与map类似,但是对每个元素都可以返回一个或多个新元素。
gropuByKey    根据key进行分组,每个key对应一个Iterable<value>
reduceByKey    对每个key对应的value进行reduce操作。
sortByKey    对每个key对应的value进行排序操作。
join    对两个包含<key,value>对的RDD进行join操作,每个key join上的pair,都会传入自定义函数进行处理。
cogroup    同join,但是是每个key对应的Iterable<value>都会传入自定义函数进行处理。

1.Map
Java:
/**
     * map算子案例:将集合中每一个元素都乘以2
     */
    private static void map() {
        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("map")
                .setMaster("local");
        // 创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 构造集合
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        // 并行化集合,创建初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        
        // 使用map算子,将集合中的每个元素都乘以2
        // map算子,是对任何类型的RDD,都可以调用的
        // 在java中,map算子接收的参数是Function对象
        // 创建的Function对象,一定会让你设置第二个泛型参数,这个泛型类型,就是返回的新元素的类型
            // 同时call()方法的返回类型,也必须与第二个泛型类型同步
        // 在call()方法内部,就可以对原始RDD中的每一个元素进行各种处理和计算,并返回一个新的元素
        // 所有新的元素就会组成一个新的RDD
        JavaRDD<Integer> multipleNumberRDD = numberRDD.map(
                
                new Function<Integer, Integer>() {

                    private static final long serialVersionUID = 1L;
                    
                    // 传入call()方法的,就是1,2,3,4,5
                    // 返回的就是2,4,6,8,10
                    @Override
                    public Integer call(Integer v1) throws Exception {
                        return v1 * 2;
                    }
                });
        // 打印新的RDD
        multipleNumberRDD.foreach(new VoidFunction<Integer>() {
            
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Integer t) throws Exception {
                System.out.println(t);  
            }
        });
        // 关闭JavaSparkContext
        sc.close();
    }
Scala
 def map() {
    val conf = new SparkConf()
        .setAppName("map")
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val numbers = Array(1, 2, 3, 4, 5)
    val numberRDD = sc.parallelize(numbers, 1)  
    val multipleNumberRDD = numberRDD.map { num => num * 2 }  
    
    multipleNumberRDD.foreach { num => println(num) }   
  }

2.Filter
Java 
/**
     * filter算子案例:过滤集合中的偶数
     */
    private static void filter() {
        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("filter")
                .setMaster("local");
        // 创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);    
        // 模拟集合
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 并行化集合,创建初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        
        // 对初始RDD执行filter算子,过滤出其中的偶数
        // filter算子,传入的也是Function,其他的使用注意点,实际上和map是一样的
        // 但是,唯一的不同,就是call()方法的返回类型是Boolean
        // 每一个初始RDD中的元素,都会传入call()方法,此时你可以执行各种自定义的计算逻辑
        // 来判断这个元素是否是你想要的
        // 如果你想在新的RDD中保留这个元素,那么就返回true;否则,不想保留这个元素,返回false
        JavaRDD<Integer> evenNumberRDD = numberRDD.filter(                
                new Function<Integer, Boolean>() {
                    private static final long serialVersionUID = 1L;
                    
                    // 在这里,1到10,都会传入进来
                    // 但是根据我们的逻辑,只有2,4,6,8,10这几个偶数,会返回true
                    // 所以,只有偶数会保留下来,放在新的RDD中
                    @Override
                    public Boolean call(Integer v1) throws Exception {
                        return v1 % 2 == 0;
                    }    
                });
        // 打印新的RDD
        evenNumberRDD.foreach(new VoidFunction<Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Integer t) throws Exception {
                System.out.println(t);
            }        
        });    
        // 关闭JavaSparkContext
        sc.close();
    }
Scala
  def filter() {
    val conf = new SparkConf()
        .setAppName("filter")
        .setMaster("local")
    val sc = new SparkContext(conf)
    
    val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numberRDD = sc.parallelize(numbers, 1)
    val evenNumberRDD = numberRDD.filter { num => num % 2 == 0 }
    
    evenNumberRDD.foreach { num => println(num) }   
  }

3.flatMap
java
/**
     * flatMap案例:将文本行拆分为多个单词
     */
    private static void flatMap() {
        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("flatMap")  
                .setMaster("local");  
        // 创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 构造集合
        List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");  
        
        // 并行化集合,创建RDD
        JavaRDD<String> lines = sc.parallelize(lineList);
        
        // 对RDD执行flatMap算子,将每一行文本,拆分为多个单词
        // flatMap算子,在java中,接收的参数是FlatMapFunction
        // 我们需要自己定义FlatMapFunction的第二个泛型类型,即,代表了返回的新元素的类型
        // call()方法,返回的类型,不是U,而是Iterable<U>,这里的U也与第二个泛型类型相同
        // flatMap其实就是,接收原始RDD中的每个元素,并进行各种逻辑的计算和处理,返回可以返回多个元素
        // 多个元素,即封装在Iterable集合中,可以使用ArrayList等集合
        // 新的RDD中,即封装了所有的新元素;也就是说,新的RDD的大小一定是 >= 原始RDD的大小
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;
            
            // 在这里会,比如,传入第一行,hello you
            // 返回的是一个Iterable<String>(hello, you)
            @Override
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" "));
            }
        });
        // 打印新的RDD
        words.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(String t) throws Exception {
                System.out.println(t);
            }
        });        
        // 关闭JavaSparkContext
        sc.close();
    }
Scala
 def flatMap() {
    val conf = new SparkConf()
        .setAppName("flatMap")  
        .setMaster("local")  
    val sc = new SparkContext(conf) 
    
    val lineArray = Array("hello you", "hello me", "hello world")  
    val lines = sc.parallelize(lineArray, 1)
    val words = lines.flatMap { line => line.split(" ") }   
      
    words.foreach { word => println(word) }
  }

4.groupByKey
java
/**
     * groupByKey案例:按照班级对成绩进行分组
     */
    private static void groupByKey() {
        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("groupByKey")  
                .setMaster("local");
        // 创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);        
        // 模拟集合
        List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                new Tuple2<String, Integer>("class1", 80),
                new Tuple2<String, Integer>("class2", 75),
                new Tuple2<String, Integer>("class1", 90),
                new Tuple2<String, Integer>("class2", 65));        
        // 并行化集合,创建JavaPairRDD
        JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);        
        // 针对scores RDD,执行groupByKey算子,对每个班级的成绩进行分组
        // groupByKey算子,返回的还是JavaPairRDD
        // 但是,JavaPairRDD的第一个泛型类型不变,第二个泛型类型变成Iterable这种集合类型
        // 也就是说,按照了key进行分组,那么每个key可能都会有多个value,此时多个value聚合成了Iterable
        // 那么接下来,我们是不是就可以通过groupedScores这种JavaPairRDD,很方便地处理某个分组内的数据
        JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();        
        // 打印groupedScores RDD
        groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Iterable<Integer>> t)
                    throws Exception {
                System.out.println("class: " + t._1);  
                Iterator<Integer> ite = t._2.iterator();
                while(ite.hasNext()) {
                    System.out.println(ite.next());  
                }
                System.out.println("==============================");   
            }        
        });
        // 关闭JavaSparkContext
        sc.close();
    }

Scala
 def groupByKey() {
    val conf = new SparkConf()
        .setAppName("groupByKey")  
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75),
        Tuple2("class1", 90), Tuple2("class2", 60))
    val scores = sc.parallelize(scoreList, 1)  
    val groupedScores = scores.groupByKey() 
    
    groupedScores.foreach(score => { 
      println(score._1); 
      score._2.foreach { singleScore => println(singleScore) };
      println("=============================")  
    })
  }

5.reduceByKey
java
/**
     * reduceByKey案例:统计每个班级的总分
     */
    private static void reduceByKey() {
        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("reduceByKey")  
                .setMaster("local");
        // 创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);    
        // 模拟集合
        List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                new Tuple2<String, Integer>("class1", 80),
                new Tuple2<String, Integer>("class2", 75),
                new Tuple2<String, Integer>("class1", 90),
                new Tuple2<String, Integer>("class2", 65));        
        // 并行化集合,创建JavaPairRDD
        JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);    
        // 针对scores RDD,执行reduceByKey算子
        // reduceByKey,接收的参数是Function2类型,它有三个泛型参数,实际上代表了三个值
        // 第一个泛型类型和第二个泛型类型,代表了原始RDD中的元素的value的类型
            // 因此对每个key进行reduce,都会依次将第一个、第二个value传入,将值再与第三个value传入
            // 因此此处,会自动定义两个泛型类型,代表call()方法的两个传入参数的类型
        // 第三个泛型类型,代表了每次reduce操作返回的值的类型,默认也是与原始RDD的value类型相同的
        // reduceByKey算法返回的RDD,还是JavaPairRDD<key, value>
        JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(            
                new Function2<Integer, Integer, Integer>() {                
                    private static final long serialVersionUID = 1L;
                    
                    // 对每个key,都会将其value,依次传入call方法
                    // 从而聚合出每个key对应的一个value
                    // 然后,将每个key对应的一个value,组合成一个Tuple2,作为新RDD的元素
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }                
                });    
        // 打印totalScores RDD
        totalScores.foreach(new VoidFunction<Tuple2<String,Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._1 + ": " + t._2);   
            }        
        });        
        // 关闭JavaSparkContext
        sc.close();
    }

Scala
def reduceByKey() {
    val conf = new SparkConf()
        .setAppName("groupByKey")  
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75),
        Tuple2("class1", 90), Tuple2("class2", 60))
    val scores = sc.parallelize(scoreList, 1)  
    val totalScores = scores.reduceByKey(_ + _)  
    
    totalScores.foreach(classScore => println(classScore._1 + ": " + classScore._2))  
  }

6.sortByKey
java
/**
     * sortByKey案例:按照学生分数进行排序
     */
    private static void sortByKey() {
        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("sortByKey")  
                .setMaster("local");
        // 创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 模拟集合
        List<Tuple2<Integer, String>> scoreList = Arrays.asList(
                new Tuple2<Integer, String>(65, "leo"),
                new Tuple2<Integer, String>(50, "tom"),
                new Tuple2<Integer, String>(100, "marry"),
                new Tuple2<Integer, String>(80, "jack"));
        // 并行化集合,创建RDD
        JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
        
        // 对scores RDD执行sortByKey算子
        // sortByKey其实就是根据key进行排序,可以手动指定升序,或者降序
        // 返回的,还是JavaPairRDD,其中的元素内容,都是和原始的RDD一模一样的
        // 但是就是RDD中的元素的顺序,不同了
        JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);      
        // 打印sortedScored RDD
        sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._1 + ": " + t._2);  
            }            
        });        
        // 关闭JavaSparkContext
        sc.close();
    }

Scala
 def sortByKey() {
    val conf = new SparkConf()
        .setAppName("sortByKey")  
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val scoreList = Array(Tuple2(65, "leo"), Tuple2(50, "tom"), 
        Tuple2(100, "marry"), Tuple2(85, "jack"))  
    val scores = sc.parallelize(scoreList, 1)  
    val sortedScores = scores.sortByKey(false)
    
    sortedScores.foreach(studentScore => println(studentScore._1 + ": " + studentScore._2))  
  }

7.Join
java
/**
     * join案例:打印学生成绩
     */
    private static void join() {
        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("join")  
                .setMaster("local");
        // 创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 模拟集合
        List<Tuple2<Integer, String>> studentList = Arrays.asList(
                new Tuple2<Integer, String>(1, "leo"),
                new Tuple2<Integer, String>(2, "jack"),
                new Tuple2<Integer, String>(3, "tom"));
        
        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 100),
                new Tuple2<Integer, Integer>(2, 90),
                new Tuple2<Integer, Integer>(3, 60));
        
        // 并行化两个RDD
        JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
        JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
        
        // 使用join算子关联两个RDD
        // join以后,还是会根据key进行join,并返回JavaPairRDD
        // 但是JavaPairRDD的第一个泛型类型,之前两个JavaPairRDD的key的类型,因为是通过key进行join的
        // 第二个泛型类型,是Tuple2<v1, v2>的类型,Tuple2的两个泛型分别为原始RDD的value的类型
        // join,就返回的RDD的每一个元素,就是通过key join上的一个pair
        // 什么意思呢?比如有(1, 1) (1, 2) (1, 3)的一个RDD
            // 还有一个(1, 4) (2, 1) (2, 2)的一个RDD
            // join以后,实际上会得到(1 (1, 4)) (1, (2, 4)) (1, (3, 4))
        JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
        
        // 打印studnetScores RDD
        studentScores.foreach(
                
                new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {

                    private static final long serialVersionUID = 1L;
        
                    @Override
                    public void call(Tuple2<Integer, Tuple2<String, Integer>> t)
                            throws Exception {
                        System.out.println("student id: " + t._1);  
                        System.out.println("student name: " + t._2._1);  
                        System.out.println("student score: " + t._2._2);
                        System.out.println("===============================");   
                    }                
                });    
        // 关闭JavaSparkContext
        sc.close();
    }

Scala
 def join() {
    val conf = new SparkConf()
        .setAppName("join")  
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
   val studentList = Array(
        Tuple2(1, "leo"),
        Tuple2(2, "jack"),
        Tuple2(3, "tom"));
    
   val scoreList = Array(
        Tuple2(1, 100),
        Tuple2(2, 90),
        Tuple2(3, 60));
    
    val students = sc.parallelize(studentList);
    val scores = sc.parallelize(scoreList);
    
    val studentScores = students.join(scores)  
    
    studentScores.foreach(studentScore => { 
      println("student id: " + studentScore._1);
      println("student name: " + studentScore._2._1)
      println("student socre: " + studentScore._2._2)  
      println("=======================================")  
    })  
  }

8.Cogroup
/**
     * cogroup案例:打印学生成绩
     */
    private static void cogroup() {
        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("cogroup")  
                .setMaster("local");
        // 创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);        
        // 模拟集合
        List<Tuple2<Integer, String>> studentList = Arrays.asList(
                new Tuple2<Integer, String>(1, "leo"),
                new Tuple2<Integer, String>(2, "jack"),
                new Tuple2<Integer, String>(3, "tom"));
        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 100),
                new Tuple2<Integer, Integer>(2, 90),
                new Tuple2<Integer, Integer>(3, 60),
                new Tuple2<Integer, Integer>(1, 70),
                new Tuple2<Integer, Integer>(2, 80),
                new Tuple2<Integer, Integer>(3, 50));
        
        // 并行化两个RDD
        JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
        JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);    
        // cogroup与join不同
        // 相当于是,一个key join上的所有value,都给放到一个Iterable里面去了 
        // cogroup,不太好讲解,希望大家通过动手编写我们的案例,仔细体会其中的奥妙
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = 
                students.cogroup(scores);    
        // 打印studnetScores RDD
        studentScores.foreach(            
                new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {
                    private static final long serialVersionUID = 1L;
        
                    @Override
                    public void call(
                            Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t)
                            throws Exception {
                        System.out.println("student id: " + t._1);  
                        System.out.println("student name: " + t._2._1);  
                        System.out.println("student score: " + t._2._2);
                        System.out.println("===============================");   
                    }        
                });        
        // 关闭JavaSparkContext
        sc.close();
    }

Scala
 def join() {
    val conf = new SparkConf()
        .setAppName("join")  
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
   val studentList = Array(
        Tuple2(1, "leo"),
        Tuple2(2, "jack"),
        Tuple2(3, "tom"));
    
   val scoreList = Array(
        Tuple2(1, 100),
        Tuple2(2, 90),
        Tuple2(3, 60));
    
    val students = sc.parallelize(studentList);
    val scores = sc.parallelize(scoreList);
    
    val studentScores = students.join(scores)  
    
    studentScores.foreach(studentScore => { 
      println("student id: " + studentScore._1);
      println("student name: " + studentScore._2._1)
      println("student socre: " + studentScore._2._2)  
      println("=======================================")  
    })  
  }

3.常见的action操作
操作    介绍
reduce    将RDD中的所有元素进行聚合操作。第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推。
collect    将RDD中所有元素获取到本地客户端。
count    获取RDD元素总数。
take(n)    获取RDD中前n个元素。
saveAsTextFile    将RDD元素保存到文件中,对每个元素调用toString方法
countByKey    对每个key对应的值进行count计数。
foreach    遍历RDD中的每个元素。

1.Reduce
Java
private static void reduce() {
        // 创建SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("reduce")
                .setMaster("local");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        
        // 使用reduce操作对集合中的数字进行累加
        // reduce操作的原理:
            // 首先将第一个和第二个元素,传入call()方法,进行计算,会获取一个结果,比如1 + 2 = 3
            // 接着将该结果与下一个元素传入call()方法,进行计算,比如3 + 3 = 6
            // 以此类推
        // 所以reduce操作的本质,就是聚合,将多个元素聚合成一个元素
        int sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {
            
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
            
        });
        
        System.out.println(sum);  
        
        // 关闭JavaSparkContext
        sc.close();
    }
    

Scala
  def reduce() {
    val conf = new SparkConf()
        .setAppName("reduce")
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numbers = sc.parallelize(numberArray, 1)  
    val sum = numbers.reduce(_ + _)  
    
    println(sum)  
  }
  

2.Collect
Java

    private static void collect() {
        // 创建SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("collect")
                .setMaster("local");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        
        // 使用map操作将集合中所有数字乘以2
        JavaRDD<Integer> doubleNumbers = numbers.map(
                
                new Function<Integer, Integer>() {

                    private static final long serialVersionUID = 1L;
        
                    @Override
                    public Integer call(Integer v1) throws Exception {
                        return v1 * 2;
                    }
                    
                });
        
        // 不用foreach action操作,在远程集群上遍历rdd中的元素
        // 而使用collect操作,将分布在远程集群上的doubleNumbers RDD的数据拉取到本地
        // 这种方式,一般不建议使用,因为如果rdd中的数据量比较大的话,比如超过1万条
            // 那么性能会比较差,因为要从远程走大量的网络传输,将数据获取到本地
            // 此外,除了性能差,还可能在rdd中数据量特别大的情况下,发生oom异常,内存溢出
        // 因此,通常,还是推荐使用foreach action操作,来对最终的rdd元素进行处理
        List<Integer> doubleNumberList = doubleNumbers.collect();
        for(Integer num : doubleNumberList) {
            System.out.println(num);  
        }
        
        // 关闭JavaSparkContext
        sc.close();
    }
    

scala

  def collect() {
    val conf = new SparkConf()
        .setAppName("collect")
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numbers = sc.parallelize(numberArray, 1)  
    val doubleNumbers = numbers.map { num => num * 2 }  
    
    val doubleNumberArray = doubleNumbers.collect()
    
    for(num <- doubleNumberArray) {
      println(num)  
    }
  }

3.Count
Java
private static void count() {
        // 创建SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("count")
                .setMaster("local");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        
        // 对rdd使用count操作,统计它有多少个元素
        long count = numbers.count();
        System.out.println(count);  
        
        // 关闭JavaSparkContext
        sc.close();
    }

scala
  def count() {
    val conf = new SparkConf()
        .setAppName("count")
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numbers = sc.parallelize(numberArray, 1)  
    val count = numbers.count()
    
    println(count)  
  }

4.Take(n)
Java

    private static void take() {
        // 创建SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("take")
                .setMaster("local");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        
        // 对rdd使用count操作,统计它有多少个元素
        // take操作,与collect类似,也是从远程集群上,获取rdd的数据
        // 但是collect是获取rdd的所有数据,take只是获取前n个数据
        List<Integer> top3Numbers = numbers.take(3);
        
        for(Integer num : top3Numbers) {
            System.out.println(num);  
        }
        
        // 关闭JavaSparkContext
        sc.close();
    }

scala
  
  def take() {
    val conf = new SparkConf()
        .setAppName("take")
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numbers = sc.parallelize(numberArray, 1)  
    
    val top3Numbers = numbers.take(3)
    
    for(num <- top3Numbers) {
      println(num)  
    }
  }

5.saveAsTextFile
Java
private static void saveAsTextFile() {
        // 创建SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("saveAsTextFile");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        
        // 使用map操作将集合中所有数字乘以2
        JavaRDD<Integer> doubleNumbers = numbers.map(
                
                new Function<Integer, Integer>() {

                    private static final long serialVersionUID = 1L;
        
                    @Override
                    public Integer call(Integer v1) throws Exception {
                        return v1 * 2;
                    }
                    
                });
        
        // 直接将rdd中的数据,保存在HFDS文件中
        // 但是要注意,我们这里只能指定文件夹,也就是目录
        // 那么实际上,会保存为目录中的/double_number.txt/part-00000文件
        doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt");   
        
        // 关闭JavaSparkContext
        sc.close();
    }
    

scala
  
  def saveAsTextFile() {
    
  }
  
 

6.countByKey
Java
    private static void countByKey() {
        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("countByKey")  
                .setMaster("local");
        // 创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 模拟集合
        List<Tuple2<String, String>> scoreList = Arrays.asList(
                new Tuple2<String, String>("class1", "leo"),
                new Tuple2<String, String>("class2", "jack"),
                new Tuple2<String, String>("class1", "marry"),
                new Tuple2<String, String>("class2", "tom"),
                new Tuple2<String, String>("class2", "david"));  
        
        // 并行化集合,创建JavaPairRDD
        JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList);
        
        // 对rdd应用countByKey操作,统计每个班级的学生人数,也就是统计每个key对应的元素个数
        // 这就是countByKey的作用
        // countByKey返回的类型,直接就是Map<String, Object>
        Map<String, Object> studentCounts = students.countByKey();
        
        for(Map.Entry<String, Object> studentCount : studentCounts.entrySet()) {
            System.out.println(studentCount.getKey() + ": " + studentCount.getValue());  
        }
        
        // 关闭JavaSparkContext
        sc.close();
    }

scala
 def countByKey() {
    val conf = new SparkConf()
        .setAppName("countByKey")  
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val studentList = Array(Tuple2("class1", "leo"), Tuple2("class2", "jack"),
        Tuple2("class1", "tom"), Tuple2("class2", "jen"), Tuple2("class2", "marry"))   
    val students = sc.parallelize(studentList, 1)  
    val studentCounts = students.countByKey()  
    
    println(studentCounts)  
  }
}

7.Foreach
六、RDD持久化
1.持久化原理
     Spark非常重要的一个功能特性就是可以将RDD持久化在内存中。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition。这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。

巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍。对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。

要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。

cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中清楚缓存,那么可以使用unpersist()方法。

Spark自己也会在shuffle操作时,进行数据的持久化,比如写入磁盘,主要是为了在节点失败时,避免需要重新计算整个过程。

实验检测
public class Persist {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Persist")
                .setMaster("local"); 
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // cache()或者persist()的使用,是有规则的
        // 必须在transformation或者textFile等创建了一个RDD之后,直接连续调用cache()或persist()才可以
        // 如果你先创建一个RDD,然后单独另起一行执行cache()或persist()方法,是没有用的
        // 而且,会报错,大量的文件会丢失
        JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt")
                .cache();
        
        long beginTime = System.currentTimeMillis();
        
        long count = lines.count();
        System.out.println(count);  
        
        long endTime = System.currentTimeMillis();
        System.out.println("cost " + (endTime - beginTime) + " milliseconds.");   
        
        beginTime = System.currentTimeMillis();
        
        count = lines.count();
        System.out.println(count);  
        
        endTime = System.currentTimeMillis();
        System.out.println("cost " + (endTime - beginTime) + " milliseconds.");
        
        sc.close();
    }
    
}

2.持久化策略
RDD持久化是可以手动选择不同的策略的。比如可以将RDD持久化在内存中、持久化到磁盘上、使用序列化的方式持久化,多持久化的数据进行多路复用。只要在调用persist()时传入对应的StorageLevel即可。


持久化级别    含义
MEMORY_ONLY    以非序列化的Java对象的方式持久化在JVM内存中。如果内存无法完全存储RDD所有的partition,那么那些没有持久化的partition就会在下一次需要使用它的时候,重新被计算。
MEMORY_AND_DISK    同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用这些partition时,需要从磁盘上读取。
MEMORY_ONLY_SER    同MEMORY_ONLY,但是会使用Java序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是需要进行反序列化,因此会加大CPU开销。
MEMORY_AND_DSK_SER    同MEMORY_AND_DSK。但是使用序列化方式持久化Java对象。
DISK_ONLY    使用非序列化Java对象的方式持久化,完全存储到磁盘上。
MEMORY_ONLY_2
MEMORY_AND_DISK_2
等等    如果是尾部加了2的持久化级别,表示会将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次计算,只需要使用备份数据即可。

3.如何选择持久化策略
Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。下面是一些通用的持久化级别的选择建议:

1、优先使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略。因为纯内存速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作。
2、如果MEMORY_ONLY策略,无法存储的下所有数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化进行存储,纯内存操作还是非常快,只是要消耗CPU进行反序列化。
3、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了。
4、能不使用DISK相关的策略,就不用使用,有的时候,从磁盘读取数据,还不如重新计算一次。
七、共享变量
1.共享变量的原理
Spark一个非常重要的特性就是共享变量。

默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中。此时每个task只能操作自己的那份变量副本。如果多个task想要共享某个变量,那么这种方式是做不到的。

Spark为此提供了两种共享变量,一种是Broadcast Variable(广播变量),另一种是Accumulator(累加变量)。Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗。Accumulator则可以让多个task共同操作一份变量,主要可以进行累加操作。

2. Broadcast Variable
Spark提供的Broadcast Variable,是只读的。并且在每个节点上只会有一份副本,而不会为每个task都拷贝一份副本。因此其最大作用,就是减少变量到各个节点的网络传输消耗,以及在各个节点上的内存消耗。此外,spark自己内部也使用了高效的广播算法来减少网络消耗。

可以通过调用SparkContext的broadcast()方法,来针对某个变量创建广播变量。然后在算子的函数内,使用到广播变量时,每个节点只会拷贝一份副本了。每个节点可以使用广播变量的value()方法获取值。记住,广播变量,是只读的。

val factor = 3
val factorBroadcast = sc.broadcast(factor)

val arr = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(arr)
val multipleRdd = rdd.map(num => num * factorBroadcast.value())

multipleRdd.foreach(num => println(num))

3.Accumulator
Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

val sumAccumulator = sc.accumulator(0)
val arr = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(arr)
rdd.foreach(num => sumAccumulator += num)

println(sumAccumulator.value)

八、基于排序机制的wordcount程序
 案例需求
1、对文本文件内的每个单词都统计出其出现的次数。
2、按照每个单词出现次数的数量,降序排序。
1.Java代码
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * 排序的wordcount程序
 * @author Administrator
 *
 */
public class SortWordCount {

    public static void main(String[] args) {
        // 创建SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("SortWordCount")
                .setMaster("local"); 
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 创建lines RDD
        JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt");
        
        // 执行我们之前做过的单词计数
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" "));  
            }
            
        });        
        JavaPairRDD<String, Integer> pairs = words.mapToPair(
                
                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String t) throws Exception {
                        return new Tuple2<String, Integer>(t, 1);
                    }
                    
                });    
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
                
                new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }                
                });    
        // 到这里为止,就得到了每个单词出现的次数
        // 但是,问题是,我们的新需求,是要按照每个单词出现次数的顺序,降序排序
        // wordCounts RDD内的元素是什么?应该是这种格式的吧:(hello, 3) (you, 2)
        // 我们需要将RDD转换成(3, hello) (2, you)的这种格式,才能根据单词出现次数进行排序把!
        
        // 进行key-value的反转映射
        JavaPairRDD<Integer, String> countWords = wordCounts.mapToPair(            
                new PairFunction<Tuple2<String,Integer>, Integer, String>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Tuple2<Integer, String> call(Tuple2<String, Integer> t)
                            throws Exception {
                        return new Tuple2<Integer, String>(t._2, t._1);
                    }                
                });    
        // 按照key进行排序
        JavaPairRDD<Integer, String> sortedCountWords = countWords.sortByKey(false);    
        // 再次将value-key进行反转映射
        JavaPairRDD<String, Integer> sortedWordCounts = sortedCountWords.mapToPair(                
                new PairFunction<Tuple2<Integer,String>, String, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Tuple2<Integer, String> t)
                            throws Exception {
                        return new Tuple2<String, Integer>(t._2, t._1);
                    }                
                });    
        // 到此为止,我们获得了按照单词出现次数排序后的单词计数
        // 打印出来
        sortedWordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {        
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._1 + " appears " + t._2 + " times.");      
            }        
        });    
        // 关闭JavaSparkContext
        sc.close();
    }
}

2.scala代码
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * @author Administrator
 */
object SortWordCount { 
  def main(args: Array[String]) {
    val conf = new SparkConf()
        .setAppName("SortWordCount")
        .setMaster("local") 
    val sc = new SparkContext(conf)
    
    val lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt", 1)
    val words = lines.flatMap { line => line.split(" ") }  
    val pairs = words.map { word => (word, 1) }  
    val wordCounts = pairs.reduceByKey(_ + _)  
    
    val countWords = wordCounts.map(wordCount => (wordCount._2, wordCount._1))   
    val sortedCountWords = countWords.sortByKey(false)  
    val sortedWordCounts = sortedCountWords.map(sortedCountWord => (sortedCountWord._2, sortedCountWord._1))  
    
    sortedWordCounts.foreach(sortedWordCount => println(
        sortedWordCount._1 + " appear " + sortedWordCount._2 + " times."))
  }
  
}

九、二次排序
 案例需求
1、按照文件中的第一列排序。
2、如果第一列相同,则按照第二列排序。
1.java代码
package cn.spark.study.core;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * 自定义的二次排序key
 * @author Administrator
 *
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {

    private static final long serialVersionUID = -2366006422945129991L;
    
    // 首先在自定义key里面,定义需要进行排序的列
    private int first;
    private int second;
    
    public SecondarySortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public boolean $greater(SecondarySortKey other) {
        if(this.first > other.getFirst()) {
            return true;
        } else if(this.first == other.getFirst() && 
                this.second > other.getSecond()) {
            return true;
        }
        return false;
    }
    
    @Override
    public boolean $greater$eq(SecondarySortKey other) {
        if(this.$greater(other)) {
            return true;
        } else if(this.first == other.getFirst() && 
                this.second == other.getSecond()) {
            return true;
        }
        return false;
    }

    @Override
    public boolean $less(SecondarySortKey other) {
        if(this.first < other.getFirst()) {
            return true;
        } else if(this.first == other.getFirst() && 
                this.second < other.getSecond()) {
            return true;
        }
        return false;
    }
    
    @Override
    public boolean $less$eq(SecondarySortKey other) {
        if(this.$less(other)) {
            return true;
        } else if(this.first == other.getFirst() && 
                this.second == other.getSecond()) {
            return true;
        }
        return false;
    }
    
    @Override
    public int compare(SecondarySortKey other) {
        if(this.first - other.getFirst() != 0) {
            return this.first - other.getFirst();
        } else {
            return this.second - other.getSecond();
        }
    }
    
    @Override
    public int compareTo(SecondarySortKey other) {
        if(this.first - other.getFirst() != 0) {
            return this.first - other.getFirst();
        } else {
            return this.second - other.getSecond();
        }
    }
    
    // 为要进行排序的多个列,提供getter和setter方法,以及hashcode和equals方法
    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        SecondarySortKey other = (SecondarySortKey) obj;
        if (first != other.first)
            return false;
        if (second != other.second)
            return false;
        return true;
    }
    
}
package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * 二次排序
 * 1、实现自定义的key,要实现Ordered接口和Serializable接口,在key中实现自己对多个列的排序算法
 * 2、将包含文本的RDD,映射成key为自定义key,value为文本的JavaPairRDD
 * 3、使用sortByKey算子按照自定义的key进行排序
 * 4、再次映射,剔除自定义的key,只保留文本行
 * @author Administrator
 *
 */
public class SecondarySort {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SecondarySort") 
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
    
        JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//sort.txt");    
        JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair(
                
                new PairFunction<String, SecondarySortKey, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
                        String[] lineSplited = line.split(" ");  
                        SecondarySortKey key = new SecondarySortKey(
                                Integer.valueOf(lineSplited[0]), 
                                Integer.valueOf(lineSplited[1]));  
                        return new Tuple2<SecondarySortKey, String>(key, line);
                    }                
                });    
        JavaPairRDD<SecondarySortKey, String> sortedPairs = pairs.sortByKey();    
        JavaRDD<String> sortedLines = sortedPairs.map(            
                new Function<Tuple2<SecondarySortKey,String>, String>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public String call(Tuple2<SecondarySortKey, String> v1) throws Exception {
                        return v1._2;
                    }            
                });    
        sortedLines.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(String t) throws Exception {
                System.out.println(t);  
            }
        });
        sc.close();
    }
}


2.Scala代码
class SecondSortKey(val first: Int, val second: Int) 
    extends Ordered[SecondSortKey] with Serializable {
  
  def compare(that: SecondSortKey): Int = {
    if(this.first - that.first != 0) {
      this.first - that.first
    } else {
      this.second - that.second
    }
  }
  
}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * @author Administrator
 */
object SecondSort {
  
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
        .setAppName("SecondSort")  
        .setMaster("local")  
    val sc = new SparkContext(conf)
  
    val lines = sc.textFile("C://Users//Administrator//Desktop//sort.txt", 1)
    val pairs = lines.map { line => (
        new SecondSortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt),
        line)}
    val sortedPairs = pairs.sortByKey()
    val sortedLines = sortedPairs.map(sortedPair => sortedPair._2)  
    
    sortedLines.foreach { sortedLine => println(sortedLine) }  
  }
  
}

十、Topn
 案例需求
1、对文本文件内的数字,取最大的前3个。
2、对每个班级内的学生成绩,取出前3名。(分组取topn)
3、课后作用:用Scala来实现分组取topn。

1.java代码
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 * 取最大的前3个数字
 * @author Administrator
 *
 */
public class Top3 {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Top3")
                .setMaster("local");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//top.txt");    
        JavaPairRDD<Integer, String> pairs = lines.mapToPair(    
                new PairFunction<String, Integer, String>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Tuple2<Integer, String> call(String t) throws Exception {
                        return new Tuple2<Integer, String>(Integer.valueOf(t), t);
                    }
                });
        JavaPairRDD<Integer, String> sortedPairs = pairs.sortByKey(false);
        JavaRDD<Integer> sortedNumbers = sortedPairs.map(    
                new Function<Tuple2<Integer,String>, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Tuple2<Integer, String> v1) throws Exception {
                        return v1._1;
                    }
                    
                });
        List<Integer> sortedNumberList = sortedNumbers.take(3);
        for(Integer num : sortedNumberList) {
            System.out.println(num);
        }    
        sc.close();
    }    
}

2.
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * 分组取top3
 * @author Administrator
 *
 */
public class GroupTop3 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Top3")
                .setMaster("local");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//score.txt");
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(
                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String line) throws Exception {
                        String[] lineSplited = line.split(" ");  
                        return new Tuple2<String, Integer>(lineSplited[0], 
                                Integer.valueOf(lineSplited[1]));
                    }
                });
        JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();
        
        JavaPairRDD<String, Iterable<Integer>> top3Score = groupedPairs.mapToPair(
                
        new PairFunction<Tuple2<String,Iterable<Integer>>, String, Iterable<Integer>>() {

                    private static final long serialVersionUID = 1L;
                    @Override
                    public Tuple2<String, Iterable<Integer>> call(
                            Tuple2<String, Iterable<Integer>> classScores)
                            throws Exception {
                        Integer[] top3 = new Integer[3];
                        String className = classScores._1;
                        Iterator<Integer> scores = classScores._2.iterator();
                        while(scores.hasNext()) {
                            Integer score = scores.next();
                            
                            for(int i = 0; i < 3; i++) {
                                if(top3[i] == null) {
                                    top3[i] = score;
                                    break;
                                } else if(score > top3[i]) {
                                    for(int j = 2; j > i; j--) {
                                        top3[j] = top3[j - 1];  
                                    }
                                    
                                    top3[i] = score;
                                    
                                    break;
                                } 
                            }
                        }
                        return new Tuple2<String, 
                                Iterable<Integer>>(className, Arrays.asList(top3));    
                    }
                });
        top3Score.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
                System.out.println("class: " + t._1);  
                Iterator<Integer> scoreIterator = t._2.iterator();
                while(scoreIterator.hasNext()) {
                    Integer score = scoreIterator.next();
                    System.out.println(score);  
                }
                System.out.println("=======================================");   
            }
        });
        sc.close();
    }
}

2.Scala代码
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * @author Administrator
 */
object Top3 {
  
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
        .setAppName("Top3")
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val lines = sc.textFile("C://Users//Administrator//Desktop//top.txt", 1)
    val pairs = lines.map { line => (line.toInt, line) }
    val sortedPairs = pairs.sortByKey(false)
    val sortedNumbers = sortedPairs.map(sortedPair => sortedPair._1)  
    val top3Number = sortedNumbers.take(3)
    
    for(num <- top3Number) {
      println(num)  
    }
  }
  
}


十一、spark内核架构

1.提交spark应用的机器行通过spark-submit(shell)提交自己的spark程序,称之为Application,提交应用程序后,会启动driver进程,spark-submit使用我们之前一直使用的那种提交模式去提交的时候,我们之前的提交模式就做standalone,其实会通过反射的方式。创建和构造一个DriverActor进程出来

2.执行我们的application应用程序,也就是我们自己编写的代码,代码首先是构造sparkConf、在构造sparkContext

3.sparkContext在初始化的时候,做的最重要的两件事就是构造出来DAGScheduler和taskScheduler。

4.taskScheduler(他有自己的后台进行)实际上,是负责通过他对对应的一个后台进程去连接master,向master注册application

5.master接收到application注册的请求之后,会使用自己的资源调度算法,在spark集群的worker节点上,为这个application启动多个executor。Master通知worker启动executor

6.executor启动之后,会自己反向注册到taskscheduler。每执行到一个action,就会创建一个job,job会提交给DAGScheduler,DAGScheduler会将job划分成多个stage,然后每一个stage创建一个taskSet,

7.taskscheduler会吧taskSet里每一个task,提交到executor执行(task分配算法)


8.worker会为application启动executor(进程),executor中有线程池。Executor,每接收到一个task,都会用taskRunner来封装task,然后从线程池里取出一个线程,执行这个task.

9.taskRunner:将我们编写的代码,也就是要执行的算子以及函数,拷贝,反序列化然后进行task

10.task有两种,shuffleMapTask和ResultTask.只有最后一个stage是ResultTask,之前的stage,都是shuffleMapTask.

11.所以。最后整个spark应用程序的执行,就是stage分批次作为taskset提交到executor执行,每一个task针对的RDD的一个partition,执行我们定义的算子和函数,以此类推 ,直到所有的操作执行完为止

十二、宽依赖和窄依赖
1、Application
2、spark-submit
3、Driver
4、SparkContext
5、Master
6、Worker
7、Executor
8、Job
9、DAGScheduler
10、TaskScheduler
11、ShuffleMapTask and ResultTask



1.窄依赖
Narrow Dependency
一个RDD,对他的父RDD,只有简单的一对一的依赖关系,也就是说,RDD的每个partition,仅依赖于父的一个partition。父的RDD和子的RDD的partition之间的对应关系是一对一的。这种情况下,是简单的RDD之间的依赖关系


2.宽依赖
Shuffle Dependecy,本质如其名,本质其实就是shuffle。也就是说。每一个父RDD的partition中的数据,都可能会传输一部分。到下一个RDD的每一个partition中,此时就会出现,父RDD和子RDD的partition之间,具有交互错综负责的关系。那么,这种情况,就叫做两个RDD之间的宽依赖。同时,他们之间的操作就是shuffle。

十三、基于YARN两种提交模式
1、Spark内核架构,其实就是第一种模式,standalone模式,基于Spark自己的Master-Worker集群。
2、第二种,是基于YARN的yarn-cluster模式。
3、第三种,是基于YARN的yarn-client模式。
4、如果,你要切换到第二种和第三种模式,很简单,将我们之前用于提交spark应用程序的spark-submit脚本,加上--master参数,设置为yarn-cluster,或yarn-client,即可。如果你没有设置,那么,就是standalone模式。



1.Yarn-cluster提交模式
Spark-submit提交,发送请求到resourcemanager,请求启动applicationmaster,resourcemanager分配一个containner,在某个nodemanager上启动一个applicationmasstesr,applicationmaster相当于是driver,applicationmaster连接其他的nodemanager来启动Executor,这里的nodemanager相当于是worker,executor启动后,向application反向注册

2.Yarn-clinet提交模式
Spark-submit提交,在本地启动driver进程(主要与yarn-cluster进程不同的地方,yarn-cluster是在nodemanager上启动),发送请求给resourcemanager,请求启动applicationmaster,resourcemanager分配一个container,在某个nodemanager上启动一个applicationmaster,但是这里的applicationmaster 其实只是一个executorLauncher。在nodemanager上启动executorLanucher(applicationmaser) ,resourcemanager分配一批container,然后applicationmaster连接其他的nodemanager,用container的资源,启动executor,executor反向注册到本地的driver上

3.选用原则
1.yarn-clinet用于测试,因为,driver运行在本地客户端,负责调整application,会与yarn集群产生大量的网络通信,从而导致网卡流量激增,可能会被你们公司的SA(运维)警告。好处是直接执行,本地可以看到所有的log。方便进行调试

2.Yarn-cluster用于生产环境,因为driver运行在nodemanager,没有忘啦流量激增的问题。缺点在于,调试不方便,本地应spark-submint提交后,看不大log,只能通过yarn application-logs application-id 这种命令查看,很是麻烦
十四、SparkContext原理剖析与源码分析
1、TaskScheduler
2、DAGScheduler
3、SparkUI



十五、








Spark性能优化
1.概述
由于Spark的计算本质是基于内存的,所以Spark性能程序的性能可能因为集群中的任何因素出现瓶颈:CPU、网络带宽、或者是内存。如果内存能够容纳得下所有的数据,那么网络传输和通信就会导致性能出现瓶颈。但是如果内存比较紧张,不足以放下所有的数据(比如在针对10亿以上的数据量进行计算时),还是需要对内存的使用进行性能优化的,比如说使用一些手段来减少内存的消耗。

Spark性能优化,其实主要就是在于对内存的使用进行调优。因为通常情况下来说,如果你的Spark应用程序计算的数据量比较小,并且你的内存足够使用,那么只要运维可以保障网络通常,一般是不会有大的性能问题的。但是Spark应用程序的性能问题往往出现在针对大数据量(比如10亿级别)进行计算时出现,因此通常来说,Spark性能优化,主要是对内存进行性能优化。当然,除了内存调优之外,还有很多手段可以优化Spark应用程序的性能。

2.spark性能调优的重要性
1.实际上Spark到目前为止,在大数据业界的影响力和覆盖度,还远没有达到Hadoop的水平,——虽然说,我们之前一再强调,Spark Core、Spark SQL、Spark Streaming,可以替代MapReduce、Hive查询引擎、Storm。但是事实就是,Spark还没有达到已经替代了它们的地步。

根据我在研究Spark,并且在一线使用Spark,与大量行业内的大数据相关从业人员沟通的情况来看。Spark最大的优点,其实也是它目前最大的问题——基于内存的计算模型。Spark由于使用了基于内存的计算模型,因此导致了其稳定性,远远不如Hadoop。虽然我也很喜欢和热爱Spark,但是这就是事实,Spark的速度的确达到了hadoop的几倍、几十倍、甚至上百倍(极端情况)。但是基于内存的模型,导致它经常出现各种OOM(内存溢出)、内部异常等问题。

说一个亲身经历的例子,曾经用Spark改写几个复杂的MapReduce程序,虽然MapReduce很慢,但是它很稳定,至少慢慢跑,是可以跑出来数据的。但是用Spark Core很快就改写完了程序,问题是,在整整半个月之内,Spark程序根本跑不起来,因为数据量太大,10亿+。导致它出现了各种各样的问题,包括OOM、文件丢失、task lost、内部异常等等各种问题。最后耗费了大量时间,最一个spark程序进行了大量的性能调优,才最终让它可以跑起来。

的确,用了Spark,比MapReduce的速度快了十倍,但是付出的代价是惨痛的,花了整整一个月的时间做这个事情。

2.因此,当我在公司推广Spark的使用时,很多人都不无担心地说,听说Spark还不够稳定,经常出现问题,比如OOM等,它的稳定性,导致业界的人们不太敢轻易尝试它,在复杂的大数据系统,要求极高稳定性的线程系统中使用。——当然,如果你就是开发一个针对公司内部的,稳定性要求不高的系统,当然不用担心这个问题。

所以,我认为,Spark的基于内存的本质,就导致了上述的问题,导致了它目前还无法完全提到Hadoop中的某些技术。

但是,纵然Spark有各种问题,其优点就是缺点,缺点也是优点——它实在是很快。优秀的Spark应用程序,性能完全可以达到MapReduce、Hive查询引擎的数倍、甚至数十倍。因此,纵使有各种担忧,Spark还是吸引着大量的人们以及公司去探索,和尝试攻克它,使用它,让它为我们所用,用它开放更棒的大数据系统。

因此,正是基于上述背景,Spark工程师的要求是非常高的。比如我们这里,我们正在用Spark开发大型复杂的线上大数据系统,所以针对Spark的招聘,我们是要求Spark工程师必须精通Spark内核源码,能够对程序进行性能优化。——打个广告,实际上,我认为如果能精通本系列课程,那么成为一个行业内优秀的Spark工程师,是一定没有问题的。

3.所以,Spark虽然有它的问题所在,但是它的优势还是让它以极快的速度,极强的劲头在行业内快速发展。行业内各个公司,也大量缺乏着优秀的Spark工程师。而如果是想转型进行Spark开发的朋友,基于上述种种背景,就应该明白了,Spark性能优化,对于你找工作,对于你在实际工作中解决问题的重要性了!

要成为优秀的Spark工程师,顺利实现转型,那么就必须能够彻底精通Spark内核源码,能够基于对Spark内核原理的深度理解,对线上复杂的Spark大数据系统 / 程序出现的报错和故障,进行排查和解决;能够对运行较慢的Spark应用程序,进行精准的性能问题排查,并且对症下游,针对各种性能问题,使用对应的技术手段,进行解决。

只有这样,我认为,你才能够顺利实现转型,出去成功面试Spark工程师,甚至是高级Spark工程师的岗位。才能在实际工作中,真正让Spark发挥出其巨大的威力。而不仅仅是处于对新技术的喜爱,对Spark进行浅尝辄止的学习——那是没有任何用的。

不精通Spark内核源码,不精通Spark性能优化,也许你能找到Spark大数据的工作,但是通常情况下,也只能进入比较缺人的小公司。要进入大公司,找到更好的职业机会,那么就一起在精通了之前的Spark内核源码深度剖析阶段之后,来进入Spark性能优化阶段的学习吧。
3.性能优化的技术

诊断内存的消耗

内存都花费在哪里了?

1、每个Java对象,都有一个对象头,会占用16个字节,主要是包括了一些对象的元信息,比如指向它的类的指针。如果一个对象本身很小,比如就包括了一个int类型的field,那么它的对象头实际上比对象自己还要大。

2、Java的String对象,会比它内部的原始数据,要多出40个字节。因为它内部使用char数组来保存内部的字符序列的,并且还得保存诸如数组长度之类的信息。而且因为String使用的是UTF-16编码,所以每个字符会占用2个字节。比如,包含10个字符的String,会占用60个字节。

3、Java中的集合类型,比如HashMap和LinkedList,内部使用的是链表数据结构,所以对链表中的每一个数据,都使用了Entry对象来包装。Entry对象不光有对象头,还有指向下一个Entry的指针,通常占用8个字节。

4、元素类型为原始数据类型(比如int)的集合,内部通常会使用原始数据类型的包装类型,比如Integer,来存储元素。

如何判断你的程序消耗了多少内存?
这里有一个非常简单的办法来判断,你的spark程序消耗了多少内存。

1、首先,自己设置RDD的并行度,有两种方式:要不然,在parallelize()、textFile()等方法中,传入第二个参数,设置RDD的task / partition的数量;要不然,用SparkConf.set()方法,设置一个参数,spark.default.parallelism,可以统一设置这个application所有RDD的partition数量。

2、其次,在程序中将RDD cache到内存中,调用RDD.cache()方法即可。

3、最后,观察Driver的log,你会发现类似于:“INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)”的日志信息。这就显示了每个partition占用了多少内存。

4、将这个内存信息乘以partition数量,即可得出RDD的内存占用量。

Spark的性能优化,主要手段包括:
1、使用高性能序列化类库
2、优化数据结构
3、对多次使用的RDD进行持久化 / Checkpoint
4、使用序列化的持久化级别
5、Java虚拟机垃圾回收调优
6、提高并行度
7、广播共享数据
8、数据本地化
9、reduceByKey和groupByKey的合理使用
10、Shuffle调优(核心中的核心,重中之重)

1、使用高性能序列化类库
数据序列化概述:
在任何分布式系统中,序列化都是扮演着一个重要的角色的。如果使用的序列化技术,在执行序列化操作的时候很慢,或者是序列化后的数据还是很大,那么会让分布式应用程序的性能下降很多。所以,进行Spark性能优化的第一步,就是进行序列化的性能优化。

Spark自身默认就会在一些地方对数据进行序列化,比如Shuffle。还有就是,如果我们的算子函数使用到了外部的数据(比如Java内置类型,或者自定义类型),那么也需要让其可序列化。

而Spark自身对于序列化的便捷性和性能进行了一个取舍和权衡。默认,Spark倾向于序列化的便捷性,使用了Java自身提供的序列化机制——基于ObjectInputStream和ObjectOutputStream的序列化机制。因为这种方式是Java原生提供的,很方便使用。

但是问题是,Java序列化机制的性能并不高。序列化的速度相对较慢,而且序列化以后的数据,还是相对来说比较大,还是比较占用内存空间。因此,如果你的Spark应用程序对内存很敏感,那么,实际上默认的Java序列化机制并不是最好的选择。

Spark提供了两种序列化的机制:

Spark实际上提供了两种序列化机制,它只是默认使用了第一种:

1、Java序列化机制:默认情况下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。只要你的类实现了Serializable接口,那么都是可以序列化的。而且Java序列化机制是提供了自定义序列化支持的,只要你实现Externalizable接口即可实现自己的更高性能的序列化算法。Java序列化机制的速度比较慢,而且序列化后的数据占用的内存空间比较大。

2、Kryo序列化机制:Spark也支持使用Kryo类库来进行序列化。Kryo序列化机制比Java序列化机制更快,而且序列化后的数据占用的空间更小,通常比Java序列化的数据占用的空间要小10倍。Kryo序列化机制之所以不是默认序列化机制的原因是,有些类型虽然实现了Seriralizable接口,但是它也不一定能够进行序列化;此外,如果你要得到最佳的性能,Kryo还要求你在Spark应用程序中,对所有你需要序列化的类型都进行注册。

 如何使用Kryo序列化机制(一)
如果要使用Kryo序列化机制,首先要用SparkConf设置一个参数,使用new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")即可,即将Spark的序列化器设置为KryoSerializer。这样,Spark在内部的一些操作,比如Shuffle,进行序列化时,就会使用Kryo类库进行高性能、快速、更低内存占用量的序列化了。

使用Kryo时,它要求是需要序列化的类,是要预先进行注册的,以获得最佳性能——如果不注册的话,那么Kryo必须时刻保存类型的全限定名,反而占用不少内存。Spark默认是对Scala中常用的类型自动注册了Kryo的,都在AllScalaRegistry类中。

但是,比如自己的算子中,使用了外部的自定义类型的对象,那么还是需要将其进行注册。

(实际上,下面的写法是错误的,因为counter不是共享的,所以累加的功能是无法实现的)
val counter = new Counter();
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))
numbers.foreach(num => counter.add(num));


如何使用Kryo序列化机制(二)

如果要注册自定义的类型,那么就使用如下的代码,即可:

Scala版本:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[Counter] ))
val sc = new SparkContext(conf)

Java版本:
SparkConf conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Counter.class)
JavaSparkContext sc = new JavaSparkContext(conf)


 优化Kryo类库的使用

1、优化缓存大小
如果注册的要序列化的自定义的类型,本身特别大,比如包含了超过100个field。那么就会导致要序列化的对象过大。此时就需要对Kryo本身进行优化。因为Kryo内部的缓存可能不够存放那么大的class对象。此时就需要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.mb参数的值,将其调大。

默认情况下它的值是2,就是说最大能缓存2M的对象,然后进行序列化。可以在必要时将其调大。比如设置为10。

2、预先注册自定义类型
虽然不注册自定义类型,Kryo类库也能正常工作,但是那样的话,对于它要序列化的每个对象,都会保存一份它的全限定类名。此时反而会耗费大量内存。因此通常都建议预先注册号要序列化的自定义的类。

 在什么场景下使用Kryo序列化类库?
首先,这里讨论的都是Spark的一些普通的场景,一些特殊的场景,比如RDD的持久化,在后面会讲解。这里先不说。

那么,这里针对的Kryo序列化类库的使用场景,就是算子函数使用到了外部的大数据的情况。比如说吧,我们在外部定义了一个封装了应用所有配置的对象,比如自定义了一个MyConfiguration对象,里面包含了100m的数据。然后,在算子函数里面,使用到了这个外部的大对象。

此时呢,如果默认情况下,让Spark用java序列化机制来序列化这种外部的大对象,那么就会导致,序列化速度缓慢,并且序列化以后的数据还是比较大,比较占用内存空间。

因此,在这种情况下,比较适合,切换到Kryo序列化类库,来对外部的大对象进行序列化操作。一是,序列化速度会变快;二是,会减少序列化后的数据占用的内存空间。

2.优化数据结构
要减少内存的消耗,除了使用高效的序列化类库以外,还有一个很重要的事情,就是优化数据结构。从而避免Java语法特性中所导致的额外内存的开销,比如基于指针的Java数据结构,以及包装类型。

有一个关键的问题,就是优化什么数据结构?其实主要就是优化你的算子函数,内部使用到的局部数据,或者是算子函数外部的数据。都可以进行数据结构的优化。优化之后,都会减少其对内存的消耗和占用。

 如何优化数据结构(一)

1、优先使用数组以及字符串,而不是集合类。也就是说,优先用array,而不是ArrayList、LinkedList、HashMap等集合。

比如,有个List<Integer> list = new ArrayList<Integer>(),将其替换为int[] arr = new int[]。这样的话,array既比List少了额外信息的存储开销,还能使用原始数据类型(int)来存储数据,比List中用Integer这种包装类型存储数据,要节省内存的多。

还比如,通常企业级应用中的做法是,对于HashMap、List这种数据,统一用String拼接成特殊格式的字符串,比如Map<Integer, Person> persons = new HashMap<Integer, Person>()。可以优化为,特殊的字符串格式:id:name,address|id:name,address...。


2、避免使用多层嵌套的对象结构。比如说,public class Teacher { private List<Student> students = new ArrayList<Student>() }。就是非常不好的例子。因为Teacher类的内部又嵌套了大量的小Student对象。

比如说,对于上述例子,也完全可以使用特殊的字符串来进行数据的存储。比如,用json字符串来存储数据,就是一个很好的选择。

{"teacherId": 1, "teacherName": "leo", students:[{"studentId": 1, "studentName": "tom"},{"studentId":2, "studentName":"marry"}]}

3、对于有些能够避免的场景,尽量使用int替代String。因为String虽然比ArrayList、HashMap等数据结构高效多了,占用内存量少多了,但是之前分析过,还是有额外信息的消耗。比如之前用String表示id,那么现在完全可以用数字类型的int,来进行替代。

这里提醒,在spark应用中,id就不要用常用的uuid了,因为无法转成int,就用自增的int类型的id即可。(sdfsdfdf-234242342-sdfsfsfdfd)


3.对多次使用的RDD进行持久化或Checkpoint
如果程序中,对某一个RDD,基于它进行了多次transformation或者action操作。那么就非常有必要对其进行持久化操作,以避免对一个RDD反复进行计算。

此外,如果要保证在RDD的持久化数据可能丢失的情况下,还要保证高性能,那么可以对RDD进行Checkpoint操作。
4.使用序列化的持久化级别
除了对多次使用的RDD进行持久化操作之外,还可以进一步优化其性能。因为很有可能,RDD的数据是持久化到内存,或者磁盘中的。那么,此时,如果内存大小不是特别充足,完全可以使用序列化的持久化级别,比如MEMORY_ONLY_SER、MEMORY_AND_DISK_SER等。使用RDD.persist(StorageLevel.MEMORY_ONLY_SER)这样的语法即可。

这样的话,将数据序列化之后,再持久化,可以大大减小对内存的消耗。此外,数据量小了之后,如果要写入磁盘,那么磁盘io性能消耗也比较小。

对RDD持久化序列化后,RDD的每个partition的数据,都是序列化为一个巨大的字节数组。这样,对于内存的消耗就小的多了。但是唯一的缺点就是,获取RDD数据时,需要对其进行反序列化,会增大其性能开销。

因此,对于序列化的持久化级别,还可以进一步优化,也就是说,使用Kryo序列化类库,这样,可以获得更快的序列化速度,并且占用更小的内存空间。但是要记住,如果RDD的元素(RDD<T>的泛型类型),是自定义类型的话,在Kryo中提前注册自定义类型。

5.Java虚拟机垃圾回收调优
 Java虚拟机垃圾回收调优的背景
如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能成为一个性能瓶颈。因为Java虚拟机会定期进行垃圾回收,此时就会追踪所有的java对象,并且在垃圾回收时,找到那些已经不在使用的对象,然后清理旧的对象,来给新的对象腾出内存空间。

垃圾回收的性能开销,是跟内存中的对象的数量,成正比的。所以,对于垃圾回收的性能问题,首先要做的就是,使用更高效的数据结构,比如array和string;其次就是在持久化rdd时,使用序列化的持久化级别,而且用Kryo序列化类库,这样,每个partition就只是一个对象——一个字节数组。

监测垃圾回收
我们可以对垃圾回收进行监测,包括多久进行一次垃圾回收,以及每次垃圾回收耗费的时间。只要在spark-submit脚本中,增加一个配置即可,--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"。

但是要记住,这里虽然会打印出Java虚拟机的垃圾回收的相关信息,但是是输出到了worker上的日志中,而不是driver的日志中。

但是这种方式也只是一种,其实也完全可以通过SparkUI(4040端口)来观察每个stage的垃圾回收的情况。

优化executor内存比例

对于垃圾回收来说,最重要的就是调节RDD缓存占用的内存空间,与算子执行时创建的对象占用的内存空间的比例。默认情况下,Spark使用每个executor 60%的内存空间来缓存RDD,那么在task执行期间创建的对象,只有40%的内存空间来存放。

在这种情况下,很有可能因为你的内存空间的不足,task创建的对象过大,那么一旦发现40%的内存空间不够用了,就会触发Java虚拟机的垃圾回收操作。因此在极端情况下,垃圾回收操作可能会被频繁触发。

在上述情况下,如果发现垃圾回收频繁发生。那么就需要对那个比例进行调优,使用new SparkConf().set("spark.storage.memoryFraction", "0.5")即可,可以将RDD缓存占用空间的比例降低,从而给更多的空间让task创建的对象进行使用。

因此,对于RDD持久化,完全可以使用Kryo序列化,加上降低其executor内存占比的方式,来减少其内存消耗。给task提供更多的内存,从而避免task的执行频繁触发垃圾回收。

高级垃圾回收调优(一)

    Java堆空间被划分成了两块空间,一个是年轻代,一个是老年代。年轻代放的是短时间存活的对象,老年代放的是长时间存活的对象。年轻代又被划分了三块空间,Eden、Survivor1、Survivor2。

首先,Eden区域和Survivor1区域用于存放对象,Survivor2区域备用。创建的对象,首先放入Eden区域和Survivor1区域,如果Eden区域满了,那么就会触发一次Minor GC,进行年轻代的垃圾回收。Eden和Survivor1区域中存活的对象,会被移动到Survivor2区域中。然后Survivor1和Survivor2的角色调换,Survivor1变成了备用。

如果一个对象,在年轻代中,撑过了多次垃圾回收,都没有被回收掉,那么会被认为是长时间存活的,此时就会被移入老年代。此外,如果在将Eden和Survivor1中的存活对象,尝试放入Survivor2中时,发现Survivor2放满了,那么会直接放入老年代。此时就出现了,短时间存活的对象,进入老年代的问题。

如果老年代的空间满了,那么就会触发Full GC,进行老年代的垃圾回收操作。

Spark中,垃圾回收调优的目标就是,只有真正长时间存活的对象,才能进入老年代,短时间存活的对象,只能呆在年轻代。不能因为某个Survivor区域空间不够,在Minor GC时,就进入了老年代。从而造成短时间存活的对象,长期呆在老年代中占据了空间,而且Full GC时要回收大量的短时间存活的对象,导致Full GC速度缓慢。

如果发现,在task执行期间,大量full gc发生了,那么说明,年轻代的Eden区域,给的空间不够大。此时可以执行一些操作来优化垃圾回收行为:
1、包括降低spark.storage.memoryFraction的比例,给年轻代更多的空间,来存放短时间存活的对象;
2、给Eden区域分配更大的空间,使用-Xmn即可,通常建议给Eden区域,预计大小的4/33、如果使用的是HDFS文件,那么很好估计Eden区域大小,如果每个executor有4个task,然后每个hdfs压缩块解压缩后大小是3倍,此外每个hdfs块的大小是64M,那么Eden区域的预计大小就是:4 * 3 * 64MB,然后呢,再通过-Xmn参数,将Eden区域大小设置为4 * 3 * 64 * 4/3。

总结:
其实啊,根据经验来看,对于垃圾回收的调优,尽量就是说,调节executor内存的比例就可以了。因为jvm的调优是非常复杂和敏感的。除非是,真的到了万不得已的地方,然后呢,自己本身又对jvm相关的技术很了解,那么此时进行eden区域的调节,调优,是可以的。

一些高级的参数
-XX:SurvivorRatio=4:如果值为4,那么就是两个Survivor跟Eden的比例是2:4,也就是说每个Survivor占据的年轻代的比例是1/6,所以,你其实也可以尝试调大Survivor区域的大小。
-XX:NewRatio=4:调节新生代和老年代的比例

6.提高并行度

实际上Spark集群的资源并不一定会被充分利用到,所以要尽量设置合理的并行度,来充分地利用集群的资源。才能充分提高Spark应用程序的性能。

Spark会自动设置以文件作为输入源的RDD的并行度,依据其大小,比如HDFS,就会给每一个block创建一个partition,也依据这个设置并行度。对于reduceByKey等会发生shuffle的操作,就使用并行度最大的父RDD的并行度即可。

可以手动使用textFile()、parallelize()等方法的第二个参数来设置并行度;也可以使用spark.default.parallelism参数,来设置统一的并行度。Spark官方的推荐是,给集群中的每个cpu core设置2~3个task。

比如说,spark-submit设置了executor数量是10个,每个executor要求分配2个core,那么application总共会有20个core。此时可以设置new SparkConf().set("spark.default.parallelism", "60")来设置合理的并行度,从而充分利用资源。

7.广播共享数据
如果你的算子函数中,使用到了特别大的数据,那么,这个时候,推荐将该数据进行广播。这样的话,就不至于将一个大数据拷贝到每一个task上去。而是给每个节点拷贝一份,然后节点上的task共享该数据。

这样的话,就可以减少大数据在节点上的内存消耗。并且可以减少数据到节点的网络传输消耗。


8.数据本地化
数据本地化背景
数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码是在一起的,那么性能当然会非常高。但是,如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上。通常来说,移动代码到其他节点,会比移动数据到代码所在的节点上去,速度要快得多,因为代码比较小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。

数据本地化,指的是,数据离计算它的代码有多近。基于数据距离代码的距离,有几种数据本地化级别:
1、PROCESS_LOCAL:数据和计算它的代码在同一个JVM进程中。
2、NODE_LOCAL:数据和计算它的代码在一个节点上,但是不在一个进程中,比如在不同的executor进程中,或者是数据在HDFS文件的block中。
3、NO_PREF:数据从哪里过来,性能都是一样的。
4、RACK_LOCAL:数据和计算它的代码在一个机架上。
5ANY:数据可能在任意地方,比如其他网络环境内,或者其他机架上。
数据本地化优化
Spark倾向于使用最好的本地化级别来调度task,但是这是不可能的。如果没有任何未处理的数据在空闲的executor上,那么Spark就会放低本地化级别。这时有两个选择:第一,等待,直到executor上的cpu释放出来,那么就分配task过去;第二,立即在任意一个executor上启动一个task。

Spark默认会等待一会儿,来期望task要处理的数据所在的节点上的executor空闲出一个cpu,从而将task分配过去。只要超过了时间,那么Spark就会将task分配到其他任意一个空闲的executor上。

可以设置参数,spark.locality系列参数,来调节Spark等待task可以进行数据本地化的时间。spark.locality.wait(3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack。

9.reduceByKey和groupByKey
val counts = pairs.reduceByKey(_ + _)

val counts = pairs.groupByKey().map(wordCounts => (wordCounts._1, wordCounts._2.sum))

如果能用reduceByKey,那就用reduceByKey,因为它会在map端,先进行本地combine,可以大大减少要传输到reduce端的数据量,减小网络传输的开销。

只有在reduceByKey处理不了时,才用groupByKey().map()来替代。
10.shuffle调优

 

    原文作者:spark
    原文地址: https://www.cnblogs.com/yin-fei/p/10778800.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞