1. Introduction
MapReduce processing is divided into four main phases: the Split phase, the Map phase, the Shuffle phase, and the Reduce phase. The sections below introduce these phases, using the Hadoop 2.6.0 source code for analysis, so as to deepen the reader's understanding of the Split phase.
As we know, a file is read and, after a series of processing steps, its data enters the Map phase as <key,value> pairs; producing those pairs is the main job of the Split phase. The rest of this article walks through that series of operations in detail.
2. The Split Phase
To understand the Split process, you need to know three classes: FileSplit, InputFormat, and LineRecordReader.
(1) org.apache.hadoop.mapreduce.lib.input.FileSplit
public class FileSplit extends InputSplit implements Writable {
  private Path file;
  private long start;
  private long length;
  private String[] hosts;
  private SplitLocationInfo[] hostInfos;

  public FileSplit() {}

  public FileSplit(Path file, long start, long length, String[] hosts) {
    this.file = file;
    this.start = start;
    this.length = length;
    this.hosts = hosts;
  }

  public FileSplit(Path file, long start, long length, String[] hosts,
      String[] inMemoryHosts) {
    this(file, start, length, hosts);
    hostInfos = new SplitLocationInfo[hosts.length];
    for (int i = 0; i < hosts.length; i++) {
      // because N will be tiny, scanning is probably faster than a HashSet
      boolean inMemory = false;
      for (String inMemoryHost : inMemoryHosts) {
        if (inMemoryHost.equals(hosts[i])) {
          inMemory = true;
          break;
        }
      }
      hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
    }
  }

  /** The file containing this split's data. */
  public Path getPath() { return file; }

  /** The position of the first byte in the file to process. */
  public long getStart() { return start; }

  /** The number of bytes in the file to process. */
  @Override
  public long getLength() { return length; }

  @Override
  public String toString() { return file + ":" + start + "+" + length; }

  ////////////////////////////////////////////
  // Writable methods
  ////////////////////////////////////////////

  @Override
  public String[] getLocations() throws IOException {
    if (this.hosts == null) {
      return new String[]{};
    } else {
      return this.hosts;
    }
  }

  @Override
  @Evolving
  public SplitLocationInfo[] getLocationInfo() throws IOException {
    return hostInfos;
  }
}
This class is copied from the Hadoop 2.6 source code, with part of the code removed. From it we can see that a split is described mainly by a Path file, a long start, a long length, and a String[] hosts. Notice that none of these fields holds actual data: a split is only a logical slice of a file, and the data itself is still stored in HDFS as blocks. A split merely records at which position in the file the slice begins, how long it is, and on which hosts the data is located; when the split is read, this information in FileSplit is used to fetch the data from the corresponding blocks. This is also why a split is ideally the same size as a block: if a FileSplit is larger than a block, the split may have to read block data from other nodes, causing unnecessary network transfer and increasing processing time.
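To make the "logical slice" idea concrete, here is a minimal, self-contained sketch (not from the article) that builds a FileSplit by hand and prints the metadata it carries; the path and host names are invented for illustration, and no file data is touched at any point.

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileSplitDemo {
  public static void main(String[] args) throws IOException {
    // A split describing the second 128 MB of a (hypothetical) 300 MB file.
    // Only metadata is stored here -- the bytes themselves stay in HDFS blocks.
    FileSplit split = new FileSplit(
        new Path("hdfs:///data/input/access.log"), // file the split belongs to (hypothetical path)
        128L * 1024 * 1024,                        // start offset within the file
        128L * 1024 * 1024,                        // number of bytes to process
        new String[] {"node1", "node2", "node3"}); // hosts holding the underlying block (made up)

    System.out.println(split);                                  // prints "path:start+length"
    System.out.println(split.getPath());                        // which file
    System.out.println(split.getStart());                       // where the slice begins
    System.out.println(split.getLength());                      // how long it is
    System.out.println(Arrays.toString(split.getLocations()));  // where the data lives
  }
}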
(2) org.apache.hadoop.mapreduce.InputFormat
public abstract class InputFormat<K, V> {

  public abstract List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException;

  public abstract RecordReader<K,V> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException;
}
Reading the splits is carried out by these two methods. Let's start with getSplits(); the concrete implementation shown below is the one in FileInputFormat (the JobConf-based variant from the older mapred API):
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
  Stopwatch sw = new Stopwatch().start();
  FileStatus[] files = listStatus(job);         // get the status (FileStatus) of every input file of the job
  job.setLong(NUM_INPUT_FILES, files.length);   // record the number of input files for the job

  long totalSize = 0;                           // compute the total size of all input files
  for (FileStatus file: files) {                // iterate over the files; directories are not allowed here
    if (file.isDirectory()) {
      throw new IOException("Not a file: " + file.getPath());
    }
    totalSize += file.getLen();
  }

  long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); // target bytes per split, derived from the requested map count
  long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
    FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);           // lower bound on the split size

  // generate splits
  ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); // pre-size the list with the expected number of splits
  NetworkTopology clusterMap = new NetworkTopology();
  for (FileStatus file: files) {                // process each input file in turn
    Path path = file.getPath();                 // path of the current file
    long length = file.getLen();                // size of the current file
    if (length != 0) {                          // only non-empty files are split
      FileSystem fs = path.getFileSystem(job);  // obtain the FileSystem for this path
      BlockLocation[] blkLocations;             // block locations of this file
      if (file instanceof LocatedFileStatus) {  // fetch the block location information
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(fs, path)) {              // isSplitable == true: the file may be cut into several splits
        long blockSize = file.getBlockSize();   // block size of the file
        long splitSize = computeSplitSize(goalSize, minSize, blockSize); // actual size of each split
        long bytesRemaining = length;           // bytes of the file not yet assigned to a split
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { // keep splitting while more than SPLIT_SLOP (1.1) splits' worth remains
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
              length-bytesRemaining, splitSize, clusterMap);       // derive the hosts of this split from the block locations
          splits.add(makeSplit(path, length-bytesRemaining, splitSize, // append the new split to the list
              splitHosts[0], splitHosts[1]));
          bytesRemaining -= splitSize;          // reduce the remaining bytes
        }
        if (bytesRemaining != 0) {              // put whatever is left of the file into one final split
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
              - bytesRemaining, bytesRemaining, clusterMap);
          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              splitHosts[0], splitHosts[1]));
        }
      } else {                                  // isSplitable == false: the file must not be split, so it becomes a single split
        String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 0, length, clusterMap); // hosts of the whole file
        splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));  // add the single split to the list
      }
    } else {
      // Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.elapsedMillis());
  }
  return splits.toArray(new FileSplit[splits.size()]);
}
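The key quantities in the code above are goalSize, minSize, and blockSize: computeSplitSize() simply clamps the goal size between the configured minimum and the block size, i.e. Math.max(minSize, Math.min(goalSize, blockSize)), and SPLIT_SLOP is 1.1. The sketch below (with made-up input sizes) reproduces that arithmetic to show why, with default settings, a split usually ends up exactly one block:

public class SplitSizeDemo {
  private static final double SPLIT_SLOP = 1.1; // same 10% slack used by FileInputFormat

  // Mirrors FileInputFormat.computeSplitSize(): clamp goalSize between minSize and blockSize.
  static long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  public static void main(String[] args) {
    long totalSize = 300L * 1024 * 1024;    // one 300 MB input file (made-up number)
    int numSplits  = 2;                     // requested number of map tasks
    long goalSize  = totalSize / numSplits; // ideally 150 MB per map
    long minSize   = 1;                     // default minimum split size
    long blockSize = 128L * 1024 * 1024;    // HDFS block size

    long splitSize = computeSplitSize(goalSize, minSize, blockSize); // => 128 MB, capped by the block size

    // Same loop shape as getSplits(): peel off full splits while more than
    // SPLIT_SLOP * splitSize bytes remain, then put the tail into a last split.
    long bytesRemaining = totalSize;
    int count = 0;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      bytesRemaining -= splitSize;
      count++;
    }
    if (bytesRemaining != 0) {
      count++;
    }
    System.out.println("splitSize = " + splitSize + ", splits = " + count); // two 128 MB splits + a 44 MB tail => 3 splits
  }
}

With a 300 MB file, a 128 MB block size, and the default minimum of 1 byte, the loop yields two block-aligned 128 MB splits plus a 44 MB tail split.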
After getSplits() has produced the split information for the input files, the data each split describes is read and turned into <key,value> pairs that are fed to the map side; generating those pairs is the job of the RecordReader returned by createRecordReader().
public RecordReader<LongWritable, Text>
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
  String delimiter = context.getConfiguration().get(
      "textinputformat.record.delimiter");           // custom record delimiter, if one is configured
  byte[] recordDelimiterBytes = null;
  if (null != delimiter)
    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
  return new LineRecordReader(recordDelimiterBytes); // the LineRecordReader produces the <key,value> pairs
}
With these two methods of InputFormat, we have covered both splitting the files and reading out the <key,value> pairs.
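The only setting createRecordReader() consults is textinputformat.record.delimiter; when the key is unset, the delimiter is null and LineRecordReader falls back to its default newline handling. If a job needs records separated by something else, it can set that key on the configuration before submission. The sketch below is only an illustration (the job name and the "###" delimiter are invented):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class DelimiterConfigDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Treat "###" as the record separator instead of '\n'; these bytes reach
    // LineRecordReader through the createRecordReader() method shown above.
    conf.set("textinputformat.record.delimiter", "###");

    Job job = Job.getInstance(conf, "custom-delimiter-demo"); // hypothetical job name
    // ... set mapper/reducer classes, input/output paths, etc. as usual ...
  }
}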
(3) org.apache.hadoop.mapreduce.lib.input.LineRecordReader
public class LineRecordReader extends RecordReader<LongWritable, Text> {
  private static final Log LOG = LogFactory.getLog(LineRecordReader.class);
  public static final String MAX_LINE_LENGTH =
      "mapreduce.input.linerecordreader.line.maxlength";

  private long start;
  private long pos;
  private long end;
  private SplitLineReader in;
  private FSDataInputStream fileIn;
  private Seekable filePosition;
  private int maxLineLength;
  private LongWritable key;
  private Text value;
  private boolean isCompressedInput;
  private Decompressor decompressor;
  private byte[] recordDelimiterBytes;

  public LineRecordReader() {
  }

  public LineRecordReader(byte[] recordDelimiter) {
    this.recordDelimiterBytes = recordDelimiter;
  }

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException { // set up the reader for one split
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);

    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null != codec) {
      isCompressedInput = true;
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
            ((SplittableCompressionCodec)codec).createInputStream(
                fileIn, decompressor, start, end,
                SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new CompressedSplitLineReader(cIn, job,
            this.recordDelimiterBytes);
        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn;
      } else {
        in = new SplitLineReader(codec.createInputStream(fileIn,
            decompressor), job, this.recordDelimiterBytes);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);
      in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }

  public boolean nextKeyValue() throws IOException { // turn the next line of the split into a <key,value> pair
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos); // the key is the byte offset of the line within the file
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        newSize = skipUtfByteOrderMark(); // skip a UTF-8 BOM (if present) and read the first line
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); // read one line into value
        pos += newSize; // advance the position by the bytes consumed
      }
      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }
      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " +
          (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }
}
Every time LineRecordReader reads a line, nextKeyValue() is executed once. When a <key,value> pair has been produced successfully, nextKeyValue() returns true, and the new key and value are stored in the key and value fields of the LineRecordReader object, from which they can be retrieved via getCurrentKey() and getCurrentValue(). Once nextKeyValue() has consumed all of the data, every record of the split has been delivered to the map task.
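The map task normally drives this reader for us, but the loop it runs is simple enough to imitate by hand. The sketch below is an illustrative example that uses only the public RecordReader API (initialize, nextKeyValue, getCurrentKey, getCurrentValue); the file path and split size are invented, and in a real job the framework constructs the split and the TaskAttemptContext.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class RecordReaderLoopDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("file:///tmp/demo.txt");  // hypothetical local test file

    // A split covering the first 1024 bytes of the file (offsets made up for the demo).
    FileSplit split = new FileSplit(file, 0, 1024, new String[0]);
    TaskAttemptContext ctx = new TaskAttemptContextImpl(conf, new TaskAttemptID());

    LineRecordReader reader = new LineRecordReader();
    reader.initialize(split, ctx);                 // seek to the split start, skip a partial first line if needed
    try {
      while (reader.nextKeyValue()) {              // one call per record
        LongWritable key = reader.getCurrentKey(); // byte offset of the line
        Text value = reader.getCurrentValue();     // the line itself
        System.out.println(key + "\t" + value);    // this is what the Mapper would receive
      }
    } finally {
      reader.close();
    }
  }
}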
3. Summary
Together, the three classes above and their methods implement the process of reading data from the blocks and turning it into <key,value> pairs:
getSplits() --> splits --> createRecordReader() --> nextKeyValue() --> <key,value> pairs
When we implement custom split behavior, we usually override the corresponding methods of these three classes to provide the new functionality, as the sketch below illustrates.
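As one concrete, purely illustrative example of such customization, the sketch below assumes we want every file to be processed as a single split while still emitting line-oriented <key,value> pairs: it extends the new-API FileInputFormat, turns off isSplitable(), and reuses the stock LineRecordReader. The class name is invented.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// A minimal custom InputFormat: whole-file splits + line-oriented records.
public class WholeFileTextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // Returning false forces getSplits() to emit one split per file,
    // which corresponds to the non-splittable branch seen in the getSplits() walkthrough.
    return false;
  }

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) {
    // Reuse the stock reader: byte offset as key, line contents as value.
    return new LineRecordReader();
  }
}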