spark部分概述 - 校招准备

spark面试问题小结

此为spark系列第一篇文章, 后续还会依次更新 core/ sql /DStream/mllib等比较细节的东西

scala 语言有什么特点,相比java有什么优点?

  • 函数式编程,
    • 适合用来处理数据
  • 丰富的高级语法
    • 柯里化
    • 隐式转换

spark为什么使用scala开发

  • 1.spark的底层使用的是Scala,所以对Scala的支持是最好的。
  • 2.spark具有“轻”的特点,总共才3万多行Scala语言,该语言相对于java而言更加的简练。
  • 3.对于并发性而言,尤其是处理海量数据的情况,Scala处理高并发多线程问题时,拥有巨大的优势
  • 4.Scala基于java虚拟机,在开发的过程中,可以依赖任何的java库。此外,Scala除了支持函数编程,也支持面相对象编程。

MapReduce和spark的区别

  • 从 high-level 的角度来看,两者并没有大的差别
    • 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)
  • 从 low-level 的角度来看,两者差别不小
    • spark的shuffle比mr更加灵活, 有多个引擎可选
    • Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每个阶段各司其职
    • spark提供了更加丰富的算子

hadoop和spark对比有哪些优势??

  • 数据本地性
    • hadoop也有
  • 基于内存计算
    • hadoop每次都存到磁盘
    • 随后的计算都从磁盘读取
    • spark根据窄依赖将中间结果保存在内存中
  • DAG执行引擎
    • 可以更好的优化计算路径
  • spark容错性更好
    • 引入了RDD
    • 可以根据血统来重建
    • 并提供了Checkpoint和cache两种方式容错
  • spark更加易用
    • 有各种高层api
    • 不仅仅是map和reduce

简单说一下hadoop和spark的shuffle相同和差异?

  • 逻辑角度没有本质区别
    • 从 High-Level 的角度来看,两者并没有大的差别
    • 都是将 Mapper(Spark 里是 Shufflemaptask)的输出进行 Partition
    • 不同的 Partition 送到不同的 Reducer(Spark 里 Reducer 可能是下一个 Stage 里的 Shufflemaptask,也可能是 Resulttask)
  • mapreduce采用的是sort-based shuffle
  • spark采取的是sortedbase和 merge base 两种
  • mr过程
    • 收集map端的产出
    • 环形缓冲区 溢出写到文件 spill
      • partition
      • sorted
    • reduce端拉取
      • merge
  • spark流程
    • shuffle write
      • 多种类型
      • 根据不同算子, 灵活选择
    • shuffle read
      • 不像MR全部拉取才能开始,可以提前开始

spark集群原理

Spark master使用zookeeper进行HA的,有哪些元数据保存在Zookeeper?

  • 所有的Worker、Driver、Application。
  • 具体应用的数据是保持在Driver自己那里的

如何配置spark master的HA?

  • 基于文件系统的单点恢复
  • 基于zookeeper的Standby Masters
    • 用于生产模式。其基本原理是通过zookeeper来选举一个Master,其他的Master处于Standby状态。
    • 将Standalone集群连接到同一个ZooKeeper实例并启动多个Master,利用zookeeper提供的选举和状态保存功能,可以使一个Master被选举,而其他Master处于Standby状态
    • Zookeeper中包含
      • 所有的Worker、Driver、Application。

spark的有几种部署模式,每种模式特点?

  • 单机
  • Standalone
  • yarn

spark cluster下有几种角色?各有什么功能?

  • ClusterMaster
  • driver
  • worker
    • worker会不会汇报当前信息给master,worker心跳给master主要只有workid,它不会发送资源信息以心跳的方式给mater,master分配的时候就知道work,只有出现故障的时候才会发送资源。
    • worker不会运行代码
  • Executor

spark cluster中有哪些通信协议

  • Driver
    • DriverEndpoint
      • TaskScheduler中
      • server
        • driver
      • client
        • 各个Executor
        • 想Driver注册自己
    • HeartBeatEndpoint
      • server
        • driver
      • client
        • 各个Executor
    • ClientEndpoint
      • 用来给客户端使用
      • 向Master注册
    • BlockManagerMasterEndpoint
    • SparkContext
      • 运行SC的点就是Driver
      • 用于提交程序前的环境上下文
      • 隐藏了spark底层的rpc细节, block管理细节等
      • 序列化管理
      • 任务跟踪
      • 只需要给予用户api进行操作
  • Worker
    • WorkEndpoint
    • 管理资源
    • 启动Executor
  • Master
    • MasterEndpoint
    • 管理Work
    • 管理Executor
    • 资源分配
    • 注册Driver
  • Executor
    • BlockManagerSlaveEndpoint
      • Block管理
      • block清除和生成
    • HeartBeatEndpoint
      • 发送给Driver
    • 被调度后执行具体的task

spark存储原理?

  • 广义来讲就是BlockManager那个包里面的组件
  • 功能
    • Block块的存储 存储介质管理
      • 对应一个partition分区
      • 可在磁盘或内存之后
    • 针对Spark内存机制的组件 MemoryManager
      • spark的内存模型
      • 类型
        • 堆内内存
        • 堆外内存
      • StorageMemoryPool
        • 对存储的物理内存的逻辑抽象
      • ExecutionMemoryPool
        • 用于spark RDD计算的执行
  • 协议
    • BlockManagerMasterEndpoint
      • 运行在Driver端
      • driver和Executor端有客户端
      • 负责
        • 注册BlockManager
        • 移除RDD
        • 获取Block信息
      • 获取所有的操作命令到一个节点
      • 并分发到各个SlaveEndpoint
    • BlockManagerSlaveEndpoint
      • 各个Driver和Executor都有
      • 接收BMMaster的命令
      • 负责
        • 更新Block信息
        • 获取Block位置
        • 删除Executor
      • 真正执行移除

spark执行引擎的原理?

  • 主要包括两大块
    • shuffle管理
    • 执行内存的管理

spark on yarn 作业执行流程

yarn-client 和 yarn cluster 有什么区别?

  • 流程
    • rdd操作,触发
    • runjob
    • 划分stage
    • 提交stage / taskSet
    • Task调度
    • 建立TaskSetManager
    • 调度池调度
    • 通过Backend
      • LaunchTask
    • 在Executor中启动任务
  • yarn-cluster 适用于生产环境。而 yarn-client 适用于交互和调试,也就是希望快速地看到 application 的输出

如果worker节点宕机怎么办,如果在处理任务呢。

spark作业过程

  • 客户端编写好之后,提交
  • Driver启动
    • 可以在客户端启动
    • Master节点制定一个worker节点启动Driver
  • Driver启动后
    • Driver 节点根据 RDD 在程序中的转换和执行情况对程序进行分割scheduler
      • DAGScheduler
    • 按照Stage生成TaskSet
      • 一个Stage
      • 里面很多Task
      • 惰性技术,
    • Driver向ClusterMaster节点申请资源
    • 在收到 Driver 进程的资源请求后, Master 节点会命令已注册的 Worker 节点启动 Executor 进程
  • 结束执行
    • 结果返回Client

spark core/rdd原理

rdd概述

  • 设计思想
    • 在并行计算阶段高效的进行数据共享
    • rdd懒加载机制

rdd基本使用

  • 类型
    • 创建操作
    • 转换操作
    • 控制操作
      • cache
      • persist
    • 行动操作
  • rdd中 reduceBykey GroupByKey 有什么区别?
    • reduceByKey
      • reduceByKey用于对每个key对应的多个value进行merge操作
      • 它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。
      • Spark可以在每个分区移动数据之前将待输出数据与一个共用的key结合
    • groupByKey
      • groupByKey也是对每个key进行操作,但只生成一个sequence
      • 当采用groupByKey时,由于它不接收函数,spark只能先将所有的键值对(key-value pair)都移动,这样的后果是集群节点之间的开销很大,导致传输延时
    • 在对大数据进行复杂计算时,reduceByKey优于groupByKey
    • 实现
      • val words = Array(“one”, “two”, “two”, “three”, “three”, “three”)
      • val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
      • val wordCountsWithReduce = wordPairsRDD.reduceByKey( + )
      • val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))
  • cogroup rdd实现原理
    • 这个实现根据两个要进行合并的两个RDD操作,生成一个CoGroupedRDD的实例
    • 这个RDD的返回结果是把相同的key中两个RDD分别进行合并操作

rdd存储原理

  • 内存管理
  • BlockManager
  • 分区
    • 分区的算法

rdd依赖原理

  • 血统
  • checkpoint
  • cache
  • cache和pesist有什么区别?

rdd调度原理

  • RDD的调度过程是怎么样的那?
    • 行动操作触发作业提交
      • 提交后,根据RDD之间的依赖关系构建DAG
      • DAG图交给DAGScheduler进行解析
    • DAGScheduler
      • 拆分Stage
        • 从后往前
        • 遇到shuffle,划分为一个新的stage
      • 类型
        • ResultStage
        • ShuffleMapStage
      • 最优化调度
      • 提交给TaskScheduler
        • 根据Partition划分任务集
        • ResultStage->resultTask
        • ShuffleMapStage-> ShuffleMapTask
    • TaskScheduler
      • 接收任务集
      • 交给Worker中的Executor执行
    • Executor
      • 封装为TaskRunner
      • 在Worker中以多线程形式运行
      • 每个线程一个任务
      • Executor执行成功后返回TaskScheduler
      • 不同任务不同的返回方式
        • ShufflemapTask
          • 写入BlockManager
          • MapStatus
        • ReduceTask
  • stage有哪几种?
    • Shuffle Map Stage
    • 和 Result Stage
  • DAGScheduler如何划分stage
    • 从后往前递归的划分
    • createResultStage
      • 入口
    • getOrCreateParentStage
      • 获取父stage
      • 注册过的
        • 利用
      • 没注册过的
        • 创建Stage
  • TaskScheduler如何调度任务的
    • task提交
      • submitTasks入口
      • createTaskSetManager
        • 创建 stage,stageAttempt,TaskSetManager映射
      • addTaskSetManager
        • 去调度
      • 结束task接收
    • 资源分配过程
      • 提交TaskSet
      • 创建TaskSetManager
      • backen.reviveOffers
        • 给taskSet提供资源
        • 通过Rpc.发送reviveoffers
        • 调用TaskScheduler的reviveOffers方法
      • 在调度过程中
        • 根据TaskSetManager的本地性原则
        • 划分task,生成taskDescribe
      • 调用Executor的LaunchTask
        • 运行TaskAttempt

rdd容错机制

  • 自动的进行内存和磁盘的存储切换;
  • rdd有哪些容错机制
    • 调度层面容错
      • stage失败
        • 上层DAGScheduler重试
        • Stage 中不引起 Shuffle 文件丢失的故障由任务调度器 TaskScheduler 处理
      • Executor异常/Task失败
        • 底层调度器重试
    • 宽依赖短依赖计算
      • RDD之间各项转换形成的计算链
      • 部分丢失时可以根据Lineage计算
    • Checkpoint缓存
      • Lineage太长, 重算负担太大
      • 避免为Lineage重新计算的冗余计算

rdd计算机制

  • 分区计算
    • 通过partition 组件
    • 分区函数
  • shuffle是什么?原理是什么?
    • Spark shuffle 处于一个宽依赖,可以实现类似混洗的功能,将相同的 Key 分发至同一个 Reducer上进行处理。
    • 分为shuffle读和写过程
      • shuffle Writer
        • 写出数据
          • 内存有缓冲区
          • 磁盘有文件
        • 有多个不同实现
      • 网络传输
      • shuffle Reader
        • 与BlockManager交互
        • ShuffleRdd的compute函数读取上一stage的输出
        • 只有一个实现
  • shuffle框架的演进过程?
    • Hash based
      • 优势,
        • 很多场景不需要排序
      • 会产生太多的小文件
      • 基本退出历史舞台
    • sort based
      • map端的任务会按照Partition id以及key对记录进行排序
      • 的任务会按照Partition id以及key对记录进行排序。同时将全部结果写到一个数据文件中,同时生成一个索引文件
    • Tungsten-Sort Based Shuffle
      • 是直接使用堆外内存和新的内存管理模型,节省了内存空间和大量的gc, 是为了提升性能
  • shuffle写的类型有哪些?
    • BypassMergerSortshuffle
      • 不需要整体有序
      • 带有Hash风格的shuffle机制
      • 使用条件
        • 不指定ordering
        • 不指定aggregator
        • 分区个数小于配置
          • 小文件不能太多
    • UnsafeShuffleWriter
      • 底部使用ShuffleExternalSorter
      • 不具备聚合功能
      • Tungsten内存最为缓存
    • sortShuffleWriter
      • 使用了ExternalSorter
      • 可以对数据进行聚合和排序功能

关键流程

  • spark版wordcount的执行流程是什么?
    • 还是不行….
    • 宏观上来讲算子执行
      • hadoopRDD
      • FlatMapRDD
      • PariRDD
      • ReduceRDD
      • 每个rdd有着不同的分区
      • 在不同分区上进行执行运算
    • 微观上来讲
      • 客户端阶段
        • 实现完成Api的使用
      • 在Driver中 SparkContext阶段上讲
      • DAG-> Task
        • 划分为1个Stage
        • 中间不需要shuffle
      • 向ClusterMananger申请应用
        • 请求资源
      • 执行开始后
        • 每个task单独运行
        • 最后进行一次shuffle操作
  • spark中作业执行的流程是怎么样的?
    • rdd操作,触发
    • runjob
    • 划分stage
    • 提交stage / taskSet
    • Task调度
    • 建立TaskSetManager
    • 调度池调度
    • 通过Backend
      • LaunchTask
    • 在Executor中启动任务

spark Streaming

Spark Streaming的工作原理

  • 在Spark core的基础上, 添加一层抽象
    • 将流式计算分解为一系列短小的批处理作业
    • 将DStream数据转换为一段段的RDD
    • 把对DStream的操作转换为对RDD的操作
    • 将RDD计算结果算出来
  • SparkSession中
    • DStreamGraph
    • 用来管理DStream直接的依赖
    • 后对应着RDD之间的依赖
  • DStream
    • 时空维度的RDD集合
    • 表示连续数据流
      • 内部一个HashMap
      • 高 度抽象
      • 对RDD的一层封装

D-Stream的原理

  • 时空维度的RDD集合
  • 表示连续数据流
    • 内部一个HashMap
    • 高 度抽象
    • 对RDD的一层封装
  • 核心组件
    • DStreamGraph
    • JobScheduler
      • JobGenerator
      • Receiver Tracker
  • generateJob
    • 内部方法,实际生成Job,
    • 定期生成新的RDD
    • 调用Compute
    • 被DStreamGraph调用,
  • compute
    • 内部的Compute函数,
    • 每个批处理间隔需要调动生成数据的方法
    • 所有核心操作通过JobScheduler转化到RDD层操作

Spark-Streaming 和Kafka连接的两种方式

  • 基于receiver的方法
    • Receiver是使用Kafka的高层次Consumer API来实现的
      • 在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)
      • 偏移量管理交给kafka, 老版本存在zk中
    • 原理
      • receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
      • Receiver的方式则不能保证只被处理一次,因为Receiver和ZK中的数据可能不同步,spark Streaming可能会重复消费数据
    • 源码
      • KafkaInputDStream
        • 实现的是DStream的ReceiverInputDStream
        • 依赖给出的Receiver类,生成数据
          • Compute函数中调用
        • 与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark执行器中,然后由Spark Streaming启动的作业处理数据
      • Receiver方式就是从Kafka中拉取数据,每次接受固定时间间隔的数据存储到内存中
    • 注意事项
      • Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。
      • 可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
      • 可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
  • 基于Direct的方式
    • 不适用receiver
    • 原理
      • Direct的方式是会直接操作kafka底层的元数据信息,这样如果计算失败了,可以把数据重新读一下,重新处理。即数据一定会被处理。拉数据,是RDD在执行的时候直接去拉数据。
      • 周期性查询kafka,获取topic和partition的offset, 存储在checkpoint中
    • 优点
      • 简单并且并行
        • kafkaPartition和RDD partition会对应
        • 性能更好
      • 不需要spark开启wal
      • 一次仅且一次的事务保证

Spark Streaming中的batch如何实现的

  • 依赖DStream

spark Streaming的checkpoint与RDD checkpoint有何不同?

  • RDD的checkpoint单纯
    • 第一, 数据的持久化备份
    • 第二, 改变RDD直接的血统依赖
  • ss的,更加丰富
    • 第一层面
      • 元数据的checkpoint
      • 配置信息
      • DStream操作信息
      • 未处理的batch信息
    • 第二层面
      • 有状态的计算的checkpoint
      • 聚合操作的结果
      • 有状态的操作结果
        • 要避免依赖链条变成,而导致的失败恢复时间过长
        • 应该定期checkpoint
      • 可以设置自动的checkpoint
        • 时间为window的5-10个

spark streming在实时处理时会发生什么故障,如何停止,解决?

  • 有两个层面的容错机制
  • DStream层面
    • 元数据信息
    • state信息
  • RDD层面

如果Spark Streaming停掉了,如何保证Kafka的重新运作是合理的呢

  • 首先要说一下 Spark 的快速故障恢复机制
    • DStream的作用是生成RDD
    • 可以利用RDD层面的容错机制
  • 在 Spark 中,计算将会分成许多小的任务,保证能在任何节点运行后能够正确合并
  • 因此,就算某个节点出现故障,这个节点的任务将均匀地分散到集群中的节点进行计算,相对于传递故障恢复机制能够更快地恢复。

job提交流程

  • 创建Streaming Context
    • 初始化Graph
    • 各种组件
  • 输入InputDStream
    • 创建
      • InputDStream
      • 一般还会定义Receiver,在Executor上接收数据
      • 缓存为Block,进行计算
      • 设置好数据源
        • 添加到Graph里面的输入源中
    • 固定时间间隔使用Block内容,启动Job
      • 使用JobGenerator
  • 转换和输出操作
    • 输出操作的时候, 把自己添加到DStreamGraph中
    • 给每个DStream设置好Graph引用
      • 添加到Graph的输出源中
  • start方法
    • 启动JobScheduler
      • 管理Job
      • 生成Job
  • 一个job的执行过程
    • 定时器触发后续一系列操作
      • 每个时间间隔自动生成作业
      • 定期调用JobGenerate方法
    • 内部使用JobGenerator生成job
      • 每个时间间隔自动生成作业
      • 定期调用JobGenerate方法
      • 调用DStreamGraph的jobGenerate方法
      • 挨个调用OutputDStream的JobGenerate
      • getOrCompute
        • 生成RDD
        • 从ForeachDStream开始
        • 可能经过ShuffleDStream和MappedDStream
        • 到InputDStream
      • context.sparkContext.runJob

spark sql

什么是钨丝计划?

有哪些优化?

  • 统一 内存管理模型和 二进制处理( Bi nary Processing )
    • 替换基于 JVM的静态内存管理, 引入 Page 来管理堆内存和堆外内存( on-heap 和 off-heap )
    • 直接操作内存中的二进制数据,而不是 Java 对象
  • 代码生成( Code Generation )
  • 内存列存储
    • 面向列的列存储
  • 基于缓存感知的计算
    • Spark 内存读取操作也会带来一部分性能损耗,鸽丝计划便设计了缓存友好的算法和数据结构来提高缓存命中 率
    • 充分利用Ll /L2/L3 三级缓存,大幅提高了内存读取速度,进而缩短了内存中整个计算过程的时间

rdd dc ds性能问题

  • ds>df>rdd 性能?
  • 为什么?
    • RDD优缺点
      • 编译时类型安全
      • 面向对象的编程风格
      • 序列化和反序列化的开销很大
        • 通信和IO操作都需要额外的序列化,反序列化
      • GC性能开销
    • DataFrame
      • 本质还是不可变的分布式弹性数据集
      • 带有 schema 元信息
        • 类似于传统数据库中的二维表格。
        • 包含了以ROW为单位的每行数据的列的信息
        • 即 DataFrame 表示的二维表数据集的每一列都带有名称和类型
      • Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中, 当要操作数据时, 就直接操作off-heap内存
      • 优缺点
        • 对比RDD提升计算效率、减少数据读取、底层计算优化
          • off-heap优化
          • Tungsten优化
        • DataFrame不是类型安全的, API也不是面向对象风格的
      • DataFrame 不再是一个独立 的类
        • 作为DataSet [Row]的别名定义在 org.apache . spark.sql 这个包对象
        • DataFrame 实际上就是 DataSet[Row]
    • Dataset
      • DataSet以Catalyst逻辑执行计划表示
      • 提供了编译时 的类型安全信息
        • 不同于Row是一个泛化的无类型JVM object
        • Dataset是由一系列的强类型JVM object组成的
      • 引入核心的Encoder
        • 序列化了,不需要额外的序列化开销
          • 幵且数据以编码的二进制形式被存储
          • 不需要反序列化就可以执行sorting、
          • shuffle等操作
      • 有着弱类型,强类型两种
        • Dataset[Row]
          • 别名DataFrame
        • Dataset[T]
      • 特点
        • Transformation 级别的算子作用于 DataSet 会得到一个新的 DataSet
        • action 算子被调用时, Spark 的查询优化器会优化 Transformation 算子形成的逻辑计划 ,并生成一个物理计划,该物理计划可以通过并行和分布式的方式来执行。
        • Dataset 是 Lazy 级别的,
      • 优缺点
        • 提供类型安全和面向对象的编程接口
        • 引入Encoder
          • 不仅能够编译器完成类型安全检查
          • 还能够生成字节码,
          • 与对外数据进行交互
    • 性能好的原因
      • 内存列存储
        • 并不是jvm对象存储方式
      • jvm优化
        • codegen模块
        • 字节码生成技术
        • 对匹配的表达式采用特定的代码动态编译
        • sql表达式gc优化
      • scala优化
        • sql避免了使用scala时,低效的的容易gc的代码

SparkContext和SparkSession有什么区别

  • Spark入口类,SparkContext相当于连接Spark集群的纽带,并用来在集群中创建RDD、accumulators、broadCast变量.一个JVM只允许有一个SparkContext.
  • SparkSession是Spark程序以及用来创建DataSet和DataFrame 的入口类

spark sql执行流程

  • 逻辑计划
    • SqlParser 解析SQL语句
      • 生成逻辑计划
        • 通过sparkSqlParser
        • 未解析的逻辑算子树
      • 过程
        • 由AstBuilder执行节点访问, 将语法树中各种Context转化为对应的LogisticPlan节点
        • 此时是未解析的逻辑算子树
          • 不包括数据信息和列信息
    • 使用Catalyst分析器
      • 生成Analyzed逻辑计划
        • 结合数据字典绑定
        • 提取schema信息
        • 解析后的逻辑算子树
      • 由Analyzer将一系列规则作用于Unresolved LogisticPlan上
    • 使用Catalyst优化器
      • 优化Analyzed逻辑计划
        • Optimized逻辑计划
        • 优化后的逻辑算子树
      • 由优化器Optimizer优化
  • 物理计划
    • 生成物理算子树列表
    • 与spark Planner交互
      • 使用策略生成 优化后的物理计划
      • 将逻辑计划生成物理计划
    • 提交前的准备工作
  • 转换为RDD计划

spark mllib

ml的思想

  • pipeline机制提供数据的装换
    • 实现了transformer接口
  • 训练好的模型也是tranformer
  • 未训练好的模型为一个predictor
    • 提供fit
    • train方法训练

如何训练

  • 核心思想
    • 以lr为例子
    • 实现的是数据样本的划分, 简单模型和大量数据的模式
    • 单机训练后, 通过组件来聚合结果, 然后开始下一轮
  • breeze
    • 提供机器学习单机训练的算法
    • SGD
    • LBFGS
  • Aggregator
    • 对各个partition的训练结果进行聚合
    • treeAggregate算子
  • loss
    • 通过聚合的结果计算loss
    • 在这里添加正则化的支持

spark优化问题

Spark的shuffle是什么样的,怎么优化的

  • 应用程序层面的调优
  • 降低单条记录的资源开销,处理数据倾斜,复用RDD进行缓存,作业并行化执行等等

JVM层面的调优

  • 设置合适的资源量,设置合理的JVM,启用高效的序列化方法如kyro,增大off head内存等等

spark数据倾斜问题

  • 前提是定位数据倾斜
  • 解决方法,有多个方面
    • 改变并行度,可能并行度太少了,导致个别task数据压力大
    • 自定义paritioner,分散key的分布,使其更加均匀
    • 避免不必要的shuffle,如使用广播小表的方式,将reduce-side-join提升为map-side-join
    • 分拆发生数据倾斜的记录,分成几个部分进行,然后合并join后的结果
    • 两阶段聚合,先局部聚合,再全局聚合
    原文作者:大菜菜
    原文地址: https://zhuanlan.zhihu.com/p/74570566
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞