MergeManagerImpl 类
内存参数计算
maxInMemCopyUse
位于构造函数中
final float maxInMemCopyUse =
jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT,
MRJobConfig.DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT);
if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
throw new IllegalArgumentException("Invalid value for " +
MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
maxInMemCopyUse);
}
这个参数就是reducer端shuffle输入缓冲占JVM堆的比例即参数:mapreduce.reduce.shuffle.input.buffer.percent
,默认为0.7f
memoryLimit
位于构造函数中,memoryLimit一般情况下就是reducer端输入缓冲的大小,由比例值和JVM堆大小相乘得出。
// Allow unit tests to fix Runtime memory
this.memoryLimit =
(long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
* maxInMemCopyUse);
但是仔细观察代码发现它没有直接将Runtime.getRuntime().maxMemory()
和maxInMemCopyUse
相乘,而是对前者和Integer.MAX_VALUE取了个最小值,这个设计还是挺令人费解的。不过这是在没有设定REDUCE_MEMORY_TOTAL_BYTES(mapreduce.reduce.memory.totalbytes)参数的情况下,如果设定了那么会忽略后面的值。注释中其实也给了一些说明,但是只是为了针对unit tests来fix这个问题。但是个人觉得实际中还是可能会有这样的大内存需求的情况出现,来提高copy-merge速度。
maxInMemReduce
位于最后一轮merge(finalMerge)过程中的计算,这个参数表示reduce开始后,保留给shuffle数据占JVM堆大小。如果reducer不需要太占内存的话,可以让这个值大一点,这样就可以减少写入磁盘的记录数,reducer可以直接从内存获取输入。
final float maxRedPer =
job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
if (maxRedPer > 1.0 || maxRedPer < 0.0) {
throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT +
maxRedPer);
}
int maxInMemReduce = (int)Math.min(
Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
maxRedPer对应与参数mapreduce.reduce.input.buffer.percent
(旧版为mapred.job.reduce.input.buffer.percent
),即保留给shuffle数据空间占堆的比例,接着又开始发生坑爹的逻辑了又要把上限设定为Integer.MAX_VALUE了,不过这里我们看出一些端倪,是不是因为原来结构中maxInMemReduce对应的数据类型是int而不是long造成了目前这种奇异的逻辑。这次没有什么方法可以突破这个限制,也就是说当shuffle数据超过2GB后,无论如何调整io.sort.factor
和mapreduce.reduce.input.buffer.percent
都将使得超过2GB大小的数据写到磁盘上,即使分配了足够的堆空间。