需求
输入文件:文本文件
每行格式:
<source> <destination> <time>
- 3个部分由空格隔开
- 其中source和destination为两个字符串,内部没有空格
- time为一个浮点数,代表时间(秒为单位)
- 涵义:可以表示一次电话通话,或表示一次网站访问等
输入可能有噪音:
如果一行不符合上述格式,应该被丢弃,程序需要正确执行
MapReduce计算:统计每对source‐destination的信息
输出
<source> <destination> <count> <average time>
- 每一个source-destination组合输出一行(注意:<a,b>与<b,a>方向相反,视为不同的组合分别统计)
- 每行输出通话次数和通话平均时间(保留3位小数,例如2.300)
编程实现
import java.io.IOException;
import java.util.StringTokenizer;
import java.math.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Hw2Part1{
public static class TokenizerMapper extends Mapper<Object, Text, Text, DoubleWritable>{
  /**
   * Parses one input line of the form "<source> <destination> <time>" and emits
   * key "source\tdestination\t" with the time as a DoubleWritable.
   *
   * The assignment requires noisy input to be tolerated: any line with the
   * wrong field count, a non-numeric time, or a negative time is silently
   * dropped instead of crashing the task.
   */
  public void map(Object key, Text value, Context context
                  ) throws IOException, InterruptedException {
    // TextInputFormat already delivers one line per call; the split on '\n'
    // is kept as a safety net in case a multi-line value ever arrives.
    String line = value.toString();
    String[] lines = line.split("\n");
    for (String eachline : lines) {
      // trim() so leading/trailing whitespace does not create an empty
      // first token and shift the field indices.
      String[] words = eachline.trim().split("\\s+");
      // Fixed: check the field count BEFORE indexing words[2]; the original
      // parsed words[2] first and threw on lines with fewer than 3 fields.
      if (words.length != 3) {
        continue; // noise: wrong number of fields
      }
      double time;
      try {
        time = Double.parseDouble(words[2]);
      } catch (NumberFormatException e) {
        continue; // noise: time field is not a valid number
      }
      if (time >= 0) {
        context.write(new Text(words[0] + "\t" + words[1] + "\t"),
                      new DoubleWritable(time));
      }
    }
  }
}
public static class IntSumCombiner extends Reducer<Text,DoubleWritable,Text,DoubleWritable> {
  // Reused across calls to avoid allocating one DoubleWritable per key.
  private DoubleWritable result = new DoubleWritable();

  /**
   * Folds all times for one source-destination pair into
   * key "source\tdestination\tcount\t" and value = average time,
   * rounded to 3 decimal places (e.g. 2.300 as required by the spec).
   *
   * NOTE(review): this combiner rewrites the key and emits an average, so it
   * is only correct if Hadoop runs it exactly once per key. The framework may
   * run a combiner zero or multiple times; a safer design would emit
   * (sum, count) pairs and let the reducer do the division — confirm before
   * relying on this for grading.
   */
  public void reduce(Text key, Iterable<DoubleWritable> values,
                     Context context
                     ) throws IOException, InterruptedException {
    double sum = 0;
    int count = 0;
    for (DoubleWritable val : values) {
      sum += val.get();
      count++;
    }
    // Embed the record count into the output key, matching the required
    // output format "<source> <destination> <count> <average>".
    Text newkey = new Text(key + "\t" + Integer.toString(count) + "\t");
    // Round the average to 3 decimal places.
    sum = Math.round(sum / count * 1000) / 1000.000;
    result.set(sum);
    // Fixed: original wrote undefined variable 'newk' — a compile error.
    context.write(newkey, result);
  }
}
public static class IntSumReducer extends Reducer<Text,DoubleWritable,Text,DoubleWritable> {
  /**
   * Sums the values for each key and writes the key unchanged.
   *
   * After IntSumCombiner has run, each key already carries the record count
   * and each value is a per-combiner average, so this normally passes a
   * single averaged value straight through.
   *
   * The original version built a "count of <key> =" Text in result_key via
   * prefix/suffix byte arrays set up in setup(), but never wrote it —
   * context.write used the plain key. That dead code (result_key,
   * result_value, prefix, suffix, setup) has been removed; output is
   * byte-identical.
   */
  public void reduce(Text key, Iterable<DoubleWritable> values,
                     Context context
                     ) throws IOException, InterruptedException {
    double sum = 0;
    for (DoubleWritable val : values) {
      sum += val.get();
    }
    context.write(key, new DoubleWritable(sum));
  }
}
/**
 * Job driver: wires the mapper/combiner/reducer, takes one or more input
 * paths followed by a single output path on the command line, and exits
 * with 0 on success, 1 on job failure, 2 on bad usage.
 */
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length < 2) {
    // Fixed: the usage line previously said "wordcount", left over from the
    // Hadoop example this job was adapted from.
    System.err.println("Usage: Hw2Part1 <in> [<in>...] <out>");
    System.exit(2);
  }
  Job job = Job.getInstance(conf, "Hw2Part1");
  job.setJarByClass(Hw2Part1.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumCombiner.class);
  job.setReducerClass(IntSumReducer.class);
  // Mapper emits Text -> DoubleWritable; the reducer output matches, so the
  // map-output and job-output classes are declared identically.
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(DoubleWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(DoubleWritable.class);
  // All arguments except the last are input paths.
  for (int i = 0; i < otherArgs.length - 1; ++i) {
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  }
  // The last argument is the output directory (must not already exist).
  FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
编译命令(需要把Hadoop的jar包加入classpath,否则javac找不到org.apache.hadoop.*):
javac -classpath $(hadoop classpath) Hw2Part1.java
打包命令:
jar cfm Hw2Part1.jar Hw2Part1-manifest.txt Hw2Part1*.class
删除hdfs上的output文件:
hdfs dfs -rm -f -r /hw2/output
运行:
hadoop jar ./Hw2Part1.jar /hw2/example-input.txt /hw2/output