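Why does Spark need a storage module? To cache RDDs.
A defining feature of Spark is that an RDD can be cached in memory or on disk. An RDD is made up of partitions, and each partition corresponds to a block.
So the storage module exists to implement RDD persistence in memory and on disk.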
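Every node has a BlockManager. One of them lives on the driver (the master); all the others are slaves.
The master tracks a BlockManagerInfo for every slave BlockManager, and each BlockManagerInfo in turn tracks the BlockStatus of every block that BlockManager manages.
Whenever a block on a slave changes, the slave has to send an updateBlockInfo event to update the block information on the master.
This is a typical centralized design. The master and the slaves communicate through actors. Block data is large, so it is transferred through the connectionManager (NIO or Netty) instead.
That is why there is a BlockManagerMasterActor and a BlockManagerSlaveActor; see Spark 源码分析 – BlockManagerMaster&Slave (Spark Source Code Analysis: BlockManagerMaster & Slave).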
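There is also BlockManagerMaster, which wraps BlockManagerMasterActor. The confusing part is that every node creates a BlockManagerMaster; on a slave it just does not create a real BlockManagerMasterActor but holds a ref to the one on the driver. Not a good design.
Moreover, because BlockManager is shared by the master and the slaves, it exposes most of the interfaces of both. The master-side calls simply delegate to BlockManagerMaster, whereas the slave-side data reads and writes are implemented directly in BlockManager, so the design is inconsistent.
In short, the design of the storage module is rather ad hoc and not very clean, which also shows in some small naming choices, and this makes it harder to analyze and understand.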
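The master-side bookkeeping described above (manager to BlockManagerInfo to BlockStatus) can be sketched roughly as follows. This is a minimal illustration, not Spark's code: `MasterBookkeeping`, `ManagerInfo`, the simplified `BlockStatus` fields, and the use of plain strings as manager IDs are all assumptions made for the example.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for the real Spark classes.
final case class BlockStatus(inMemory: Boolean, onDisk: Boolean, memSize: Long, diskSize: Long)

final class ManagerInfo {
  // blockId -> latest status reported by this slave
  val blocks = mutable.Map.empty[String, BlockStatus]
}

final class MasterBookkeeping {
  // managerId -> per-slave info
  private val managers = mutable.Map.empty[String, ManagerInfo]

  def register(managerId: String): Unit =
    managers.getOrElseUpdate(managerId, new ManagerInfo)

  /** Returns false when the manager is unknown, i.e. the slave must re-register. */
  def updateBlockInfo(managerId: String, blockId: String, status: BlockStatus): Boolean =
    managers.get(managerId) match {
      case None => false
      case Some(info) =>
        if (status.inMemory || status.onDisk) info.blocks(blockId) = status
        else info.blocks.remove(blockId) // block is no longer stored on that slave
        true
    }

  /** All managers that currently hold the block. */
  def getLocations(blockId: String): Set[String] =
    managers.collect { case (id, info) if info.blocks.contains(blockId) => id }.toSet
}
```

In the real system these updates arrive as actor messages; the sketch only shows the data structure the master would maintain.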
During SparkEnv initialization, the BlockManagerMaster and the blockManager are created:
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
  "BlockManagerMaster",
  new BlockManagerMasterActor(isLocal)))
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
// Create either the actor or an actor ref.
// For BlockManagerMaster: the master creates the BlockManagerMasterActor itself, while a slave creates a ref to it.
def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    actorSystem.actorOf(Props(newActor), name = name)
  } else {
    val driverHost: String = System.getProperty("spark.driver.host", "localhost")
    val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
    Utils.checkHost(driverHost, "Expected hostname")
    val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
    logInfo("Connecting to " + name + ": " + url)
    actorSystem.actorFor(url)
  }
}
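One detail worth noticing in registerOrLookup is the by-name parameter `newActor: => Actor`: the actor is only constructed if the driver branch actually evaluates it. The following sketch isolates that behaviour without Akka; `FakeActor`, the `constructed` counter, the explicit `isDriver` argument, and the string return values are hypothetical stand-ins introduced for the example.

```scala
object RegisterOrLookupSketch {
  var constructed = 0 // counts how many times an "actor" was actually built

  // Hypothetical stand-in for an Akka actor.
  final class FakeActor(val label: String) { constructed += 1 }

  // Hypothetical stand-in for registerOrLookup: returns a description instead of an ActorRef.
  def registerOrLookup(name: String, isDriver: Boolean, newActor: => FakeActor): String =
    if (isDriver) {
      val actor = newActor // the by-name argument is evaluated here, and only here
      "local:" + actor.label
    } else {
      "remote:" + name // on a slave the constructor expression is never run
    }
}
```

So on a slave, `new BlockManagerMasterActor(isLocal)` in the SparkEnv code is passed as an expression but never executed.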
1 BlockManagerId
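BlockManagerId is the unique identifier of a BlockManager, so ideally only one BlockManagerId object is created for each BlockManager.
This is a typical singleton scenario (more precisely, equal IDs are de-duplicated so that each distinct ID has a single canonical instance).
Implementing this in Scala is somewhat obscure, and this is a typical example.
All constructors are made private, and instances are created through the companion object.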
/**
 * This class represent an unique identifier for a BlockManager.
 * The first 2 constructors of this class is made private to ensure that
 * BlockManagerId objects can be created only using the apply method in
 * the companion object. This allows de-duplication of ID objects.
 * Also, constructor parameters are private to ensure that parameters cannot
 * be modified from outside this class.
 */
private[spark] class BlockManagerId private (
    private var executorId_ : String,
    private var host_ : String,
    private var port_ : Int,
    private var nettyPort_ : Int
  ) extends Externalizable {

  private def this() = this(null, null, 0, 0) // For deserialization only
}
private[spark] object BlockManagerId {

  /**
   * Returns a [[org.apache.spark.storage.BlockManagerId]] for the given configuration.
   *
   * @param execId ID of the executor.
   * @param host Host name of the block manager.
   * @param port Port of the block manager.
   * @param nettyPort Optional port for the Netty-based shuffle sender.
   * @return A new [[org.apache.spark.storage.BlockManagerId]].
   */
  def apply(execId: String, host: String, port: Int, nettyPort: Int) =
    getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))

  def apply(in: ObjectInput) = {
    val obj = new BlockManagerId()
    obj.readExternal(in)
    getCachedBlockManagerId(obj)
  }

  val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()

  def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
    blockManagerIdCache.putIfAbsent(id, id)
    blockManagerIdCache.get(id)
  }
}
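The same private-constructor-plus-companion pattern can be tried in isolation. The sketch below uses a hypothetical `NodeId` type with only a host and a port. It spells out `equals` and `hashCode`, which the cache lookup depends on (the excerpt above omits them), and it uses the return value of `putIfAbsent` directly instead of a second `get`.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical ID type; the private constructor forces callers through apply.
final class NodeId private (val host: String, val port: Int) {
  override def equals(other: Any): Boolean = other match {
    case that: NodeId => host == that.host && port == that.port
    case _ => false
  }
  override def hashCode: Int = host.hashCode * 41 + port
}

object NodeId {
  private val cache = new ConcurrentHashMap[NodeId, NodeId]()

  /** Returns the canonical instance for this (host, port) pair. */
  def apply(host: String, port: Int): NodeId = {
    val candidate = new NodeId(host, port)
    // putIfAbsent returns null if the candidate was stored, else the earlier instance
    val existing = cache.putIfAbsent(candidate, candidate)
    if (existing == null) candidate else existing
  }
}
```

Two calls with equal arguments return the very same object, so reference comparison (`eq`) works as well as `==`.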
2 BlockManager
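BlockManager is shared by the master and the slaves, but the master-side logic is already wrapped in BlockManagerMaster.
So this section mainly analyzes the slave-side interfaces: reportBlockStatus, get, and put.
put and get rely on memoryStore and diskStore; see Spark 源码分析 – BlockStore (Spark Source Code Analysis: BlockStore).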
private[spark] class BlockManager(
    executorId: String,
    actorSystem: ActorSystem,
    val master: BlockManagerMaster,
    val defaultSerializer: Serializer,
    maxMemory: Long)
  extends Logging {

  private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {} // definition of BlockInfo, details below

  val shuffleBlockManager = new ShuffleBlockManager(this)

  // BlockInfo of every block managed here: [blockId, blockInfo]
  private val blockInfo = new TimeStampedHashMap[String, BlockInfo]

  private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
  private[storage] val diskStore: DiskStore =
    new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))

  // The BlockManagerId of this BlockManager
  val blockManagerId = BlockManagerId(executorId, connectionManager.id.host, connectionManager.id.port, nettyPort)

  // Create the slaveActor; it appears that every BlockManager creates one
  val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
    name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)

  /**
   * Initialize the BlockManager. Register to the BlockManagerMaster, and start the
   * BlockManagerWorker actor.
   */
  private def initialize() {
    // Register this BlockManager with the master; if this node is the driver itself, nothing is done
    master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
    // Create the BlockManagerWorker that transfers blocks to and from remote nodes;
    // blocks are too large to be sent through akka
    BlockManagerWorker.startBlockManagerWorker(this)
    if (!BlockManager.getDisableHeartBeatsForTesting) {
      // Schedule a periodic heartbeat
      heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
        heartBeat()
      }
    }
  }
2.1 BlockInfo
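The key job of BlockInfo is to make access to a block mutually exclusive: before accessing a block, a thread must first call waitForReady.
So a BlockInfo has to be created for every block to manage that mutual exclusion.
Why is it called BlockInfo, though?
The updateBlockInfo event in BlockManagerMasterActor does not update this BlockInfo; it updates the BlockStatus held in BlockManagerInfo. Not very reasonable!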
private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
  @volatile var pending: Boolean = true
  @volatile var size: Long = -1L
  @volatile var initThread: Thread = null
  @volatile var failed = false

  setInitThread()

  private def setInitThread() {
    // Set current thread as init thread - waitForReady will not block this thread
    // (in case there is non trivial initialization which ends up calling waitForReady as part of
    // initialization itself)
    this.initThread = Thread.currentThread()
  }

  /**
   * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
   * Return true if the block is available, false otherwise.
   */
  def waitForReady(): Boolean = {
    if (initThread != Thread.currentThread() && pending) {
      synchronized {
        while (pending) this.wait()
      }
    }
    !failed
  }

  /** Mark this BlockInfo as ready (i.e. block is finished writing) */
  def markReady(sizeInBytes: Long) {
    assert (pending)
    size = sizeInBytes
    initThread = null
    failed = false
    initThread = null
    pending = false
    synchronized {
      this.notifyAll()
    }
  }

  /** Mark this BlockInfo as ready but failed */
  def markFailure() {
    assert (pending)
    size = 0
    initThread = null
    failed = true
    initThread = null
    pending = false
    synchronized {
      this.notifyAll()
    }
  }
}
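The wait/notify handshake at the heart of BlockInfo can be reduced to a small latch. The sketch below is a simplification under stated assumptions: the `ReadyFlag` name is made up, it drops the size field and the initThread exemption, and it guards all state with the object's monitor instead of volatile fields.

```scala
// Hypothetical, stripped-down version of the BlockInfo readiness protocol.
final class ReadyFlag {
  private var pending = true
  private var failed = false

  /** Blocks until markReady or markFailure is called; true if the write succeeded. */
  def waitForReady(): Boolean = synchronized {
    while (pending) wait() // loop guards against spurious wakeups
    !failed
  }

  /** Called by the writer once the block has been stored. */
  def markReady(): Unit = synchronized {
    pending = false
    failed = false
    notifyAll()
  }

  /** Called by the writer when storing the block failed. */
  def markFailure(): Unit = synchronized {
    pending = false
    failed = true
    notifyAll()
  }
}
```

A reader thread that calls `waitForReady()` before the writer finishes simply parks until one of the two mark methods runs.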
2.2 reportBlockStatus
/**
 * Tell the master about the current storage status of a block. This will send a block update
 * message reflecting the current status, *not* the desired storage level in its block info.
 * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
 *
 * droppedMemorySize exists to account for when block is dropped from memory to disk (so it is still valid).
 * This ensures that update in master will compensate for the increase in memory on slave.
 */
def reportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L) {
  // A false result means the master asked this slave to re-register
  val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize)
  if (needReregister) {
    logInfo("Got told to reregister updating block " + blockId)
    // Reregistering will report our new block for free.
    asyncReregister()
  }
  logDebug("Told master about block " + blockId)
}

/**
 * Actually send a UpdateBlockInfo message. Returns the master's response,
 * which will be true if the block was successfully recorded and false if
 * the slave needs to re-register.
 */
private def tryToReportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L): Boolean = {
  val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
    info.level match {
      case null =>
        (StorageLevel.NONE, 0L, 0L, false)
      case level =>
        val inMem = level.useMemory && memoryStore.contains(blockId)
        val onDisk = level.useDisk && diskStore.contains(blockId)
        val storageLevel = StorageLevel(onDisk, inMem, level.deserialized, level.replication)
        val memSize = if (inMem) memoryStore.getSize(blockId) else droppedMemorySize
        val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
        (storageLevel, memSize, diskSize, info.tellMaster)
    }
  }

  if (tellMaster) {
    // Report the block's current state (its disk and memory usage) to the master
    master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
  } else {
    true
  }
}
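The point of tryToReportBlockStatus is that the reported level describes where the block actually is, not where it was asked to be. A minimal sketch of that calculation follows; `Level`, `Report`, and `currentStatus` are hypothetical names, and the two `Option[Long]` arguments stand in for "the store contains the block, with this size".

```scala
// Hypothetical, reduced versions of StorageLevel and the update message.
final case class Level(useDisk: Boolean, useMemory: Boolean)
final case class Report(level: Level, memSize: Long, diskSize: Long)

object StatusSketch {
  /**
   * desired:     the level requested at put time
   * memSizeOpt:  Some(size) if the memory store currently holds the block
   * diskSizeOpt: Some(size) if the disk store currently holds the block
   */
  def currentStatus(
      desired: Level,
      memSizeOpt: Option[Long],
      diskSizeOpt: Option[Long],
      droppedMemorySize: Long = 0L): Report = {
    val inMem = desired.useMemory && memSizeOpt.isDefined
    val onDisk = desired.useDisk && diskSizeOpt.isDefined
    Report(
      Level(onDisk, inMem),
      // when the block just left memory, report the dropped size instead
      if (inMem) memSizeOpt.get else droppedMemorySize,
      if (onDisk) diskSizeOpt.get else 0L)
  }
}
```

For a MEMORY_AND_DISK block that has been evicted to disk, the report carries a disk-only level together with the number of bytes that were dropped from memory.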
2.3 Get
/**
 * Get block from local block manager (read the block locally).
 */
def getLocal(blockId: String): Option[Iterator[Any]] = {
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized { // mutually exclusive access to the block
      // If another thread is writing the block, wait for it to become ready.
      if (!info.waitForReady()) { // wait until the block is ready; a block can only be written serially
        // If we get here, the block write failed.
        logWarning("Block " + blockId + " was marked as failure.")
        return None
      }

      val level = info.level

      // Look for the block in memory
      if (level.useMemory) { // if the storage level uses memory, try the memoryStore first
        memoryStore.getValues(blockId) match {
          case Some(iterator) =>
            return Some(iterator) // return the iterator directly
          case None =>
            logDebug("Block " + blockId + " not found in memory")
        }
      }

      // Not found in memory above, so continue looking on disk.
      // Look for block on disk, potentially loading it back into memory if required
      if (level.useDisk) {
        if (level.useMemory && level.deserialized) { // MEMORY_AND_DISK, not serialized; part of the data is on disk
          diskStore.getValues(blockId) match {
            case Some(iterator) => // read the block from disk and put it back into memory
              // Put the block back in memory before returning it
              // TODO: Consider creating a putValues that also takes in a iterator ?
              val elements = new ArrayBuffer[Any]
              elements ++= iterator
              memoryStore.putValues(blockId, elements, level, true).data match {
                case Left(iterator2) => // putValues is expected to hand back an iterator over the stored block
                  return Some(iterator2)
                case _ =>
                  throw new Exception("Memory store did not return back an iterator")
              }
            case None =>
              throw new Exception("Block " + blockId + " not found on disk, though it should be")
          }
        } else if (level.useMemory && !level.deserialized) { // MEMORY_AND_DISK_SER, serialized
          // Read it as a byte buffer into memory first, then return it
          diskStore.getBytes(blockId) match { // the data is serialized, so use getBytes
            case Some(bytes) =>
              // Put a copy of the block back in memory before returning it. Note that we can't
              // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
              // The use of rewind assumes this.
              assert (0 == bytes.position())
              val copyForMemory = ByteBuffer.allocate(bytes.limit)
              copyForMemory.put(bytes)
              memoryStore.putBytes(blockId, copyForMemory, level) // the memoryStore still caches the serialized form
              bytes.rewind() // deserialization has to read the data again, hence rewind
              return Some(dataDeserialize(blockId, bytes)) // but the caller gets deserialized data
            case None =>
              throw new Exception("Block " + blockId + " not found on disk, though it should be")
          }
        } else { // DISK_ONLY: nothing special, read straight from disk
          diskStore.getValues(blockId) match {
            case Some(iterator) =>
              return Some(iterator)
            case None =>
              throw new Exception("Block " + blockId + " not found on disk, though it should be")
          }
        }
      }
    }
  } else {
    logDebug("Block " + blockId + " not registered locally")
  }
  return None
}
/**
 * Get block from the local block manager as serialized bytes.
 */
def getLocalBytes(blockId: String): Option[ByteBuffer] = {
  // The logic is simpler ......
}
/**
 * Get block from remote block managers.
 */
def getRemote(blockId: String): Option[Iterator[Any]] = {
  // Get locations of block
  val locations = master.getLocations(blockId)

  // Get block from remote locations
  for (loc <- locations) {
    val data = BlockManagerWorker.syncGetBlock( // use BlockManagerWorker to fetch the block from the remote node
      GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
    if (data != null) {
      return Some(dataDeserialize(blockId, data))
    }
    logDebug("The value of block " + blockId + " is null")
  }
  logDebug("Block " + blockId + " not found")
  return None
}
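The local lookup order in getLocal (memory first, then disk, promoting a disk hit back into memory when the level allows it) can be illustrated with two in-memory maps. This is only a sketch under assumptions: `TwoTierStore` is a made-up name, the `disk` map stands in for the real DiskStore, serialization is ignored, and a disk miss returns None where the real code throws.

```scala
import scala.collection.mutable

// Hypothetical two-tier store; both tiers are plain maps for illustration.
final class TwoTierStore {
  val memory = mutable.Map.empty[String, Vector[Any]]
  val disk = mutable.Map.empty[String, Vector[Any]]

  def getLocal(blockId: String, useMemory: Boolean, useDisk: Boolean): Option[Iterator[Any]] = {
    // 1. Try memory if the storage level uses it
    val fromMemory = if (useMemory) memory.get(blockId) else None
    // 2. Fall back to disk, and promote the block into memory if the level allows
    val values = fromMemory.orElse {
      if (useDisk) {
        val loaded = disk.get(blockId)
        if (useMemory) loaded.foreach(v => memory(blockId) = v)
        loaded
      } else None
    }
    values.map(_.iterator)
  }
}
```

After the first disk hit for a memory-and-disk block, later reads are served from the memory tier.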
2.4 Put
/**
 * Put a new block of values to the block manager. Returns its (estimated) size in bytes.
 */
def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
    tellMaster: Boolean = true) : Long = {

  // Remember the block's storage level so that we can correctly drop it to disk if it needs
  // to be dropped right after it got put into memory. Note, however, that other threads will
  // not be able to get() this block until we call markReady on its BlockInfo.
  val myInfo = {
    val tinfo = new BlockInfo(level, tellMaster) // create a new BlockInfo
    // Do atomically !
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) // check whether a BlockInfo already exists for this blockId

    if (oldBlockOpt.isDefined) { // if it exists, we have to wait on it
      if (oldBlockOpt.get.waitForReady()) {
        logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
        return oldBlockOpt.get.size
      }
      // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
      oldBlockOpt.get
    } else {
      tinfo
    }
  }

  // If we need to replicate the data, we'll want access to the values, but because our
  // put will read the whole iterator, there will be no values left. For the case where
  // the put serializes data, we'll remember the bytes, above; but for the case where it
  // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
  var valuesAfterPut: Iterator[Any] = null

  // Ditto for the bytes after the put
  var bytesAfterPut: ByteBuffer = null

  // Size of the block in bytes (to return to caller)
  var size = 0L

  myInfo.synchronized { // take the lock and start the actual put
    var marked = false
    try {
      if (level.useMemory) { // if memory may be used, prefer memory
        // Save it just to memory first, even if it also has useDisk set to true; we will later
        // drop it to disk if the memory store can't hold it.
        val res = memoryStore.putValues(blockId, values, level, true)
        size = res.size
        res.data match {
          case Right(newBytes) => bytesAfterPut = newBytes
          case Left(newIterator) => valuesAfterPut = newIterator
        }
      } else { // otherwise store it on disk
        // Save directly to disk.
        // Don't get back the bytes unless we replicate them.
        val askForBytes = level.replication > 1
        val res = diskStore.putValues(blockId, values, level, askForBytes)
        size = res.size
        res.data match {
          case Right(newBytes) => bytesAfterPut = newBytes
          case _ =>
        }
      }

      // Now that the block is in either the memory or disk store, let other threads read it,
      // and tell the master about it.
      marked = true
      // Release the wait condition on the BlockInfo so that other threads can access this block
      myInfo.markReady(size)
      if (tellMaster) {
        reportBlockStatus(blockId, myInfo) // notify the master that the block status changed
      }
    } finally {
      // If we failed at putting the block to memory/disk, notify other possible readers
      // that it has failed, and then remove it from the block info map.
      if (! marked) { // the put failed, so clean up
        // Note that the remove must happen before markFailure otherwise another thread
        // could've inserted a new BlockInfo before we remove it.
        blockInfo.remove(blockId)
        myInfo.markFailure()
        logWarning("Putting block " + blockId + " failed")
      }
    }
  }

  // Replicate block if required
  if (level.replication > 1) {
    val remoteStartTime = System.currentTimeMillis
    // Serialize the block if not already done
    if (bytesAfterPut == null) {
      if (valuesAfterPut == null) {
        throw new SparkException(
          "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
      }
      bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
    }
    replicate(blockId, bytesAfterPut, level) // perform the replication
    logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
  }
  BlockManager.dispose(bytesAfterPut)

  return size
}
/**
 * Put a new block of serialized bytes to the block manager.
 */
def putBytes(
    blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
  // The logic is fairly simple ......
}
/**
 * Replicate block to another node.
 */
var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
  val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
  if (cachedPeers == null) {
    cachedPeers = master.getPeers(blockManagerId, level.replication - 1) // find the peers that can hold replicas
  }
  for (peer: BlockManagerId <- cachedPeers) { // put the block to be replicated on each of these peers
    val start = System.nanoTime
    data.rewind()
    if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel), // transfer the block data through BlockManagerWorker
      new ConnectionManagerId(peer.host, peer.port))) {
      logError("Failed to call syncPutBlock to " + peer)
    }
    // Note: nanoseconds / 1e6 yields milliseconds, although the message below prints "s"
    logDebug("Replicated BlockId " + blockId + " once used " +
      (System.nanoTime - start) / 1e6 + " s; The size of the data is " +
      data.limit() + " bytes.")
  }
}
2.5 dropFromMemory
/**
 * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
 * store reaches its limit and needs to free up space.
 */
def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
  logInfo("Dropping block " + blockId + " from memory")
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized { // take the lock on the BlockInfo
      // required ? As of now, this will be invoked only for blocks which are ready
      // But in case this changes in future, adding for consistency sake.
      if (! info.waitForReady() ) {
        // If we get here, the block write failed.
        logWarning("Block " + blockId + " was marked as failure. Nothing to drop")
        return
      }

      val level = info.level
      if (level.useDisk && !diskStore.contains(blockId)) { // if the level uses disk, write the evicted block to disk
        logInfo("Writing block " + blockId + " to disk")
        data match {
          case Left(elements) =>
            diskStore.putValues(blockId, elements, level, false)
          case Right(bytes) =>
            diskStore.putBytes(blockId, bytes, level)
        }
      }
      // Compute the size that is being dropped from memory
      val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
      val blockWasRemoved = memoryStore.remove(blockId) // drop the block from the memoryStore
      if (info.tellMaster) {
        reportBlockStatus(blockId, info, droppedMemorySize) // notify the master that the block information changed
      }
      if (!level.useDisk) {
        // The block is completely gone from this node; forget it so we can put() it again later.
        // Without disk, removing it from memory means the block is deleted entirely.
        blockInfo.remove(blockId)
      }
    }
  } else {
    // The block has already been dropped
  }
}
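The eviction rule in dropFromMemory (spill to disk first if the level uses disk, otherwise forget the block entirely) can be sketched with the same kind of toy store. `EvictingStore`, its fields, and the `useDisk` argument are hypothetical; the `known` set plays the role of the blockInfo map, and reporting to the master is left out.

```scala
import scala.collection.mutable

// Hypothetical store used only to illustrate the eviction rule.
final class EvictingStore {
  val memory = mutable.Map.empty[String, Array[Byte]]
  val disk = mutable.Map.empty[String, Array[Byte]]
  val known = mutable.Set.empty[String] // plays the role of the blockInfo map

  /** Evicts a block from memory and returns the number of bytes freed. */
  def dropFromMemory(blockId: String, useDisk: Boolean): Long = synchronized {
    memory.get(blockId) match {
      case None => 0L // already dropped
      case Some(bytes) =>
        // Spill before evicting, unless a copy is already on disk
        if (useDisk && !disk.contains(blockId)) disk(blockId) = bytes
        memory.remove(blockId)
        // Without a disk copy the block is gone from this node, so forget it
        if (!useDisk) known.remove(blockId)
        bytes.length.toLong
    }
  }
}
```

The returned byte count corresponds to droppedMemorySize, which the real method passes on to reportBlockStatus.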