mapreduce中counter的使用

   MapReduce Counter为提供我们一个窗口:观察MapReduce job运行期的各种细节数据。MapReduce自带了许多默认Counter。

    Counter有”组group”的概念,用于表示逻辑上相同范围的所有数值。MapReduce job提供的默认Counter分为三个组

  1. Map-Reduce Frameword
    Map input records,Map skipped records,Map input bytes,Map output records,Map output bytes,Combine input records,Combine output records,Reduce input records,Reduce input groups,Reduce output records,Reduce skipped groups,Reduce skipped records,Spilled records
  2. File Systems
    FileSystem bytes read,FileSystem bytes written
  3. Job Counters
    Launched map tasks,Launched reduce tasks,Failed map tasks,Failed reduce tasks,Data-local map tasks,Rack-local map tasks,Other local map tasks

        这些 
counters你在Web UI中,或是job结束后在控制台生成的统计报告中都看得到。
 
见如下MR运行日志:




-bash-4.1$ hadoop jar mr.jar com.catt.cdh.mr.CountRecords
13/11/29 11:38:04 WARN conf.Configuration: fs.default.name is deprecated. Instead, use fs.defaultFS
13/11/29 11:38:10 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/11/29 11:38:11 INFO input.FileInputFormat: Total input paths to process : 1
13/11/29 11:38:11 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
13/11/29 11:38:11 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 6298911ef75545c61859c08add6a74a83e0183ad]
13/11/29 11:38:12 INFO mapred.JobClient: Running job: job_201311251130_0208
13/11/29 11:38:13 INFO mapred.JobClient:  map 0% reduce 0%
13/11/29 11:38:40 INFO mapred.JobClient:  map 100% reduce 0%
13/11/29 11:38:49 INFO mapred.JobClient:  map 100% reduce 100%
13/11/29 11:38:57 INFO mapred.JobClient: Job complete: job_201311251130_0208
13/11/29 11:38:57 INFO mapred.JobClient: Counters: 32
13/11/29 11:38:57 INFO mapred.JobClient:   File System Counters
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of bytes read=36
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of bytes written=322478
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of read operations=0
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of large read operations=0
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of write operations=0
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of bytes read=139
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of bytes written=7
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of read operations=2
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of large read operations=0
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of write operations=1
13/11/29 11:38:57 INFO mapred.JobClient:   Job Counters 
13/11/29 11:38:57 INFO mapred.JobClient:     Launched map tasks=1
13/11/29 11:38:57 INFO mapred.JobClient:     Launched reduce tasks=1
13/11/29 11:38:57 INFO mapred.JobClient:     Data-local map tasks=1
13/11/29 11:38:57 INFO mapred.JobClient:     Total time spent by all maps in occupied slots (ms)=31068
13/11/29 11:38:57 INFO mapred.JobClient:     Total time spent by all reduces in occupied slots (ms)=6671
13/11/29 11:38:57 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/11/29 11:38:57 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/11/29 11:38:57 INFO mapred.JobClient:   Map-Reduce Framework
13/11/29 11:38:57 INFO mapred.JobClient:     Map input records=13
13/11/29 11:38:57 INFO mapred.JobClient:     Map output records=1
13/11/29 11:38:57 INFO mapred.JobClient:     Map output bytes=14
13/11/29 11:38:57 INFO mapred.JobClient:     Input split bytes=103
13/11/29 11:38:57 INFO mapred.JobClient:     Combine input records=0
13/11/29 11:38:57 INFO mapred.JobClient:     Combine output records=0
13/11/29 11:38:57 INFO mapred.JobClient:     Reduce input groups=1
13/11/29 11:38:57 INFO mapred.JobClient:     Reduce shuffle bytes=32
13/11/29 11:38:57 INFO mapred.JobClient:     Reduce input records=1
13/11/29 11:38:57 INFO mapred.JobClient:     Reduce output records=1
13/11/29 11:38:57 INFO mapred.JobClient:     Spilled Records=2
13/11/29 11:38:57 INFO mapred.JobClient:     CPU time spent (ms)=4780
13/11/29 11:38:57 INFO mapred.JobClient:     Physical memory (bytes) snapshot=657629184
13/11/29 11:38:57 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3802001408
13/11/29 11:38:57 INFO mapred.JobClient:     Total committed heap usage (bytes)=1915486208
13/11/29 11:38:57 INFO mr.CountRecords: sum     13


使用Java Enum自定义Counter

一个Counter可以是任意的Enum类,见如下代码示例:

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* * 使用Java Enum自定义Counter * 一个Counter可以是任意的Enum类型。 * 比如有个文件每行记录了用户的每次上网时长,统计上网时间超过30分钟的次数,小于或等于30分钟的次数 * 可以使用下面的代码。最后的计数结果会显示在终端上 */
public class CounterTest extends Configured implements Tool {
	private final static Log log = LogFactory.getLog(CounterTest.class);

	public static void main(String[] args) throws Exception {
		String[] ars = new String[] { "hdfs://data2.kt:8020/test/input",
				"hdfs://data2.kt:8020/test/output" };
		int exitcode = ToolRunner.run(new CounterTest(), ars);
		System.exit(exitcode);
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		conf.set("fs.default.name", "hdfs://data2.kt:8020/");
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path(args[1]), true);

		Job job = new Job();
		job.setJarByClass(CounterTest.class);

		job.setMapperClass(MyMap.class);
		job.setNumReduceTasks(0);

		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		int result = job.waitForCompletion(true) ? 0 : 1;
		
		//针对Counter结果的显示
		Counters counters = job.getCounters();
		Counter counter1=counters.findCounter(NetTimeLong.OVER30M);
		log.info(counter1.getValue());
		log.info(counter1.getDisplayName()+","+counter1.getName());
		
		return result;
	}

	public static class MyMap extends Mapper<LongWritable, Text, NullWritable, Text> {
		private Counter counter1, counter2;

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			double temperature = Double.parseDouble(value.toString());
			if (temperature <= 30) {
				// get时如果不存在就会自动添加
				counter2 = context.getCounter(NetTimeLong.LOW30M);
				counter2.increment(1);
			} else if (temperature > 30) {
				counter1 = context.getCounter(NetTimeLong.OVER30M);
				counter1.increment(1);
			}
			context.write(NullWritable.get(), value);
		}
	}
}

enum NetTimeLong {
	OVER30M, LOW30M
}

    原文作者:MapReduce
    原文地址: https://yq.aliyun.com/articles/43438
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞