MapReduce counters give us a window into the runtime details of a MapReduce job, and MapReduce ships with a number of built-in counters.
Counters are organized into groups; a group collects all counters that share the same logical scope. The built-in counters of a MapReduce job fall into three groups:
- Map-Reduce Framework: Map input records, Map skipped records, Map input bytes, Map output records, Map output bytes, Combine input records, Combine output records, Reduce input records, Reduce input groups, Reduce output records, Reduce skipped groups, Reduce skipped records, Spilled records
- File Systems: FileSystem bytes read, FileSystem bytes written
- Job Counters: Launched map tasks, Launched reduce tasks, Failed map tasks, Failed reduce tasks, Data-local map tasks, Rack-local map tasks, Other local map tasks
All of these counters are visible in the Web UI, as well as in the summary printed to the console when the job finishes.
Here is the log of a sample MapReduce run:
-bash-4.1$ hadoop jar mr.jar com.catt.cdh.mr.CountRecords
13/11/29 11:38:04 WARN conf.Configuration: fs.default.name is deprecated. Instead, use fs.defaultFS
13/11/29 11:38:10 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/11/29 11:38:11 INFO input.FileInputFormat: Total input paths to process : 1
13/11/29 11:38:11 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
13/11/29 11:38:11 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 6298911ef75545c61859c08add6a74a83e0183ad]
13/11/29 11:38:12 INFO mapred.JobClient: Running job: job_201311251130_0208
13/11/29 11:38:13 INFO mapred.JobClient: map 0% reduce 0%
13/11/29 11:38:40 INFO mapred.JobClient: map 100% reduce 0%
13/11/29 11:38:49 INFO mapred.JobClient: map 100% reduce 100%
13/11/29 11:38:57 INFO mapred.JobClient: Job complete: job_201311251130_0208
13/11/29 11:38:57 INFO mapred.JobClient: Counters: 32
13/11/29 11:38:57 INFO mapred.JobClient: File System Counters
13/11/29 11:38:57 INFO mapred.JobClient: FILE: Number of bytes read=36
13/11/29 11:38:57 INFO mapred.JobClient: FILE: Number of bytes written=322478
13/11/29 11:38:57 INFO mapred.JobClient: FILE: Number of read operations=0
13/11/29 11:38:57 INFO mapred.JobClient: FILE: Number of large read operations=0
13/11/29 11:38:57 INFO mapred.JobClient: FILE: Number of write operations=0
13/11/29 11:38:57 INFO mapred.JobClient: HDFS: Number of bytes read=139
13/11/29 11:38:57 INFO mapred.JobClient: HDFS: Number of bytes written=7
13/11/29 11:38:57 INFO mapred.JobClient: HDFS: Number of read operations=2
13/11/29 11:38:57 INFO mapred.JobClient: HDFS: Number of large read operations=0
13/11/29 11:38:57 INFO mapred.JobClient: HDFS: Number of write operations=1
13/11/29 11:38:57 INFO mapred.JobClient: Job Counters
13/11/29 11:38:57 INFO mapred.JobClient: Launched map tasks=1
13/11/29 11:38:57 INFO mapred.JobClient: Launched reduce tasks=1
13/11/29 11:38:57 INFO mapred.JobClient: Data-local map tasks=1
13/11/29 11:38:57 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=31068
13/11/29 11:38:57 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=6671
13/11/29 11:38:57 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/11/29 11:38:57 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/11/29 11:38:57 INFO mapred.JobClient: Map-Reduce Framework
13/11/29 11:38:57 INFO mapred.JobClient: Map input records=13
13/11/29 11:38:57 INFO mapred.JobClient: Map output records=1
13/11/29 11:38:57 INFO mapred.JobClient: Map output bytes=14
13/11/29 11:38:57 INFO mapred.JobClient: Input split bytes=103
13/11/29 11:38:57 INFO mapred.JobClient: Combine input records=0
13/11/29 11:38:57 INFO mapred.JobClient: Combine output records=0
13/11/29 11:38:57 INFO mapred.JobClient: Reduce input groups=1
13/11/29 11:38:57 INFO mapred.JobClient: Reduce shuffle bytes=32
13/11/29 11:38:57 INFO mapred.JobClient: Reduce input records=1
13/11/29 11:38:57 INFO mapred.JobClient: Reduce output records=1
13/11/29 11:38:57 INFO mapred.JobClient: Spilled Records=2
13/11/29 11:38:57 INFO mapred.JobClient: CPU time spent (ms)=4780
13/11/29 11:38:57 INFO mapred.JobClient: Physical memory (bytes) snapshot=657629184
13/11/29 11:38:57 INFO mapred.JobClient: Virtual memory (bytes) snapshot=3802001408
13/11/29 11:38:57 INFO mapred.JobClient: Total committed heap usage (bytes)=1915486208
13/11/29 11:38:57 INFO mr.CountRecords: sum 13
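Besides the Web UI and the console report, these built-in counters can also be read programmatically once the job has finished. A minimal sketch, assuming a Hadoop 2.x-style client where the framework counters are exposed through the org.apache.hadoop.mapreduce.TaskCounter enum, and where job is a completed Job handed in by driver code such as the example below:

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

public class BuiltInCounters {
    // Returns the "Map input records" value (13 in the log above)
    // from a completed Job passed in by the driver.
    public static long mapInputRecords(Job job) throws Exception {
        Counter c = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
        return c.getValue();
    }
}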
Defining custom counters with a Java enum
A counter can be backed by any Java enum type, as the following example shows:
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*
 * Defining custom counters with a Java enum.
 * A counter can be backed by any enum type.
 * Example: given a file where every line records the duration of one user
 * session online, count how many sessions lasted more than 30 minutes and
 * how many lasted 30 minutes or less. The final counts are printed to the
 * console at the end of the run.
 */
public class CounterTest extends Configured implements Tool {
private final static Log log = LogFactory.getLog(CounterTest.class);
public static void main(String[] args) throws Exception {
String[] ars = new String[] { "hdfs://data2.kt:8020/test/input",
"hdfs://data2.kt:8020/test/output" };
int exitcode = ToolRunner.run(new CounterTest(), ars);
System.exit(exitcode);
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
conf.set("fs.default.name", "hdfs://data2.kt:8020/");
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(args[1]), true);
// hand the configured conf to the job so the fs.defaultFS setting takes effect
Job job = new Job(conf);
job.setJarByClass(CounterTest.class);
job.setMapperClass(MyMap.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
int result = job.waitForCompletion(true) ? 0 : 1;
// Print the custom counter values after the job completes
Counters counters = job.getCounters();
Counter counter1 = counters.findCounter(NetTimeLong.OVER30M);
log.info(counter1.getValue());
log.info(counter1.getDisplayName() + "," + counter1.getName());
return result;
}
public static class MyMap extends Mapper<LongWritable, Text, NullWritable, Text> {
private Counter counter1, counter2;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// each input line holds one session's online duration in minutes
double minutes = Double.parseDouble(value.toString());
if (minutes <= 30) {
    // getCounter() creates the counter on first access if it does not exist yet
    counter2 = context.getCounter(NetTimeLong.LOW30M);
    counter2.increment(1);
} else {
    counter1 = context.getCounter(NetTimeLong.OVER30M);
    counter1.increment(1);
}
context.write(NullWritable.get(), value);
}
}
}
enum NetTimeLong {
OVER30M, LOW30M
}
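Enums are not the only option. The mapreduce API also accepts plain strings for the group and counter names (so-called dynamic counters), which helps when the set of counters is not known at compile time. A minimal sketch of the same mapper logic using string-based counters; the group and counter names here ("NetTime", "over30m", "le30m") are made up for illustration:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Same counting logic as MyMap above, but using string-addressed dynamic
// counters instead of an enum.
public class DynamicCounterMap extends Mapper<LongWritable, Text, NullWritable, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        double minutes = Double.parseDouble(value.toString());
        if (minutes > 30) {
            context.getCounter("NetTime", "over30m").increment(1);
        } else {
            context.getCounter("NetTime", "le30m").increment(1);
        }
        context.write(NullWritable.get(), value);
    }
}

On the driver side such a counter is fetched with counters.findCounter("NetTime", "over30m"), and it appears in the Web UI under its group name just like an enum-based counter.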