通常,当把一个函数传递给Spark的操作时(例如map
或reduce
操作),函数可以使用在驱动程序中定义的变量,但是函数在不同的节点上执行,每个节点都需要对函数(闭包)内的每个变量进行拷贝发送,而在远程机器上变量的更新不会传播回到驱动程序。
为此,Spark提供了特殊类型的共享变量:广播变量(broadcast variables
),在每个节点上的内存上都缓存一个值,累加器(accumulators
),工作节点只能add
,可以用于计数和求和等
Broadcast Variables
广播变量让程序更高效的发送大型的只读数据集,缓存到每个机器上,方便进行多次使用,不需要在每个task中都包含一份复制
Spark actions 是分阶段执行的,之间以shuffle
操作划分。 Spark会自动广播同一阶段的任务所需的公共数据,对应的数据以序列化形式缓存,在运行任务之前反序列化。因此显式创建广播变量仅在跨多个阶段Stage的任务需要相同数据才需要,例如可以用来广播比较大的查找表,或者机器学习算法的特征向量
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
广播机制底层使用Orchestra,类似BitTorrent传输
具体实现类为TorrentBroadcast
,driver端将串行化的数据切分成多个块,由driver端的BlockManager
负责存储,在每个executor上,优先从本地BlockManager
上读取,其次从driver端或者其他executor上远程获取数据。它会将获取到的数据放置在自己的BlockManager
内,方便其他executor获取数据,避免了driver端同时发送多个广播变量的拷贝
Accumulators
用来聚合工作节点上的数值到驱动程序,用来计数求和等
Spark本身支持数字类型的累加器,例如DoubleAccumulator
,LongAccumulator
,CollectionAccumulator
等
AccumulatorV2
是累加器的基类,可以通过继承添加对自定义类型的支持,子类需要实现其中的抽象方法,例如add
,merge
,使用自定义的累加器时要记得进行注册,因为内置的累加器在SparkContext创建时默认进行了注册
doubleAccumulator: DoubleAccumulator//创建一个没有名称的double类型累加器
longAccumulator(name: String): LongAccumulator//创建一个带名称的long类型累加器
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = {
myVector.reset()
}
def add(v: MyVector): Unit = {
myVector.add(v)
}
...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
- 工作节点不能访问累加器的值
value
, 只能进行写操作add
- 因为容错机制,部分节点可能需要重新计算,Spark确保
action
操作中的累加器,只会被更新一次,例如foreach
,在转换操作中,一个任务可能因为重复执行多次更新累加器 - driver端应该在action操作后再访问累加器的值,因为transformation操作是惰性求值,不会主动执行