Motivation
Spark has various 2G limits.
- When reading a data block cached on local disk, the following code fragment is called:
val iterToReturn: Iterator[Any] = {
  val diskBytes = diskStore.getBytes(blockId)
  if (level.deserialized) {
    val diskValues = serializerManager.dataDeserializeStream(
      blockId,
      diskBytes.toInputStream(dispose = true))(info.classTag)
    maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
  } else {
    val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
      .map { _.toInputStream(dispose = false) }
      .getOrElse { diskBytes.toInputStream(dispose = true) }
    serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
  }
}
def getBytes(blockId: BlockId): ChunkedByteBuffer = {
  val file = diskManager.getFile(blockId.name)
  val channel = new RandomAccessFile(file, "r").getChannel
  Utils.tryWithSafeFinally {
    // For small files, directly read rather than memory map
    if (file.length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(file.length.toInt)
      channel.position(0)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer\n" +
            s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
        }
      }
      buf.flip()
      new ChunkedByteBuffer(buf)
    } else {
      new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
    }
  } {
    channel.close()
  }
}
The above code has the following problems:
* channel.map(MapMode.READ_ONLY, 0, file.length) returns a MappedByteBuffer, and the size of a MappedByteBuffer cannot exceed 2G.
* Generating the Iterator[Any] requires loading all of the data into memory, which may consume a large amount of (off-heap) memory.
* MappedByteBuffer maps a file into memory that is managed by the operating system; the JVM cannot control this memory.
- When serializing data with Kryo, the following code fragment is called:
override def serialize[T: ClassTag](t: T): ByteBuffer = {
  output.clear()
  val kryo = borrowKryo()
  try {
    kryo.writeClassAndObject(output, t)
  } catch {
    case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
      throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
        "increase spark.kryoserializer.buffer.max value.")
  } finally {
    releaseKryo(kryo)
  }
  ByteBuffer.wrap(output.toBytes)
}
The above code has the following problems:
* The serialized data is stored in the internal byte[] of output, and the size of a byte[] cannot exceed 2G.
- When the RPC layer writes outgoing data to a Channel, the following code fragment is called:
public long transferTo(final WritableByteChannel target, final long position) throws IOException {
  Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
  // Bytes written for header in this call.
  long writtenHeader = 0;
  if (header.readableBytes() > 0) {
    writtenHeader = copyByteBuf(header, target);
    totalBytesTransferred += writtenHeader;
    if (header.readableBytes() > 0) {
      return writtenHeader;
    }
  }
  // Bytes written for body in this call.
  long writtenBody = 0;
  if (body instanceof FileRegion) {
    writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);
  } else if (body instanceof ByteBuf) {
    writtenBody = copyByteBuf((ByteBuf) body, target);
  }
  totalBytesTransferred += writtenBody;
  return writtenHeader + writtenBody;
}
The above code has the following problems:
* The size of a ByteBuf cannot exceed 2G.
* In-memory data larger than 2G cannot be transferred.
- When decoding a received RPC message, the following code fragment is called:
public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
  private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);

  @Override
  public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    Message.Type msgType = Message.Type.decode(in);
    Message decoded = decode(msgType, in);
    assert decoded.type() == msgType;
    logger.trace("Received message {}: {}", msgType, decoded);
    out.add(decoded);
  }

  private Message decode(Message.Type msgType, ByteBuf in) {
    switch (msgType) {
      case ChunkFetchRequest:
        return ChunkFetchRequest.decode(in);
      case ChunkFetchSuccess:
        return ChunkFetchSuccess.decode(in);
      case ChunkFetchFailure:
        return ChunkFetchFailure.decode(in);
      default:
        throw new IllegalArgumentException("Unexpected message type: " + msgType);
    }
  }
}
The above code has the following problems:
* The size of a ByteBuf cannot exceed 2G.
* Decoding can only start after all of the data has been received.
Goals
- Eliminate the various 2G limits in Spark. (2G limits 1, 2, 3, 4)
- Support back-pressure flow control for remote data reads (experimental goal). (2G limit 4)
- Add a buffer pool (long-term goal).
Design
Eliminating the various 2G limits in Spark
Replace ByteBuffer with ChunkedByteBuffer. (2G limits 1, 2)
ChunkedByteBuffer introduction:
- Stores data in multiple ByteBuffer instances.
- Supports reference counting, a prerequisite for the buffer pool feature (see Reference counted objects).
- Supports serialization for easy transport.
- Supports slice, duplicate and copy operations, similar to ByteBuffer, which makes the data convenient to work with.
- Can be efficiently converted to InputStream, ByteBuffer, byte[], ByteBuf, etc., for interoperation with other interfaces.
- Can conveniently write its data to an OutputStream.
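A minimal sketch of the idea (not the actual Spark class): the data is split across several ByteBuffers, so the total size is a Long and no single buffer has to approach 2G; the reference count shown here only illustrates the pooling prerequisite. The class and member names are assumptions of this sketch.

import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._

class ChunkedByteBufferSketch(chunks: Array[ByteBuffer]) {
  private val refCnt = new AtomicInteger(1)

  // The total size is a Long: it can exceed Integer.MAX_VALUE because it is the sum
  // of several chunks, each of which stays below 2G.
  def size: Long = chunks.map(_.remaining().toLong).sum

  // Reference counting is what makes a buffer pool possible: the chunks can be
  // returned to the pool as soon as the count drops to zero.
  def retain(): this.type = { refCnt.incrementAndGet(); this }
  def release(): Unit = {
    if (refCnt.decrementAndGet() == 0) {
      // a pooled implementation would return the chunks to the pool here
    }
  }

  // Expose all chunks as one InputStream without ever building a single huge byte[].
  def toInputStream: InputStream = {
    val streams = chunks.iterator.map { buf =>
      val bytes = new Array[Byte](buf.remaining())
      buf.duplicate().get(bytes) // duplicate() leaves the original position untouched
      new ByteArrayInputStream(bytes): InputStream
    }
    new SequenceInputStream(streams.asJavaEnumeration)
  }
}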
- Move the ChunkedByteBuffer class to common/network-common/src/main/java/org/apache/spark/network/buffer/.
- Modify the return type of ManagedBuffer.nioByteBuffer to a ChunkedByteBuffer instance. (2G limit 1)
- Further standardize the use of ManagedBuffer and ChunkedByteBuffer:
  - Data in memory, on the network, on disk and from other sources is represented by ManagedBuffer.
  - ChunkedByteBuffer only represents data in memory.
  - ManagedBuffer.nioByteBuffer is called only when it is certain that there is enough memory to hold the data.
- Modify the parameter of SerializerInstance.deserialize and the return value of SerializerInstance.serialize to a ChunkedByteBuffer instance (a sketch of the supporting output stream follows this list). (2G limit 2)
def serialize[T: ClassTag](t: T): ChunkedByteBuffer = {
  output.clear()
  val out = ChunkedByteBufferOutputStream.newInstance()
  // The serialized data is written to the OutputStream rather than to the internal
  // byte[] of the output object.
  output.setOutputStream(out)
  val kryo = borrowKryo()
  kryo.writeClassAndObject(output, t)
  output.close()
  out.toChunkedByteBuffer
}
- Other changes.
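The serialize method above writes through a ChunkedByteBufferOutputStream. Below is a minimal sketch of what such a stream could look like, assuming it collects data into fixed-size ByteBuffer chunks; the class name, the default chunk size and the toChunks accessor are assumptions of this sketch, not the proposed API.

import java.io.OutputStream
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

class ChunkedByteBufferOutputStreamSketch(chunkSize: Int = 4 * 1024 * 1024) extends OutputStream {
  private val chunks = ArrayBuffer[ByteBuffer]()
  private var current: ByteBuffer = _

  // Allocate a new fixed-size chunk whenever the current one is full, so the total
  // amount of buffered data is not limited by the 2G size of a single byte[].
  private def ensureSpace(): Unit = {
    if (current == null || !current.hasRemaining) {
      current = ByteBuffer.allocate(chunkSize)
      chunks += current
    }
  }

  override def write(b: Int): Unit = {
    ensureSpace()
    current.put(b.toByte)
  }

  override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
    var offset = off
    var remaining = len
    while (remaining > 0) {
      ensureSpace()
      val n = math.min(remaining, current.remaining())
      current.put(bytes, offset, n)
      offset += n
      remaining -= n
    }
  }

  // Return read-ready views of the chunks (flipped duplicates).
  def toChunks: Array[ByteBuffer] = chunks.map { c =>
    val dup = c.duplicate()
    dup.flip()
    dup
  }.toArray
}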
Replace ByteBuf with InputStream.
1. Add an InputStreamManagedBuffer class, used to wrap an InputStream as a ManagedBuffer instance. (2G limit 4)
2. Modify the NioManagedBuffer.convertToNetty method to return an InputStream instance when the data size is larger than Integer.MAX_VALUE. (2G limit 3)
3. Modify the MessageWithHeader class to support an InputStream body. (2G limit 3)
   - Changes 2 and 3 combined support transferring more than 2G of in-memory data.
4. Modify the parameters of the Encodable.encode method to take an OutputStream instance. (2G limit 4)
5. Add a toInputStream method to UploadBlock so that it can handle mixed-storage data, i.e. the encoded header in memory and the block data on disk. (2G limit 3)
public InputStream toInputStream() throws IOException {
  ChunkedByteBufferOutputStream out = ChunkedByteBufferOutputStream.newInstance();
  Encoders.Bytes.encode(out, type().id());
  encodeWithoutBlockData(out);
  // out.toChunkedByteBuffer().toInputStream() data in memory
  // blockData.createInputStream() data in hard disk (FileInputStream)
  return new SequenceInputStream(out.toChunkedByteBuffer().toInputStream(),
    blockData.createInputStream());
}
   - Changes 2, 3, 4 and 5 combined resolve the 2G limit in the RPC message encoding and sending process.
6. Modify the decode methods of the classes that implement the Encodable interface to take an InputStream instance. (2G limit 4)
7. Modify the TransportFrameDecoder class to represent a frame with a LinkedList<ByteBuf> and remove the frame size limit. (2G limit 4)
8. Add a ByteBufInputStream class that wraps a LinkedList<ByteBuf> as an InputStream; each ByteBuf is released via ByteBuf.release as soon as its data has been read (see the sketch after this list). (2G limit 4)
9. Modify the parameter of the RpcHandler.receive method to an InputStream instance. (2G limit 4)
   - Changes 6, 7, 8 and 9 combined resolve the 2G limit in the RPC message receiving and decoding process.
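As an illustration of item 8, the following sketch (hypothetical class name, error handling omitted) wraps a frame held as a LinkedList<ByteBuf> in an InputStream and releases each ByteBuf as soon as it has been consumed:

import java.io.InputStream
import java.util.LinkedList
import io.netty.buffer.ByteBuf

class ByteBufInputStreamSketch(buffers: LinkedList[ByteBuf]) extends InputStream {

  // Drop exhausted ByteBufs from the head of the list and release them immediately,
  // so memory is freed while the rest of the frame is still being consumed.
  private def current(): ByteBuf = {
    while (!buffers.isEmpty && !buffers.peek().isReadable) {
      buffers.poll().release()
    }
    buffers.peek() // null once the whole frame has been read
  }

  override def read(): Int = {
    val buf = current()
    if (buf == null) -1 else buf.readByte() & 0xff
  }

  override def read(dst: Array[Byte], off: Int, len: Int): Int = {
    val buf = current()
    if (buf == null) {
      -1
    } else {
      val n = math.min(len, buf.readableBytes())
      buf.readBytes(dst, off, n)
      n
    }
  }

  override def close(): Unit = {
    while (!buffers.isEmpty) {
      buffers.poll().release()
    }
  }
}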
Read data
Local data
- Only data stored in memory is represented by ChunkedByteBuffer; all other data is represented by ManagedBuffer. (2G limit 1)
- Modify the return type of DiskStore.getBytes to a ManagedBuffer instance; ManagedBuffer.nioByteBuffer is called only when there is enough memory to hold the ManagedBuffer's data (see the sketch below).
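A sketch of this change, assuming the existing FileSegmentManagedBuffer from common/network-common is used to represent the on-disk block; the standalone helper below is illustrative, not the real DiskStore method:

import java.io.File
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.util.TransportConf

object DiskStoreSketch {
  // Return a file-backed ManagedBuffer instead of reading or memory-mapping the whole
  // block; the data stays on disk until a caller decides it fits in memory and
  // invokes ManagedBuffer.nioByteBuffer.
  def getBytes(conf: TransportConf, blockFile: File): ManagedBuffer =
    new FileSegmentManagedBuffer(conf, blockFile, 0, blockFile.length)
}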
Remote data (2G limit 4)
There are three options:
- Option 1: add an InputStreamInterceptor to propagate back-pressure to the shuffle server (this option has been implemented); see the sketch after this list.
  - When the number of cached ByteBufs exceeds a threshold, call channel.config().setAutoRead(false) to disable AUTO_READ, so channel.read() is no longer called automatically.
  - When the number of cached ByteBufs falls below a threshold, call channel.config().setAutoRead(true) to re-enable AUTO_READ.
  - The advantage of this option is that it propagates back-pressure; the drawback is that it changes the semantics of the existing API and, in some cases, breaks the IO retry function.
  - Reference documents:
    - Netty的read事件与AUTO_READ模式
    - TCP/IP详解–举例明白发送/接收缓冲区、滑动窗口协议之间的关系
    - TCP 滑动窗口协议 详解
  - InputStreamInterceptor design:
    - Create a fixed-size, thread-safe buffer pool.
    - The netty thread puts each received ByteBuf into the pool; when the cached ByteBufs exceed 90% of the pool capacity, call channel.config().setAutoRead(false) so that data is no longer read automatically and the peer's writes block.
    - The data-processing thread takes ByteBufs from the pool; when the cached ByteBufs drop below 10% of the pool capacity, call channel.config().setAutoRead(true) to re-enable automatic reads.
    - When a ByteBuf has been fully processed, release it and call channel.read() to receive more data.
- Option 2: when a message is larger than a certain size, write it to disk so that it does not occupy memory.
  - The advantage of this option is that it uses very little memory; the disadvantage is that it adds disk IO.
- Option 3: combine option 2 with a buffer pool, keeping data in memory as far as possible.
  - Write a message to the buffer pool when there is enough memory; spill it to disk only when memory is insufficient.
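The following Scala sketch illustrates option 1 with the 90%/10% watermarks described above. It is not the implemented InputStreamInterceptor; the handler name, the single-channel InputStream view and the omission of EOF and error handling are simplifying assumptions of this sketch.

import java.io.InputStream
import java.util.concurrent.LinkedBlockingQueue
import io.netty.buffer.ByteBuf
import io.netty.channel.{Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter}

class BackPressureInterceptorSketch(capacity: Int) extends ChannelInboundHandlerAdapter {
  private val pool = new LinkedBlockingQueue[ByteBuf]()
  private val highWatermark = math.max(1, (capacity * 0.9).toInt)
  private val lowWatermark = math.max(1, (capacity * 0.1).toInt)

  // Netty thread: cache the received ByteBuf; above the high watermark, stop reading
  // from the socket so that the peer's writes eventually block (TCP back-pressure).
  override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match {
    case buf: ByteBuf =>
      pool.put(buf)
      if (pool.size() >= highWatermark) {
        ctx.channel().config().setAutoRead(false)
      }
    case other =>
      ctx.fireChannelRead(other)
  }

  // Processing thread: consume the cached ByteBufs as an InputStream; below the low
  // watermark, re-enable AUTO_READ; after a ByteBuf is consumed, release it and ask
  // for more data with channel.read(). EOF handling is omitted for brevity.
  def toInputStream(channel: Channel): InputStream = new InputStream {
    private var current: ByteBuf = _

    private def advance(): ByteBuf = {
      if (current == null || !current.isReadable) {
        if (current != null) {
          current.release()
          channel.read()
        }
        current = pool.take()
        if (pool.size() <= lowWatermark) {
          channel.config().setAutoRead(true)
        }
      }
      current
    }

    override def read(): Int = advance().readByte() & 0xff
  }
}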
Add buffer pool
A buffer pool can reduce memory allocation, shorten GC time, and improve the performance of Spark core.
- Reduce the number of large objects created in the Eden area; in Twitter's experience, using buffer pools significantly reduces the number of GCs (see Netty 4 Reduces GC Overhead by 5x at Twitter).
- Use a buffer pool to reduce the number of memory allocations and the amount of zero-filling (the sketch below shows the kind of reuse intended).
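As one illustration of the intended reuse (the next section mentions reusing the netty buffer code), Netty's existing pooled allocator already behaves this way: allocations are served from a pool rather than from the Eden space, and release() returns the memory to the pool instead of leaving the object to the garbage collector.

import io.netty.buffer.{ByteBuf, PooledByteBufAllocator}

object BufferPoolExample {
  def main(args: Array[String]): Unit = {
    val allocator = PooledByteBufAllocator.DEFAULT
    val buf: ByteBuf = allocator.directBuffer(64 * 1024) // pooled, off-heap allocation
    try {
      buf.writeBytes(new Array[Byte](1024)) // write 1 KB of zeros
    } finally {
      buf.release() // returns the memory to the pool for reuse
    }
  }
}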
Using as a generic library
The difficulties in implementing this are:
- Spark does not consider releasing ByteBuffers when it uses them; they are simply reclaimed by the Java GC.
- Adding reference counting and active release would reduce GC pressure, but it requires adding reference-counting and memory-leak-detection code, which is a large change.
- Reuse the netty buffer code to get memory leak detection and dynamic resizing.