Spark 共享变量

通常,当把一个函数传递给Spark的操作时(例如mapreduce操作),函数可以使用在驱动程序中定义的变量,但是函数在不同的节点上执行,每个节点都需要对函数(闭包)内的每个变量进行拷贝发送,而在远程机器上变量的更新不会传播回到驱动程序。

为此,Spark提供了特殊类型的共享变量:广播变量(broadcast variables),在每个节点上的内存上都缓存一个值,累加器(accumulators),工作节点只能add,可以用于计数和求和等

Broadcast Variables

广播变量让程序更高效的发送大型的只读数据集,缓存到每个机器上,方便进行多次使用,不需要在每个task中都包含一份复制

Spark actions 是分阶段执行的,之间以shuffle操作划分。 Spark会自动广播同一阶段的任务所需的公共数据,对应的数据以序列化形式缓存,在运行任务之前反序列化。因此显式创建广播变量仅在跨多个阶段Stage的任务需要相同数据才需要,例如可以用来广播比较大的查找表,或者机器学习算法的特征向量

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

广播机制底层使用Orchestra,类似BitTorrent传输

具体实现类为TorrentBroadcast,driver端将串行化的数据切分成多个块,由driver端的BlockManager负责存储,在每个executor上,优先从本地BlockManager上读取,其次从driver端或者其他executor上远程获取数据。它会将获取到的数据放置在自己的BlockManager内,方便其他executor获取数据,避免了driver端同时发送多个广播变量的拷贝

Accumulators

用来聚合工作节点上的数值到驱动程序,用来计数求和等
Spark本身支持数字类型的累加器,例如DoubleAccumulatorLongAccumulatorCollectionAccumulator
AccumulatorV2是累加器的基类,可以通过继承添加对自定义类型的支持,子类需要实现其中的抽象方法,例如addmerge,使用自定义的累加器时要记得进行注册,因为内置的累加器在SparkContext创建时默认进行了注册

doubleAccumulator: DoubleAccumulator//创建一个没有名称的double类型累加器
longAccumulator(name: String): LongAccumulator//创建一个带名称的long类型累加器

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

  • 工作节点不能访问累加器的值value, 只能进行写操作add
  • 因为容错机制,部分节点可能需要重新计算,Spark确保action操作中的累加器,只会被更新一次,例如foreach,在转换操作中,一个任务可能因为重复执行多次更新累加器
  • driver端应该在action操作后再访问累加器的值,因为transformation操作是惰性求值,不会主动执行
    原文作者:wangdy12
    原文地址: https://www.jianshu.com/p/a88ed0ab58a6
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞