1.编写两个文本:
XX.txt与YY.txt,两个文件中有三条记录重复:20160806、20160108、20161012
XX.txt:
20160708
20161113
20160702
20160906
20161011
20160901
20160108
20160609
20160221
20160308
20161001
20161012
20160309
20161023
20161104
20160806
YY.txt:
20160504
20160806
20160516
20160422
20160604
20161122
20161115
20161112
20160311
20161024
20160918
20161102
20160512
20160412
20161012
20160615
20160919
20160101
20160102
20160103
20160104
20160105
20160106
20160107
20160108
20160109
20160110
2.编写task3.java
//Mapper
/**
 * Mapper for the de-duplication job.
 *
 * <p>Emits every input record as the output key, paired with a
 * {@link NullWritable} placeholder value. The mapper keeps no state between
 * calls.
 */
public static class Map extends Mapper<LongWritable, Text, Text, NullWritable> {
    /**
     * Writes the incoming record unchanged as the output key.
     *
     * @param key     input key supplied by the framework; not used here
     * @param value   the input record, written out as the output key
     * @param context receives the (value, NullWritable) pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}
//Reducer
public static class Reduce extends Reducer<Text,NullWritable, Text,NullWritable> {
public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key,NullWritable.get());
}
}
//Driver
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf ,”去重”);
job.setJarByClass(task3.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
3.编译task3.java生成task3.jar包文件
4.将XX.txt与YY.txt两个文件和task3.jar包文件上传到Hadoop节点本地文件系统的/opt/目录下
5.将Hadoop节点本地/opt/目录下的XX.txt与YY.txt两个文件上传至HDFS中
上传至HDFS的命令:
hdfs dfs -put /opt/XX.txt /user1/root/
hdfs dfs -put /opt/YY.txt /user1/root/
6.以hadoop jar 提交任务给集群
提交命令:
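hadoop jar /opt/task3.jar /user1/root /user2/root/ZZ.txt
说明:第二个参数/user2/root/ZZ.txt是输出目录(不是单个文件),提交前该目录不能已存在;去重结果保存在该目录下的part-r-*文件中。若jar包的清单文件未指定主类,需在jar路径后加上主类名task3。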