初识Apache Spark

第一次接触Spark,自己整理了(从网络,书籍,同事那里)一些Spark的相关内容当做笔记。路过的朋友仅供参考,不能保证说得都对。

什么是 Spark

简单来说,Spark是一种面向对象、函数式编程语言。Spark能够像操作本地集合对象一样轻松地操作分布式数据集。它具有运行速度快、易用性好、通用性强和随处运行等特点。

Spark提供了支持Java、scala、Python以及R语言的API。还支持更高级的工具如:Spark Sql、Spark Streaming、MLlib、GraphX等。

官方介绍 :Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

百度百科:Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎

Spark 有什么特点

以下摘自百度百科

更快的速度。内存中计算, 比 Hadoop 快100倍。
易用性。Spark 提供了80多个高级运算符。
通用性。Spark 提供了大量的库,包括SQL、DataFrames、MLlib、GraphX、Spark Streaming。 开发者可以在同一个应用程序中无缝组合使用这些库。
支持多种资源管理器。Spark 支持 Hadoop YARN,Apache Mesos,及其自带的独立集群管理器

更多详细介绍可参考:https://blog.csdn.net/xwc35047/article/details/51072145

什么是 RDD

在我使用Spark的过程中,用到最多的对象就是RDD,比如JavaRDD、JavaPairRDD。然后RDD之间又可以互相转化。这个RDD是个啥?

RDD全文是 Resilient Distributed DataSet(弹性·分布式·数据集)

RDD是一个只读的、可分区的、支持多种来源 、有容错机制 、可以被缓存 、支持并行操作的分布式数据集,可以装载任何你想装载的数据。他的弹性特点体现在RDD的数据可以在内存与磁盘(外存)灵活交换。

Spark 模型

再来认识一下下面几个重要概念

Application。也就是我们编写完Spark程序,负责生成SparkContext。

Job。所谓 job,就是由一个 rdd 的 action算子(后面再说action) 触发的动作,可以简单的理解为,当你需要执行一个 rdd 的 action 的时候,会生成一个 job。

Stage。stage 是一个 job 的组成单位,就是说,一个 job 会被切分成 1 个或多个 stage,然后各个 stage 会按照执行顺序依次执行。

Task。stage 下的一个任务执行单元,一般来说,一个 rdd 有多少个 partition(分区,后面再说partition),就会有多少个 task,因为每一个 task 只是处理一个 partition 上的数据。

简单来说就是以RDD为基准,每触发一个action操作,就会生成一个job。job内部有一个或多个stage顺序执行,组成stage的是一系列task,即任务执行单元。

关于Partition分区。Spark RDD主要由Dependency、Partition、Partitioner组成,Partition是其中之一。一份待处理的原始数据会被按照相应的逻辑切分成n份,每份数据对应到RDD中的一个Partition,Partition的数量决定了task的数量,影响着程序的并行度。

关于Stage。Stage以shuffle和result这两种类型来划分。Spark中有两类task,一类是shuffleMapTask,一类是resultTask,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也以此为依据,shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。

那么刚刚提到的action又是什么?我们再来了解一下RDD操作算子。

什么是 RDD 操作算子

RDD有两种操作算子:Transformation(转换) 和 Action(执行)

Transformation。即一个rdd数据集经过数据转换变成一个新的rdd数据集。常用的Transformation操作有:map、filter、union、distinct、groupByKey 等。Transformation 属于延迟计算,当触发Transformation算子时rdd并没有立即进行转换,仅仅是记住了数据集的逻辑操作。

Action。触发Spark作业的运行,真正触发转换算子的计算。常用的操作有:reduce、collect、count、countByKey等等。

什么是 Shuffle

以下摘自官网

Shuffle 即洗牌。以reduceByKey 操作来说,reduceByKey操作生成一个新的RDD,其中相同key的所有值都组合为一个元组——key和reduce函数的结果。但是,并非所有相同key的值都必须位于同一个分区甚至是同一台计算机上,然而它们必须位于同一位置才能计算结果。

在Spark中,数据通常不会根据特定的操作在必要位置进行跨分区分布。在计算过程中,单个任务在单个分区上运行。因此,要执行reduceByKey任务的所有数据,Spark需要执行全部操作。它必须从所有分区中读取数据以找到单个key的所有值,然后将各分区中的值汇总以计算每个key的最终结果 – 这称为洗牌。

什么是窄依赖、宽依赖

窄依赖。指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,和两个父RDD的分区对应于一个子RDD 的分区。
宽依赖。指子RDD的分区依赖于父RDD的所有分区。

举例

下面通过一个例子,尽量把我所理解的那部分通过这个小例子表达出来,不保证说的都对

统计某高中今年参加高考的男生人数

首先我们需要将数据源读取到Spark RDD中(先不管如何读取),一个数据源只生成一个rdd

rdd内部会按照一定的逻辑分割成n个partition分区,分区数也可以自己指定,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目,每个分区由一个task 执行。

rdd先执行filter 操作即Transformation算子。将参加高考的,性别为男性的对象过滤出来。但Transformation 是惰性的,不会立刻触发spark 作业。

过滤后的rdd 需要进行reduce操作即Action算子。此时触发spark 作业。每个action将生成一个Job。

Job 包含stage,stage有两种:shuffle和result,取决于算子的执行逻辑。如果一个job中有宽依赖,即有shuffle操作,shuffle之前的生成一个shuffle stage。shuffle之后的生成一个result stage。

每个stage 都是一组task 在执行,task 取决于分区数。

reduce 过程将符合条件的学生数计数并返回。

代码示例

package com.yzy.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SparkDemo {
    private static String appName = "spark.demo";
    private static String master = "local[*]";

    public static void main(String[] args) {
        JavaSparkContext sc = null;
        try {
            //初始化JavaSparkContext
            SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
            sc = new JavaSparkContext(conf);

            // 生成数据源
            List<Student> data = getList();

            //生成rdd
            JavaRDD<Student> rdd = sc.parallelize(data);

            //过滤符合条件的数据
            rdd = rdd.filter(new Function<Student, Boolean>() {
                public Boolean call(Student s) throws Exception {
                    return s.isGaoKao() && s.getSex().equals("男");
                }
            });

            // map && reduce
            Student result = rdd.map(new Function<Student, Student>() {
                public Student call(Student s) throws Exception {
                    s.setCount(1);
                    return s;
                }
            }).reduce(new Function2<Student, Student, Student>() {
                public Student call(Student s1, Student s2) throws Exception {
                    s1.setCount(s1.getCount() + s2.getCount());
                    return s1;
                }
            });

            System.out.println("执行结果:" + result.getCount());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (sc != null) {
                sc.close();
            }
        }
    }

    public static List<Student> getList(){
        List<Student> data = new ArrayList<Student>();
        data.add(new Student(true,"男", "A"));
        data.add(new Student(false,"女", "B"));
        data.add(new Student(false,"男", "C"));
        data.add(new Student(true,"女", "D"));
        data.add(new Student(true,"男", "E"));
        data.add(new Student(false,"女", "F"));
        data.add(new Student(true,"男", "G"));
        return data;
    }

    static class Student implements Serializable{
        private String name;
        private boolean gaoKao;
        private String sex;
        private int count;

        public Student(boolean gaoKao, String sex, String name) {
            this.gaoKao = gaoKao;
            this.sex = sex;
            this.name = name;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public boolean isGaoKao() {
            return gaoKao;
        }

        public void setGaoKao(boolean gaoKao) {
            this.gaoKao = gaoKao;
        }

        public String getSex() {
            return sex;
        }

        public void setSex(String sex) {
            this.sex = sex;
        }

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }
    }
}

控制台输出

//省略若干行
18/06/22 18:42:25 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 41 ms on localhost (executor driver) (1/4)
18/06/22 18:42:25 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 58 ms on localhost (executor driver) (2/4)
18/06/22 18:42:25 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 44 ms on localhost (executor driver) (3/4)
18/06/22 18:42:25 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 44 ms on localhost (executor driver) (4/4)
18/06/22 18:42:25 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
18/06/22 18:42:25 INFO DAGScheduler: ResultStage 0 (reduce at SparkDemo.java:44) finished in 0.219 s
执行结果:3

请注意:实例中的Student 类必须序列化,否则会报错!

    原文作者:憨人Zoe
    原文地址: https://www.jianshu.com/p/59b0601d7ad2
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞