Recently I have been working on writing MapReduce programs in a Windows environment. I ran into a few problems, but they were finally solved, so here is a summary.
Steps for writing the MapReduce program:
1. Since I was not using the MapReduce Eclipse plugin, I simply added the relevant Hadoop jars to the project directly.
2. Write the MapReduce program itself; nothing special to say here.
3. There is no need to include core-site.xml, hdfs-site.xml, or mapred-site.xml. After I added them to the project they triggered cluster configuration errors, so I left these three files out (the cluster addresses then have to be supplied some other way; see the sketch below).
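Without those XML files on the classpath, the NameNode and JobTracker addresses have to come from somewhere else, for example from full hdfs:// paths on the command line or from the Configuration built in code. A minimal sketch of the latter, assuming a Hadoop 1.x-style cluster; the host name and ports below are placeholders, not values from this post:

import org.apache.hadoop.conf.Configuration;

public class ClusterConf {
    public static Configuration create() {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://master:9000");  // NameNode address (placeholder)
        conf.set("mapred.job.tracker", "master:9001");      // JobTracker address (placeholder)
        return conf;
    }
}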
I ran into the following problems while running it:
1. HDFS write-permission errors. The most direct fix is to change the permissions on the HDFS directory by running this command on the Linux side: hadoop fs -chmod 777 /user/anny. Problem solved.
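The same change can also be made from Java through the FileSystem API instead of the shell. A minimal sketch, using the /user/anny directory from the command above and assuming the client is allowed to change its permissions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class ChmodExample {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // equivalent of "hadoop fs -chmod 777 /user/anny"
        fs.setPermission(new Path("/user/anny"), new FsPermission((short) 0777));
        fs.close();
    }
}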
2. I forget exactly which error it was, but it required installing Cygwin and adding Cygwin's bin directory to the PATH environment variable; on my machine that is C:\cygwin\bin.
3. The error "java.io.IOException: Unable to rename \tmp\hadoop-Anny\mapred\local\localRunner\Anny\jobcache\job_local_0001\attempt_local_0001_m_000000_0\output\spill0.out". My rather clumsy workaround was to add conf.set("hadoop.tmp.dir", "\\tmp\\hadoop-root"); directly in the program, which solved the problem.
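For context, this is roughly where the setting goes in the driver, before the Job is created (a minimal sketch; whether \tmp\hadoop-root is the right local scratch directory depends on your machine):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class TmpDirFix {
    public static Job newJob() throws Exception {
        Configuration conf = new Configuration();
        // point the LocalJobRunner's scratch directory somewhere it can rename spill files
        conf.set("hadoop.tmp.dir", "\\tmp\\hadoop-root");
        return new Job(conf, "InvertedIndex");
    }
}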
Note: today I rewrote the MR program and hit the error above again at run time. After digging into it, I found that when a Hadoop MR program is run from MyEclipse on Windows, it executes by default under the current Windows login user name. On the Linux side, the HDFS output directories created by the MR job are therefore all owned by that Windows user name, even though no user called Anny actually exists on the Linux machine.
drwxr-xr-x - Anny supergroup 0 2013-12-11 18:41 /user/anny/hdfs/log_kpi/pv
drwxr-xr-x - Anny supergroup 0 2013-12-11 18:44 /user/anny/hdfs/log_kpi/time
Solutions:
Option 1 (result: the MR program runs fine, but the HDFS directories are still created under the Windows user name):
conf.set("dfs.permissions", "false");
conf.set("hadoop.tmp.dir", "\\tmp\\hadoop-root\\mapred\\local\\localRunner\\Anny");
Option 2: set the Hadoop user name explicitly. Either via a system property:
System.setProperty("HADOOP_USER_NAME", "hduser");
or via the UserGroupInformation API:
// impersonate "hduser" regardless of the local Windows login user
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hduser");
ugi.doAs(new PrivilegedExceptionAction<Void>() {
    public Void run() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("hadoop.job.ugi", "hduser");
        int res = ToolRunner.run(configuration, new YourTool(), args);
        return null;
    }
});
I went with the first statement because it is simpler: just add System.setProperty("HADOOP_USER_NAME", "root"); to the driver code.
The HDFS directories created on the Linux side are then owned by "root", and the program also runs correctly.
drwxr-xr-x - root supergroup 0 2013-12-12 10:51 /user/anny/hdfs/log_kpi/browser
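For reference, a minimal sketch of where the property fits in the driver; it has to be set before the job is submitted so the HDFS client picks up "root" instead of the Windows login user (the class name Driver is just for illustration):

import org.apache.hadoop.util.ToolRunner;

public class Driver {
    public static void main(String[] args) throws Exception {
        // set before any HDFS access, so directories are created as "root"
        System.setProperty("HADOOP_USER_NAME", "root");
        System.exit(ToolRunner.run(new InvertedIndex(), args));
    }
}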
When exporting the project to a jar file, remember to tick the .classpath and .project entries on the right-hand side.
Below is an inverted index program:
package test_mapreduce.mr;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InvertedIndex extends Configured implements Tool {

    public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> {
        private Text keyInfo = new Text();    // holds the word:URI combination
        private Text valueInfo = new Text();  // holds the term frequency
        private FileSplit split;              // holds the input split

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // get the FileSplit that this <key, value> pair belongs to
            split = (FileSplit) context.getInputSplit();
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                // the key consists of the word and the URI, e.g. "MapReduce:1.txt"
                keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());
                // initial term frequency is 1
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
        }
    }

    public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
        private Text info = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // sum the term frequencies
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }

            int splitIndex = key.toString().indexOf(":");
            // reset the value to URI plus term frequency
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
            // reset the key to just the word
            key.set(key.toString().substring(0, splitIndex));
            context.write(key, info);
        }
    }

    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // build the document list
            String fileList = new String();
            for (Text value : values) {
                fileList += value.toString() + ";";
            }
            result.set(fileList);
            context.write(key, result);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("dfs.permissions", "false");
        //conf.set("hadoop.tmp.dir", "\\tmp\\hadoop-root");
        System.setProperty("HADOOP_USER_NAME", "root");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.out.println("Usage: invertedindex <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "InvertedIndex");
        job.setJarByClass(InvertedIndex.class);

        job.setMapperClass(InvertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        //System.exit(job.waitForCompletion(true)?0:1);
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int r = ToolRunner.run(new InvertedIndex(), args);
        System.exit(r);
    }
}
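To illustrate what the job produces: suppose the input directory contains two files, say 1.txt containing "MapReduce is simple" and 2.txt containing "MapReduce is powerful" (hypothetical file names and contents, not from this post). The mapper emits word:URI -> 1, the combiner sums the counts per (word, file) and moves the URI into the value, and the reducer concatenates the per-file counts, so the output would look roughly like:

MapReduce	hdfs://.../1.txt:1;hdfs://.../2.txt:1;
is	hdfs://.../1.txt:1;hdfs://.../2.txt:1;
powerful	hdfs://.../2.txt:1;
simple	hdfs://.../1.txt:1;

with the full HDFS paths appearing in place of the "..." ellipses.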