spark面试问题小结
此为spark系列第一篇文章, 后续还会依次更新 core/ sql /DStream/mllib等比较细节的东西
scala 语言有什么特点,相比java有什么优点?
- 函数式编程,
- 适合用来处理数据
- 丰富的高级语法
- 柯里化
- 隐式转换
spark为什么使用scala开发
- 1.spark的底层使用的是Scala,所以对Scala的支持是最好的。
- 2.spark具有“轻”的特点,总共才3万多行Scala语言,该语言相对于java而言更加的简练。
- 3.对于并发性而言,尤其是处理海量数据的情况,Scala处理高并发多线程问题时,拥有巨大的优势
- 4.Scala基于java虚拟机,在开发的过程中,可以依赖任何的java库。此外,Scala除了支持函数编程,也支持面相对象编程。
MapReduce和spark的区别
- 从 high-level 的角度来看,两者并没有大的差别
- 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)
- 从 low-level 的角度来看,两者差别不小
- spark的shuffle比mr更加灵活, 有多个引擎可选
- Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每个阶段各司其职
- spark提供了更加丰富的算子
hadoop和spark对比有哪些优势??
- 数据本地性
- hadoop也有
- 基于内存计算
- hadoop每次都存到磁盘
- 随后的计算都从磁盘读取
- spark根据窄依赖将中间结果保存在内存中
- DAG执行引擎
- 可以更好的优化计算路径
- spark容错性更好
- 引入了RDD
- 可以根据血统来重建
- 并提供了Checkpoint和cache两种方式容错
- spark更加易用
- 有各种高层api
- 不仅仅是map和reduce
简单说一下hadoop和spark的shuffle相同和差异?
- 逻辑角度没有本质区别
- 从 High-Level 的角度来看,两者并没有大的差别
- 都是将 Mapper(Spark 里是 Shufflemaptask)的输出进行 Partition
- 不同的 Partition 送到不同的 Reducer(Spark 里 Reducer 可能是下一个 Stage 里的 Shufflemaptask,也可能是 Resulttask)
- mapreduce采用的是sort-based shuffle
- spark采取的是sortedbase和 merge base 两种
- mr过程
- 收集map端的产出
- 环形缓冲区 溢出写到文件 spill
- partition
- sorted
- reduce端拉取
- merge
- spark流程
- shuffle write
- 多种类型
- 根据不同算子, 灵活选择
- shuffle read
- 不像MR全部拉取才能开始,可以提前开始
spark集群原理
Spark master使用zookeeper进行HA的,有哪些元数据保存在Zookeeper?
- 所有的Worker、Driver、Application。
- 具体应用的数据是保持在Driver自己那里的
如何配置spark master的HA?
- 基于文件系统的单点恢复
- 基于zookeeper的Standby Masters
- 用于生产模式。其基本原理是通过zookeeper来选举一个Master,其他的Master处于Standby状态。
- 将Standalone集群连接到同一个ZooKeeper实例并启动多个Master,利用zookeeper提供的选举和状态保存功能,可以使一个Master被选举,而其他Master处于Standby状态
- Zookeeper中包含
- 所有的Worker、Driver、Application。
spark的有几种部署模式,每种模式特点?
- 单机
- Standalone
- yarn
spark cluster下有几种角色?各有什么功能?
- ClusterMaster
- driver
- worker
- worker会不会汇报当前信息给master,worker心跳给master主要只有workid,它不会发送资源信息以心跳的方式给mater,master分配的时候就知道work,只有出现故障的时候才会发送资源。
- worker不会运行代码
- Executor
spark cluster中有哪些通信协议
- Driver
- DriverEndpoint
- TaskScheduler中
- server
- driver
- client
- 各个Executor
- 想Driver注册自己
- HeartBeatEndpoint
- server
- driver
- client
- 各个Executor
- ClientEndpoint
- 用来给客户端使用
- 向Master注册
- BlockManagerMasterEndpoint
- SparkContext
- 运行SC的点就是Driver
- 用于提交程序前的环境上下文
- 隐藏了spark底层的rpc细节, block管理细节等
- 序列化管理
- 任务跟踪
- 只需要给予用户api进行操作
- Worker
- WorkEndpoint
- 管理资源
- 启动Executor
- Master
- MasterEndpoint
- 管理Work
- 管理Executor
- 资源分配
- 注册Driver
- Executor
- BlockManagerSlaveEndpoint
- Block管理
- block清除和生成
- HeartBeatEndpoint
- 发送给Driver
- 被调度后执行具体的task
spark存储原理?
- 广义来讲就是BlockManager那个包里面的组件
- 功能
- Block块的存储 存储介质管理
- 对应一个partition分区
- 可在磁盘或内存之后
- 针对Spark内存机制的组件 MemoryManager
- spark的内存模型
- 类型
- 堆内内存
- 堆外内存
- StorageMemoryPool
- 对存储的物理内存的逻辑抽象
- ExecutionMemoryPool
- 用于spark RDD计算的执行
- 协议
- BlockManagerMasterEndpoint
- 运行在Driver端
- driver和Executor端有客户端
- 负责
- 注册BlockManager
- 移除RDD
- 获取Block信息
- 获取所有的操作命令到一个节点
- 并分发到各个SlaveEndpoint
- BlockManagerSlaveEndpoint
- 各个Driver和Executor都有
- 接收BMMaster的命令
- 负责
- 更新Block信息
- 获取Block位置
- 删除Executor
- 真正执行移除
spark执行引擎的原理?
- 主要包括两大块
- shuffle管理
- 执行内存的管理
spark on yarn 作业执行流程
yarn-client 和 yarn cluster 有什么区别?
- 流程
- rdd操作,触发
- runjob
- 划分stage
- 提交stage / taskSet
- Task调度
- 建立TaskSetManager
- 调度池调度
- 通过Backend
- LaunchTask
- 在Executor中启动任务
- yarn-cluster 适用于生产环境。而 yarn-client 适用于交互和调试,也就是希望快速地看到 application 的输出
如果worker节点宕机怎么办,如果在处理任务呢。
spark作业过程
- 客户端编写好之后,提交
- Driver启动
- 可以在客户端启动
- Master节点制定一个worker节点启动Driver
- Driver启动后
- Driver 节点根据 RDD 在程序中的转换和执行情况对程序进行分割scheduler
- DAGScheduler
- 按照Stage生成TaskSet
- 一个Stage
- 里面很多Task
- 惰性技术,
- Driver向ClusterMaster节点申请资源
- 在收到 Driver 进程的资源请求后, Master 节点会命令已注册的 Worker 节点启动 Executor 进程
- 结束执行
- 结果返回Client
spark core/rdd原理
rdd概述
- 设计思想
- 在并行计算阶段高效的进行数据共享
- rdd懒加载机制
rdd基本使用
- 类型
- 创建操作
- 转换操作
- 控制操作
- cache
- persist
- 行动操作
- rdd中 reduceBykey GroupByKey 有什么区别?
- reduceByKey
- reduceByKey用于对每个key对应的多个value进行merge操作
- 它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。
- Spark可以在每个分区移动数据之前将待输出数据与一个共用的key结合
- groupByKey
- groupByKey也是对每个key进行操作,但只生成一个sequence
- 当采用groupByKey时,由于它不接收函数,spark只能先将所有的键值对(key-value pair)都移动,这样的后果是集群节点之间的开销很大,导致传输延时
- 在对大数据进行复杂计算时,reduceByKey优于groupByKey
- 实现
- val words = Array(“one”, “two”, “two”, “three”, “three”, “three”)
- val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
- val wordCountsWithReduce = wordPairsRDD.reduceByKey( + )
- val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))
- cogroup rdd实现原理
- 这个实现根据两个要进行合并的两个RDD操作,生成一个CoGroupedRDD的实例
- 这个RDD的返回结果是把相同的key中两个RDD分别进行合并操作
rdd存储原理
- 内存管理
- BlockManager
- 分区
- 分区的算法
rdd依赖原理
- 血统
- checkpoint
- cache
- cache和pesist有什么区别?
rdd调度原理
- RDD的调度过程是怎么样的那?
- 行动操作触发作业提交
- 提交后,根据RDD之间的依赖关系构建DAG
- DAG图交给DAGScheduler进行解析
- DAGScheduler
- 拆分Stage
- 从后往前
- 遇到shuffle,划分为一个新的stage
- 类型
- ResultStage
- ShuffleMapStage
- 最优化调度
- 提交给TaskScheduler
- 根据Partition划分任务集
- ResultStage->resultTask
- ShuffleMapStage-> ShuffleMapTask
- TaskScheduler
- 接收任务集
- 交给Worker中的Executor执行
- Executor
- 封装为TaskRunner
- 在Worker中以多线程形式运行
- 每个线程一个任务
- Executor执行成功后返回TaskScheduler
- 不同任务不同的返回方式
- ShufflemapTask
- 写入BlockManager
- MapStatus
- ReduceTask
- stage有哪几种?
- Shuffle Map Stage
- 和 Result Stage
- DAGScheduler如何划分stage
- 从后往前递归的划分
- createResultStage
- 入口
- getOrCreateParentStage
- 获取父stage
- 注册过的
- 利用
- 没注册过的
- 创建Stage
- TaskScheduler如何调度任务的
- task提交
- submitTasks入口
- createTaskSetManager
- 创建 stage,stageAttempt,TaskSetManager映射
- addTaskSetManager
- 去调度
- 结束task接收
- 资源分配过程
- 提交TaskSet
- 创建TaskSetManager
- backen.reviveOffers
- 给taskSet提供资源
- 通过Rpc.发送reviveoffers
- 调用TaskScheduler的reviveOffers方法
- 在调度过程中
- 根据TaskSetManager的本地性原则
- 划分task,生成taskDescribe
- 调用Executor的LaunchTask
- 运行TaskAttempt
rdd容错机制
- 自动的进行内存和磁盘的存储切换;
- rdd有哪些容错机制
- 调度层面容错
- stage失败
- 上层DAGScheduler重试
- Stage 中不引起 Shuffle 文件丢失的故障由任务调度器 TaskScheduler 处理
- Executor异常/Task失败
- 底层调度器重试
- 宽依赖短依赖计算
- RDD之间各项转换形成的计算链
- 部分丢失时可以根据Lineage计算
- Checkpoint缓存
- Lineage太长, 重算负担太大
- 避免为Lineage重新计算的冗余计算
rdd计算机制
- 分区计算
- 通过partition 组件
- 分区函数
- shuffle是什么?原理是什么?
- Spark shuffle 处于一个宽依赖,可以实现类似混洗的功能,将相同的 Key 分发至同一个 Reducer上进行处理。
- 分为shuffle读和写过程
- shuffle Writer
- 写出数据
- 内存有缓冲区
- 磁盘有文件
- 有多个不同实现
- 网络传输
- shuffle Reader
- 与BlockManager交互
- ShuffleRdd的compute函数读取上一stage的输出
- 只有一个实现
- shuffle框架的演进过程?
- Hash based
- 优势,
- 很多场景不需要排序
- 会产生太多的小文件
- 基本退出历史舞台
- sort based
- map端的任务会按照Partition id以及key对记录进行排序
- 的任务会按照Partition id以及key对记录进行排序。同时将全部结果写到一个数据文件中,同时生成一个索引文件
- Tungsten-Sort Based Shuffle
- 是直接使用堆外内存和新的内存管理模型,节省了内存空间和大量的gc, 是为了提升性能
- shuffle写的类型有哪些?
- BypassMergerSortshuffle
- 不需要整体有序
- 带有Hash风格的shuffle机制
- 使用条件
- 不指定ordering
- 不指定aggregator
- 分区个数小于配置
- 小文件不能太多
- UnsafeShuffleWriter
- 底部使用ShuffleExternalSorter
- 不具备聚合功能
- Tungsten内存最为缓存
- sortShuffleWriter
- 使用了ExternalSorter
- 可以对数据进行聚合和排序功能
关键流程
- spark版wordcount的执行流程是什么?
- 还是不行….
- 宏观上来讲算子执行
- hadoopRDD
- FlatMapRDD
- PariRDD
- ReduceRDD
- 每个rdd有着不同的分区
- 在不同分区上进行执行运算
- 微观上来讲
- 客户端阶段
- 实现完成Api的使用
- 在Driver中 SparkContext阶段上讲
- DAG-> Task
- 划分为1个Stage
- 中间不需要shuffle
- 向ClusterMananger申请应用
- 请求资源
- 执行开始后
- 每个task单独运行
- 最后进行一次shuffle操作
- spark中作业执行的流程是怎么样的?
- rdd操作,触发
- runjob
- 划分stage
- 提交stage / taskSet
- Task调度
- 建立TaskSetManager
- 调度池调度
- 通过Backend
- LaunchTask
- 在Executor中启动任务
spark Streaming
Spark Streaming的工作原理
- 在Spark core的基础上, 添加一层抽象
- 将流式计算分解为一系列短小的批处理作业
- 将DStream数据转换为一段段的RDD
- 把对DStream的操作转换为对RDD的操作
- 将RDD计算结果算出来
- SparkSession中
- DStreamGraph
- 用来管理DStream直接的依赖
- 后对应着RDD之间的依赖
- DStream
- 时空维度的RDD集合
- 表示连续数据流
- 内部一个HashMap
- 高 度抽象
- 对RDD的一层封装
D-Stream的原理
- 时空维度的RDD集合
- 表示连续数据流
- 内部一个HashMap
- 高 度抽象
- 对RDD的一层封装
- 核心组件
- DStreamGraph
- JobScheduler
- JobGenerator
- Receiver Tracker
- generateJob
- 内部方法,实际生成Job,
- 定期生成新的RDD
- 调用Compute
- 被DStreamGraph调用,
- compute
- 内部的Compute函数,
- 每个批处理间隔需要调动生成数据的方法
- 所有核心操作通过JobScheduler转化到RDD层操作
Spark-Streaming 和Kafka连接的两种方式
- 基于receiver的方法
- Receiver是使用Kafka的高层次Consumer API来实现的
- 在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)
- 偏移量管理交给kafka, 老版本存在zk中
- 原理
- receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
- Receiver的方式则不能保证只被处理一次,因为Receiver和ZK中的数据可能不同步,spark Streaming可能会重复消费数据
- 源码
- KafkaInputDStream
- 实现的是DStream的ReceiverInputDStream
- 依赖给出的Receiver类,生成数据
- Compute函数中调用
- 与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark执行器中,然后由Spark Streaming启动的作业处理数据
- Receiver方式就是从Kafka中拉取数据,每次接受固定时间间隔的数据存储到内存中
- 注意事项
- Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。
- 可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
- 可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
- 基于Direct的方式
- 不适用receiver
- 原理
- Direct的方式是会直接操作kafka底层的元数据信息,这样如果计算失败了,可以把数据重新读一下,重新处理。即数据一定会被处理。拉数据,是RDD在执行的时候直接去拉数据。
- 周期性查询kafka,获取topic和partition的offset, 存储在checkpoint中
- 优点
- 简单并且并行
- kafkaPartition和RDD partition会对应
- 性能更好
- 不需要spark开启wal
- 一次仅且一次的事务保证
Spark Streaming中的batch如何实现的
- 依赖DStream
spark Streaming的checkpoint与RDD checkpoint有何不同?
- RDD的checkpoint单纯
- 第一, 数据的持久化备份
- 第二, 改变RDD直接的血统依赖
- ss的,更加丰富
- 第一层面
- 元数据的checkpoint
- 配置信息
- DStream操作信息
- 未处理的batch信息
- 第二层面
- 有状态的计算的checkpoint
- 聚合操作的结果
- 有状态的操作结果
- 要避免依赖链条变成,而导致的失败恢复时间过长
- 应该定期checkpoint
- 可以设置自动的checkpoint
- 时间为window的5-10个
spark streming在实时处理时会发生什么故障,如何停止,解决?
- 有两个层面的容错机制
- DStream层面
- 元数据信息
- state信息
- RDD层面
如果Spark Streaming停掉了,如何保证Kafka的重新运作是合理的呢
- 首先要说一下 Spark 的快速故障恢复机制
- DStream的作用是生成RDD
- 可以利用RDD层面的容错机制
- 在 Spark 中,计算将会分成许多小的任务,保证能在任何节点运行后能够正确合并
- 因此,就算某个节点出现故障,这个节点的任务将均匀地分散到集群中的节点进行计算,相对于传递故障恢复机制能够更快地恢复。
job提交流程
- 创建Streaming Context
- 初始化Graph
- 各种组件
- 输入InputDStream
- 创建
- InputDStream
- 一般还会定义Receiver,在Executor上接收数据
- 缓存为Block,进行计算
- 设置好数据源
- 添加到Graph里面的输入源中
- 固定时间间隔使用Block内容,启动Job
- 使用JobGenerator
- 转换和输出操作
- 输出操作的时候, 把自己添加到DStreamGraph中
- 给每个DStream设置好Graph引用
- 添加到Graph的输出源中
- start方法
- 启动JobScheduler
- 管理Job
- 生成Job
- 一个job的执行过程
- 定时器触发后续一系列操作
- 每个时间间隔自动生成作业
- 定期调用JobGenerate方法
- 内部使用JobGenerator生成job
- 每个时间间隔自动生成作业
- 定期调用JobGenerate方法
- 调用DStreamGraph的jobGenerate方法
- 挨个调用OutputDStream的JobGenerate
- getOrCompute
- 生成RDD
- 从ForeachDStream开始
- 可能经过ShuffleDStream和MappedDStream
- 到InputDStream
- context.sparkContext.runJob
spark sql
什么是钨丝计划?
有哪些优化?
- 统一 内存管理模型和 二进制处理( Bi nary Processing )
- 替换基于 JVM的静态内存管理, 引入 Page 来管理堆内存和堆外内存( on-heap 和 off-heap )
- 直接操作内存中的二进制数据,而不是 Java 对象
- 代码生成( Code Generation )
- 内存列存储
- 面向列的列存储
- 基于缓存感知的计算
- Spark 内存读取操作也会带来一部分性能损耗,鸽丝计划便设计了缓存友好的算法和数据结构来提高缓存命中 率
- 充分利用Ll /L2/L3 三级缓存,大幅提高了内存读取速度,进而缩短了内存中整个计算过程的时间
rdd dc ds性能问题
- ds>df>rdd 性能?
- 为什么?
- RDD优缺点
- 编译时类型安全
- 面向对象的编程风格
- 序列化和反序列化的开销很大
- 通信和IO操作都需要额外的序列化,反序列化
- GC性能开销
- DataFrame
- 本质还是不可变的分布式弹性数据集
- 带有 schema 元信息
- 类似于传统数据库中的二维表格。
- 包含了以ROW为单位的每行数据的列的信息
- 即 DataFrame 表示的二维表数据集的每一列都带有名称和类型
- Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中, 当要操作数据时, 就直接操作off-heap内存
- 优缺点
- 对比RDD提升计算效率、减少数据读取、底层计算优化
- off-heap优化
- Tungsten优化
- DataFrame不是类型安全的, API也不是面向对象风格的
- DataFrame 不再是一个独立 的类
- 作为DataSet [Row]的别名定义在 org.apache . spark.sql 这个包对象
- DataFrame 实际上就是 DataSet[Row]
- Dataset
- DataSet以Catalyst逻辑执行计划表示
- 提供了编译时 的类型安全信息
- 不同于Row是一个泛化的无类型JVM object
- Dataset是由一系列的强类型JVM object组成的
- 引入核心的Encoder
- 序列化了,不需要额外的序列化开销
- 幵且数据以编码的二进制形式被存储
- 不需要反序列化就可以执行sorting、
- shuffle等操作
- 有着弱类型,强类型两种
- Dataset[Row]
- 别名DataFrame
- Dataset[T]
- 特点
- Transformation 级别的算子作用于 DataSet 会得到一个新的 DataSet
- action 算子被调用时, Spark 的查询优化器会优化 Transformation 算子形成的逻辑计划 ,并生成一个物理计划,该物理计划可以通过并行和分布式的方式来执行。
- Dataset 是 Lazy 级别的,
- 优缺点
- 提供类型安全和面向对象的编程接口
- 引入Encoder
- 不仅能够编译器完成类型安全检查
- 还能够生成字节码,
- 与对外数据进行交互
- 性能好的原因
- 内存列存储
- 并不是jvm对象存储方式
- jvm优化
- codegen模块
- 字节码生成技术
- 对匹配的表达式采用特定的代码动态编译
- sql表达式gc优化
- scala优化
- sql避免了使用scala时,低效的的容易gc的代码
SparkContext和SparkSession有什么区别
- Spark入口类,SparkContext相当于连接Spark集群的纽带,并用来在集群中创建RDD、accumulators、broadCast变量.一个JVM只允许有一个SparkContext.
- SparkSession是Spark程序以及用来创建DataSet和DataFrame 的入口类
spark sql执行流程
- 逻辑计划
- SqlParser 解析SQL语句
- 生成逻辑计划
- 通过sparkSqlParser
- 未解析的逻辑算子树
- 过程
- 由AstBuilder执行节点访问, 将语法树中各种Context转化为对应的LogisticPlan节点
- 此时是未解析的逻辑算子树
- 不包括数据信息和列信息
- 使用Catalyst分析器
- 生成Analyzed逻辑计划
- 结合数据字典绑定
- 提取schema信息
- 解析后的逻辑算子树
- 由Analyzer将一系列规则作用于Unresolved LogisticPlan上
- 使用Catalyst优化器
- 优化Analyzed逻辑计划
- Optimized逻辑计划
- 优化后的逻辑算子树
- 由优化器Optimizer优化
- 物理计划
- 生成物理算子树列表
- 与spark Planner交互
- 使用策略生成 优化后的物理计划
- 将逻辑计划生成物理计划
- 提交前的准备工作
- 转换为RDD计划
spark mllib
ml的思想
- pipeline机制提供数据的装换
- 实现了transformer接口
- 训练好的模型也是tranformer
- 未训练好的模型为一个predictor
- 提供fit
- train方法训练
如何训练
- 核心思想
- 以lr为例子
- 实现的是数据样本的划分, 简单模型和大量数据的模式
- 单机训练后, 通过组件来聚合结果, 然后开始下一轮
- breeze
- 提供机器学习单机训练的算法
- SGD
- LBFGS
- Aggregator
- 对各个partition的训练结果进行聚合
- treeAggregate算子
- loss
- 通过聚合的结果计算loss
- 在这里添加正则化的支持
spark优化问题
Spark的shuffle是什么样的,怎么优化的
- 应用程序层面的调优
- 降低单条记录的资源开销,处理数据倾斜,复用RDD进行缓存,作业并行化执行等等
JVM层面的调优
- 设置合适的资源量,设置合理的JVM,启用高效的序列化方法如kyro,增大off head内存等等
spark数据倾斜问题
- 前提是定位数据倾斜
- 解决方法,有多个方面
- 改变并行度,可能并行度太少了,导致个别task数据压力大
- 自定义paritioner,分散key的分布,使其更加均匀
- 避免不必要的shuffle,如使用广播小表的方式,将reduce-side-join提升为map-side-join
- 分拆发生数据倾斜的记录,分成几个部分进行,然后合并join后的结果
- 两阶段聚合,先局部聚合,再全局聚合