spark 基础一:基本工作原理与RDD

Spark基本工作原理与RDD

主要特点

分布式;
主要是基于内存(少数情况基于磁盘);
迭代式计算:可以通过与MR对比来说明spark的迭代式计算,MR分为两个阶段,map和reduce,两个阶段完了我们,job就结束了,所以我们在一个job里能做的处理很有限,只能是在map和reduce里处理;

但是,spark计算模型,可以分为n个阶段,因为它是内存迭代式的,我们在处理完一个阶段以后,可以继续往下处理很多个阶段,而不只是两个阶段,所以,spark相较于MR,计算模型可以提供更强大的功能,如下图:

《spark 基础一:基本工作原理与RDD》 image.png

RDD

RDD是spark中最基础,也是最重要的的一个概念,简单总结如下
1、RDD是spark提供的核心 抽象 ,全称为Resillient Distributed Dataset,即弹性分布式数据集。
2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的。分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作;(分布式)
3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建;
4、RDD最重要的特性就是,提供了 容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的
5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘,对用户是透明的。(弹性)

spark运行模式

本地模式:就是在本地直接运行(利用多线程来模拟集群多节点),通常是用来验证代码基本逻辑功能

集群模式 Spark Standalone:独立模式,类似MapReduce 1.0所采取的模式,完全由内部实现容错性和资源管理

集群模式 Spark on Yarn:让Spark运行在一个通用的资源管理系统之上,这样可以与其他计算框架共享资源

简单总结

spark的核心编程是什么?其实,就是:
1、定义初始的RDD,就是说,你要定义第一个RDD从哪里读取数据,hdfs、hive、linux本地文件、程序中的集合
2、定义对RDD的计算操作,这个在spark里称之为算子,比如,map、reduce、flatMap、groupByKey,比MR提供的map和reduce强大的多了
3、其实就是循环往复的过程,第一个计算完了以后,数据可能就会到了新的一批节点上,也就是变成一个新的RDD,然后再次反复,针对新的RDD定义计算操作。。。
4、最后,获得最终的数据,将数据保存起来
下面使用wordcount程序来说明这个过程

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * @author Administrator
 */
object WordCount {
  
  def main(args: Array[String]) {
    
    //第一步,创建SparkConf对象,设置spark应用的配置信息
    //使用setMaster()可以设置Spark应用程序要连接的spark集群的master节点的url,但是如果设置为local,则代表,在本地运行
    val conf = new SparkConf()
        .setAppName("WordCount");
        .setMaster("local");  
    // 第二步,创建JavaSparkContext对象,在spark中,sparkContent是spark所有功能的一个入口,无论是用Java,Scala,Python,都必须要有一个sparkContent,作用是,初始化spark应用程序的一些核心组件,包括调度器(DAGSchedule/TaskScheduler),还会去spark master节点上进行注册,
    val sc = new SparkContext(conf)
   //第三步,针对输入源(hdfs文件,本地文件,hive等等),创建一个初始RDD,输入源中的数据会打散,分配到RDD的每个Partition中,从而形成一个初始的分布式的数据集;sparkContext中,用于根据文件类型的输入源创建RDD的方法,叫做textFile()方法;在这里呢,RDD中,有元素这种概念,如果是hdfs或者本地文件,创建RDD,每一个元素就相当于是文件里的一行
    val lines = sc.textFile("hdfs://Hadoop1:9000/spark.txt", 1);
    //第四步,对初始RDD进行transformation操作,通常操作会通过创建function,并配合RDD的map、flatMap等算子来执行
    //flapMap:将RDD的一个元素,给拆成一个或多个元素
    val words = lines.flatMap { line => line.split(" ") }   
    //接着,需要将每一个单词,映射为(单词, 1)的这种格式
    val pairs = words.map { word => (word, 1) }   
    //接着,需要以单词作为key,统计每个单词出现的次数
    val wordCounts = pairs.reduceByKey { _ + _ }
    //最后action操作,一个spark应用中,光是有transformation操作,是不行的,是不会操作的,必须要有一种叫做action操作的,比如,foreach,来触发程序的执行
    wordCounts.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times."))  
  }
  
}

《spark 基础一:基本工作原理与RDD》 image.png

Spark主要组件:
1、Driver:是个进程,spark集群的节点之一,我们编写的spark程序就在driver上,有driver进程执行
2、Master:是个进程,主要负责资源的调度和分配,还有集群的监控
3、Worker:是个进程,职责一,是用自己内存,存储RDD的某个或某些partition;职责二,启动其他进程和线程,对RDD上的partiton进行并行的处理和计算
4、Executor:进程,用来执行task
5、Task:线程,对RDD的partition数据执行指定的算子操作,形成新的RDD的partition。

一个spark应用在spark集群上的执行步骤:
1:我们向spark集群提交一个spark应用,Driver进程启动,然后做一些初始化操作,在这个操作过程中,就会发送请求到Master上,进行spark应用程序的注册,让Master知道,有一个新的spark应用程序要运行
2:Master,在接收到Spark应用程序的注册申请后,发送请求给Worker,进行资源的调度和分配,简单点说,就是分配executor
3:Worker,接收到Master的请求之后,会为spark应用启动Executor
4:Executor启动之后,会向Driver进行反注册,这样,Driver就知道,哪些Executor是为它进行服务的了,Driver注册了一些Executor之后,就可以开始正式执行我们的spark应用程序了,首先,第一步就是创建初始RDD,读取数据源
5:Driver会根据我们对RDD定义的操作,提交一大堆task去executor上
6:Executor接收到task之后,会启动多个线程来执行task
7:task就会对RDD的partiton数据执行指定的算子操作,形成新的RDD的partition

《spark 基础一:基本工作原理与RDD》 image.png

    原文作者:张凯_9908
    原文地址: https://www.jianshu.com/p/3aaeecbfce82
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞