Writing MapReduce programs for Hadoop CDH4 under Windows 7

I have recently been looking into writing MapReduce programs in a Windows environment. I ran into a few problems, but they were eventually solved; here is a summary.

Steps for writing a MapReduce program:

1. Since I am not using the Hadoop Eclipse plugin, I simply added the relevant Hadoop jars directly to the project.

2. Write the MapReduce program itself; nothing special to say here.

3. Do not include core-site.xml, hdfs-site.xml, or mapred-site.xml. After I added them to the project I got cluster configuration errors, so I dropped all three files (the cluster address can be set in code instead, as shown in the sketch below).
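Since those three XML files are no longer on the classpath, the client has to be told where the cluster lives some other way. Below is a minimal sketch of doing that in code; this is my assumption about the setup rather than something stated in the original post, and the host name and ports are placeholders you would replace with your own:

import org.apache.hadoop.conf.Configuration;

public class ClientConf {
    public static Configuration create() {
        Configuration conf = new Configuration();
        // NameNode address, so HDFS paths resolve (host and port are hypothetical)
        conf.set("fs.default.name", "hdfs://namenode-host:8020");
        // mapred.job.tracker defaults to "local" (the LocalJobRunner), which matches
        // the job_local_0001 attempt IDs seen below; point it at the JobTracker,
        // e.g. "jobtracker-host:8021", to submit jobs to the cluster instead.
        return conf;
    }
}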

I hit the following problems while running:

1. HDFS write permission problem. The most direct fix is to change the permissions on the HDFS directory by running this command in the Linux environment: hadoop fs -chmod 777 /user/anny. Problem solved. (A programmatic alternative is sketched below, after the solutions.)

2. I forget what the exact problem was, but it required Cygwin to be installed and Cygwin's bin directory added to the PATH environment variable; on my machine that is C:\cygwin\bin.

3. Error: "java.io.IOException: Unable to rename \tmp\hadoop-Anny\mapred\local\localRunner\Anny\jobcache\job_local_0001\attempt_local_0001_m_000000_0\output\spill0.out". My rather clumsy fix was to add conf.set("hadoop.tmp.dir", "\\tmp\\hadoop-root"); directly in the program, which solved the problem.

Note: today I wrote another MR program and hit the above error again at run time. After looking into it more carefully, I found that when a Hadoop MR program is executed from MyEclipse on Windows, it runs by default under the currently logged-in Windows username. On the Linux side, the HDFS output directories created while running the MR job were all owned by that Windows username, even though no user named Anny actually exists in the Linux environment.

drwxr-xr-x   - Anny supergroup          0 2013-12-11 18:41 /user/anny/hdfs/log_kpi/pv
drwxr-xr-x   - Anny supergroup          0 2013-12-11 18:44 /user/anny/hdfs/log_kpi/time

Solutions:

Approach one (result: the MR program runs normally, but the HDFS directories are still created under the Windows username):

conf.set("dfs.permissions", "false");
conf.set("hadoop.tmp.dir", "\\tmp\\hadoop-root\\mapred\\local\\localRunner\\Anny");

Approach two:

Reference: http://stackoverflow.com/questions/11041253/set-hadoop-system-user-for-client-embedded-in-java-webapp

Quoting that answer: the code below works the same as

System.setProperty("HADOOP_USER_NAME", "hduser")

or

UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hduser");
ugi.doAs(new PrivilegedExceptionAction<Void>() {
    public Void run() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("hadoop.job.ugi", "hduser");
        // note: args must be final (or otherwise in scope) for the anonymous class to see it
        int res = ToolRunner.run(configuration, new YourTool(), args);
        return null;
    }
});

I went with the first option since it is simpler: just add System.setProperty("HADOOP_USER_NAME", "root"); to the program's run method.

The HDFS directories created on the Linux side are then owned by "root", and the program also runs correctly.

drwxr-xr-x   - root supergroup          0 2013-12-12 10:51 /user/anny/hdfs/log_kpi/browser
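As an aside, the permission fix from problem 1 can also be done from Java instead of the shell. A minimal sketch, assuming fs.default.name already points at the cluster (see the first sketch above); FileSystem, Path, and FsPermission are the standard Hadoop client classes:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class Chmod {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // equivalent of: hadoop fs -chmod 777 /user/anny
        fs.setPermission(new Path("/user/anny"), new FsPermission((short) 0777));
        fs.close();
    }
}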


When exporting the project to a jar file, make sure the .classpath and .project entries on the right-hand side of the export dialog are checked.


Below is an inverted index program:

package test_mapreduce.mr;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InvertedIndex extends Configured implements Tool {

    public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> {
        private Text keyInfo = new Text();   // holds the word:URI combination
        private Text valueInfo = new Text(); // holds the term frequency
        private FileSplit split;             // holds the FileSplit object

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // get the FileSplit this <key,value> pair belongs to
            split = (FileSplit) context.getInputSplit();
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                // the key combines the word and the URI, e.g. "MapReduce:1.txt"
                keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());
                // initial term frequency is 1
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
        }
    }

    public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
        private Text info = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // sum up the term frequency
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }

            int splitIndex = key.toString().indexOf(":");
            // reset the value to URI:frequency
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
            // reset the key to the word alone
            key.set(key.toString().substring(0, splitIndex));
            context.write(key, info);
        }
    }

    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // build the document list
            String fileList = new String();
            for (Text value : values) {
                fileList += value.toString() + ";";
            }
            result.set(fileList);
            context.write(key, result);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("dfs.permissions", "false");
        //conf.set("hadoop.tmp.dir", "\\tmp\\hadoop-root");
        System.setProperty("HADOOP_USER_NAME", "root");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.out.println("Usage: invertedindex <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "InvertedIndex");
        job.setJarByClass(InvertedIndex.class);

        job.setMapperClass(InvertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        //System.exit(job.waitForCompletion(true)?0:1);
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int r = ToolRunner.run(new InvertedIndex(), args);
        System.exit(r);
    }
}
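To make the data flow concrete: the mapper emits "word:URI" -> "1", the combiner folds that into "word" -> "URI:count", and the reducer concatenates each word's per-file counts, separated by semicolons. A made-up example follows (file names and contents are hypothetical, the real output contains full HDFS paths rather than bare file names, and the order of files within a line is not guaranteed):

Input:
1.txt: MapReduce is simple
2.txt: MapReduce is powerful is useful

Output (key and value are tab-separated):
MapReduce    1.txt:1;2.txt:1;
is           1.txt:1;2.txt:2;
powerful     2.txt:1;
simple       1.txt:1;
useful       2.txt:1;

Assuming the exported jar is named invertedindex.jar and the HDFS paths are placeholders, the job can be run with:

hadoop jar invertedindex.jar test_mapreduce.mr.InvertedIndex /user/anny/hdfs/in /user/anny/hdfs/out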


Source: https://www.cnblogs.com/anny-1980/articles/3441798.html