1 package MapReduce; 2 3 import java.io.IOException; 4 import java.net.URI; 5 import java.net.URISyntaxException; 6 import java.util.StringTokenizer; 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.FileSystem; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.LongWritable; 11 import org.apache.hadoop.io.Text; 12 import org.apache.hadoop.mapreduce.Counter; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 19 /** 20 * mapreduce中计数器的使用 21 * 22 */ 23 public class WordCountApp { 24 private static final String INPUT_PATH = "hdfs://h201:9000/user/hadoop/input"; 25 private static final String OUTPUT_PATH = "hdfs://h201:9000/user/hadoop/output"; 26 27 public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { 28 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 29 final String line = value.toString(); 30 StringTokenizer tokenizer = new StringTokenizer(line);//StringTokenizer是字符串分隔解析类型,按空格截取交给takenizer这个容器 31 final Counter counter = context.getCounter("Sensitive", "hello");//计数器,前面是技术器名字,后面是给谁计数 32 if (value.toString().contains("hello")) { 33 counter.increment(1L); //当查询到包含hello的词语时,计数器加1 34 } 35 while(tokenizer.hasMoreTokens()) { 36 String target = tokenizer.nextToken();//分隔符前面的输出给target 37 if(target.equals("hello") || target.equals("jiejie")){ 38 context.write(new Text(target), new LongWritable(1)); 39 } 40 } 41 } 42 } 43 44 public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { 45 @Override 46 protected void reduce(Text key, Iterable<LongWritable> value, 47 Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { 48 long times = 0l; 49 while 
(value.iterator().hasNext()) { 50 times += value.iterator().next().get();//迭代器累加给time 51 } 52 //if(times > 3 ){ //输出计数大于3的选项 53 context.write(key, new LongWritable(times)); 54 //} 55 } 56 57 } 58 public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { 59 Configuration conf = new Configuration(); 60 conf.set("mapred.jar","wcapp.jar");//申明jar名字为wcapp.jar 61 //我们可以在代码中进行设置来自定义 key/value 输出分隔符:在主函数中添加如下一行代码: 62 conf.set("mapred.textoutputformat.separator", ";"); //此处以”;“作为分割符 63 final FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);//读路径信息 64 fileSystem.delete(new Path(OUTPUT_PATH), true);//删除路径信息 输出路径不能存在 65 66 final Job job = new Job(conf, WordCountApp.class.getSimpleName()); 67 job.setJarByClass(WordCountApp.class);//启job任务 68 69 FileInputFormat.setInputPaths(job, INPUT_PATH);//输入 区别 引入位置变量new Path(args[0])直接换成路径,好处:执行过程中不用再给路径。坏处:不够灵活。 70 job.setMapperClass(MyMapper.class); 71 job.setMapOutputKeyClass(Text.class); 72 job.setMapOutputValueClass(LongWritable.class); 73 job.setCombinerClass(MyReducer.class); 74 job.setReducerClass(MyReducer.class); 75 job.setOutputKeyClass(Text.class); 76 job.setOutputValueClass(LongWritable.class); 77 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));//输出 78 System.exit(job.waitForCompletion(true) ? 0 : 1); 79 } 80 }
StringTokenizer是字符串分隔解析类型,属于:java.util包。
1.StringTokenizer的构造函数
StringTokenizer(String str):构造一个用来解析str的StringTokenizer对象。java默认的分隔符是“空格”、“制表符(‘\t’)”、“换行符(‘\n’)”、“回车符(‘\r’)”和“换页符(‘\f’)”。
StringTokenizer(String str,String delim):构造一个用来解析str的StringTokenizer对象,并提供一个指定的分隔符。
StringTokenizer(String str,String delim,boolean returnDelims):构造一个用来解析str的StringTokenizer对象,并提供一个指定的分隔符,同时,指定是否返回分隔符。
2.StringTokenizer的一些常用方法
说明:
1.所有方法均为public;
2.书写格式:[修饰符] <返回类型><方法名([参数列表])>
int countTokens():返回在不抛出异常的情况下还可以调用nextToken方法的次数(即剩余的标记数),调用它不会移动当前位置。
boolean hasMoreTokens():返回是否还有可用的标记(token),即当前位置之后是否还有非分隔符的内容。
boolean hasMoreElements():与hasMoreTokens()的返回值相同,返回是否还有可用的标记。
String nextToken():返回从当前位置到下一个分隔符的字符串。
Object nextElement():返回从当前位置到下一个分隔符的字符串。
String nextToken(String delim):与nextToken()类似,但先把分隔符集合改为delim,再返回下一个标记;此后该StringTokenizer继续使用新的分隔符。
[hadoop@h201 counter]$ /usr/jdk1.7.0_25/bin/javac WordCountApp.java
Note: WordCountApp.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
[hadoop@h201 counter]$ /usr/jdk1.7.0_25/bin/jar cvf wcapp.jar WordCountApp*class
added manifest
adding: WordCountApp.class(in = 2358) (out= 1191)(deflated 49%)
adding: WordCountApp$MyMapper.class(in = 2019) (out= 885)(deflated 56%)
adding: WordCountApp$MyReducer.class(in = 1655) (out= 691)(deflated 58%)
[hadoop@h201 counter]$ hadoop jar wcapp.jar WordCountApp
18/03/11 23:11:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
18/03/11 23:11:10 INFO client.RMProxy: Connecting to ResourceManager at h201/192.168.121.132:8032
18/03/11 23:11:10 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/03/11 23:11:10 INFO input.FileInputFormat: Total input paths to process : 2
18/03/11 23:11:11 INFO mapreduce.JobSubmitter: number of splits:2
18/03/11 23:11:11 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
18/03/11 23:11:11 INFO Configuration.deprecation: mapred.textoutputformat.separator is deprecated. Instead, use mapreduce.output.textoutputformat.separator
18/03/11 23:11:11 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516635595760_0006
18/03/11 23:11:11 INFO impl.YarnClientImpl: Submitted application application_1516635595760_0006
18/03/11 23:11:11 INFO mapreduce.Job: The url to track the job: http://h201:8088/proxy/application_1516635595760_0006/
18/03/11 23:11:11 INFO mapreduce.Job: Running job: job_1516635595760_0006
18/03/11 23:11:20 INFO mapreduce.Job: Job job_1516635595760_0006 running in uber mode : false
18/03/11 23:11:20 INFO mapreduce.Job: map 0% reduce 0%
18/03/11 23:11:26 INFO mapreduce.Job: map 50% reduce 0%
18/03/11 23:11:37 INFO mapreduce.Job: map 100% reduce 0%
18/03/11 23:11:38 INFO mapreduce.Job: map 100% reduce 100%
18/03/11 23:11:38 INFO mapreduce.Job: Job job_1516635595760_0006 completed successfully
18/03/11 23:11:38 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=39
FILE: Number of bytes written=329603
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=914
HDFS: Number of bytes written=19
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=18964
Total time spent by all reduces in occupied slots (ms)=5647
Total time spent by all map tasks (ms)=18964
Total time spent by all reduce tasks (ms)=5647
Total vcore-seconds taken by all map tasks=18964
Total vcore-seconds taken by all reduce tasks=5647
Total megabyte-seconds taken by all map tasks=19419136
Total megabyte-seconds taken by all reduce tasks=5782528
Map-Reduce Framework
Map input records=54
Map output records=35
Map output bytes=507
Map output materialized bytes=45
Input split bytes=227
Combine input records=35
Combine output records=2
Reduce input groups=2
Reduce shuffle bytes=45
Reduce input records=2
Reduce output records=2
Spilled Records=4
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=584
CPU time spent (ms)=2380
Physical memory (bytes) snapshot=387678208
Virtual memory (bytes) snapshot=3221241856
Total committed heap usage (bytes)=257499136
Sensitive
hello=18
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=687
File Output Format Counters
Bytes Written=19