HDFS中的Java API的使用

上传文件

PutFile.java

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PutFile {

    public static void main(String[] args) throws IOException,URISyntaxException {
        Configuration conf = new Configuration();
        URI uri = new URI("hdfs://192.168.56.31:9000");
        FileSystem fs = FileSystem.get(uri,conf);
        //本地文件
        Path src = new Path("D:\\scala\\文档\\63\\access.txt");
        //HDFS存放位置
        Path dst = new Path("/");
        fs.copyFromLocalFile(src, dst);
        System.out.println("Upload to " + conf.get("fs.defaultFS"));
        // 以下相当于执行hdfs dfs -ls /
        FileStatus files[] = fs.listStatus(dst);
        
        for (FileStatus file:files) {
            System.out.println(file.getPath());
        }
        
        

    }

}

创建文件

CreateFile.java


import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateFile {

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"),new Configuration());
        // 定义新文件
        Path dfs = new Path("/hdfsfile");
        // 创建新文件,如果有则覆盖(true)
        FSDataOutputStream create = fs.create(dfs,true);
        
        create.writeBytes("Hello,HDFS !");

    }

}

查看文件详细信息

FileLocation.java

import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileLocation {

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"),new Configuration());
        Path fpath = new Path("/access.txt");
        FileStatus filestatus = fs.getFileStatus(fpath);
        /*
         * 获取文件在HDFS集群位置:
         * FileSystem.getFileBlockLocation(FileStatus file,long start, long len)"
         * 可查找指定文件在HDFS集群上的位置,其中file为文件的完整路径,start和len来标识查找文件的路径
         */
        BlockLocation[]blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
        filestatus.getAccessTime();
            for(int i=0;i<blkLocations.length;i++) {
                String[] hosts = blkLocations[i].getHosts();
                System.out.println("block_"+i+"_location:"+hosts[0]);
            }
        // 格式化日期输出
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // 获取文件访问时间,返回long
        long accessTime = filestatus.getAccessTime();
        System.out.println("access:"+formatter.format(new Date(accessTime)));
        // 获取文件修改时间,返回long
        long modificationTime = filestatus.getModificationTime();
        System.out.println("modification:"+formatter.format(new Date(modificationTime)));
        // 获取块大小,单位B
        long blockSize = filestatus.getBlockSize();
        System.out.println("blockSize:"+blockSize);
        // 获取文件大小,单位B
        long len = filestatus.getLen();
        System.out.println("length:"+len);
        // 获取文件所在用户组
        String group = filestatus.getGroup();
        System.out.println("group:"+group);
        // 获取文件拥有者
        String owner = filestatus.getOwner();
        System.out.println("owner:"+owner);
        // 获取文件拷贝数
        short replication = filestatus.getReplication();
        System.out.println("replication:"+replication);
    }

}

下载文件

GetFile.java

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetFile {

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"),new Configuration());
        //hdfs上文件
        Path src = new Path("/access.txt");
        // 下载到本地的文件名
        Path dst = new Path("D:\\scala\\文档\\63\\newfile.txt");
        fs.copyToLocalFile(src, dst);

    }

}

RPC通信

反射机制

Student.java

interface people{
    public void study();
}
public class Student implements people {
    private String name; //名字;
    private int age;
    //构造方法1;
    public Student() {}
    // 构造方法2;
    public Student(String name,int age) {
        this.name = name;
        this.age = age;
    }
    //set和get方法;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    
    public void study() {
        System.out.println("正在学习");
    }
    // 程序的主方法;
    public static void main(String[] args) {
    // 
    Class<? extends Student> tmp=Student.class;
    String cName = tmp.getName();
    System.out.println("类的名字是"+cName);
    try {
    // 动态加载指定类名
        Class c = Class.forName(cName);
        //得到类中的方法;
        java.lang.reflect.Method[] ms = c.getMethods();
        for(java.lang.reflect.Method m:ms) {
            System.out.println("方法的名字是"+m.getName());
            System.out.println("方法的返回值类型是"+m.getReturnType().toString());
            System.out.println("方法的参数类型是"+m.getParameterTypes());
        }
        //得到属性
        java.lang.reflect.Field[] fields = c.getFields();
        for(java.lang.reflect.Field f:fields) {
            System.out.println("参数类型是"+f.getType());
        }
        // 得到父接口
        Class[] is = c.getInterfaces();
        for(Class s:is) {
            System.out.println("父接口的名字是"+s.getName());
        }
        // 判断是否是数组
        System.out.println("数组:"+c.isArray());
        String CLName = c.getClassLoader().getClass().getName();
        System.out.println("类加载器:"+CLName);
        // 实例化构造器
        java.lang.reflect.Constructor cons = c.getConstructor(String.class,int.class);
        Student stu = (Student) cons.newInstance("hadoop",23);
        System.out.println(stu.getName()+":"+stu.getAge());
    }catch (Exception e) {
        e.printStackTrace();
    }

    }
}

MapReduce实现技术

WordMapper.java

package wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// 创建一个WordMapper类继承于Mapper抽象类
public class WordMapper extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    //Mapper抽象类的核心方法,三个参数
    public void map( Object key,  //首字符偏移量
                    Text value,   //文件的一行内容
                    Context context)  //Mapper端的上下文,与outputCollector和 Reporter的功能类似
                throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }

}

WordReduce.java

package wordcount;



import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// 创建一个WordReducer类继承于Reducer抽象类
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    private IntWritable result = new IntWritable();   //记录词频
    //Reducer 抽象类的核心方法,3个参数
    public void reduce( Text key,   //Map端输出的key值
            Iterable<IntWritable> values,  // Map端输出的Value集合
            Context context)  
            throws IOException,InterruptedException {
        int sum = 0;
        for (IntWritable val : values)  //遍历values集合,并把值相加
        {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
    
}

WordMain.java

package wordcount;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordMain {

    public static void main(String[] args) throws Exception {
        //Configuration类:读取Hadoop的配置文件,如core-site.xml...;
        // 也可用set方法重新设置(会覆盖):conf.set("fs.default.name",//"hdfs://xxxx:9000")
        Configuration conf = new Configuration();
        
        // 将命令行中参数自动设置到变量conf中
        String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
        //      conf.set("fs.defaultFS", "hdfs://192.168.56.31:9000");
//      conf.set("hadoop.job.user", "root");
//      conf.set("mapreduce.framework.name", "yarn");
//      conf.set("mapreduce.jobtracker.address", "192.168.56.31:9001");
//      conf.set("yarn.resourcemanager.hostname", "192.168.56.31");
//      conf.set("yarn.resourcemanager.admin.address", "192.168.56.31:8033");
//      conf.set("yarn.resourcemanager.address", "192.168.56.31:80312");
//      conf.set("yarn.resourcemanager.resource-tracker.address", "192.168.56.31:8031");
//      conf.set("yarn.resourcemanager.scheduler.address", "192.168.56.31:8030");

        if(otherArgs.length != 2)
        {
            System.err.println("Usage: wordcount <in><out>");
            System.exit(2);
        }
        
        Job job = new Job(conf, "word count");  // 新建一个job,传入配置信息
        job.setJarByClass(WordMain.class);  //设置主类
        job.setMapperClass(WordMapper.class);  //设置Mapper类
        job.setCombinerClass(WordReducer.class);  //设置作业合成类
        job.setReducerClass(WordReducer.class); //设置Reducer类
        job.setOutputKeyClass(Text.class);  //设置输出数据的关键类
        job.setOutputValueClass(IntWritable.class);    //设置输出值类
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  //文件输入
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // 文件输出
        System.exit(job.waitForCompletion(true) ? 0 : 1);  // 等待完成退出
    }

}

打包上传

hdfs dfs -mkdir /user/hadoop
hdfs dfs -mkdir /user/hadoop/input
hdfs dfs -put file* /user/hadoop/input
hdfs dfs -ls /user/hadoop/input
hadoop jar wordcount.jar wordcount.WordMain /user/hadoop/input/file* /user/hadoop/output
hdfs dfs -ls /user/hadoop/output
hdfs dfs -text /user/hadoop/output/part-r-00000
hdfs://192.168.56.31:9000/user/hadoop/input
hdfs://192.168.56.31:9000/user/hadoop/output2

《HDFS中的Java API的使用》 image.png

WordCount2.java

package wordcount2;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount2 {
    
    public static class TokenizerMapper extends Mapper<Object, Text, Text,IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void Map(Object key,Text value, Context context) throws IOException,InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens())
            {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    
    public static class IntSumReducer extends Reducer<Text,IntWritable,Text, IntWritable>
    {
        private IntWritable result = new IntWritable();
        public void reduce(Text key,Iterable<IntWritable>value, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val: value)
            {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.56.31:9000");
        conf.set("hadoop.job.user", "root");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("mapreduce.jobtracker.address", "192.168.56.31:9001");
        conf.set("yarn.resourcemanager.hostname", "192.168.56.31");
        conf.set("yarn.resourcemanager.admin.address", "192.168.56.31:8033");
        conf.set("yarn.resourcemanager.address", "192.168.56.31:80312");
        conf.set("yarn.resourcemanager.resource-tracker.address", "192.168.56.31:8031");
        conf.set("yarn.resourcemanager.scheduler.address", "192.168.56.31:8030");
        
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2)
        {
            System.err.println("Usage:wordcount <in><out>");
            System.exit(2);
        }
        
        Job job = new Job(conf, "word count2");  // 新建一个job,传入配置信息
        job.setJarByClass(WordCount2.class);  //设置主类
        job.setMapperClass(TokenizerMapper.class);  //设置Mapper类
        job.setCombinerClass(IntSumReducer.class);  //设置作业合成类
        job.setReducerClass(IntSumReducer.class); //设置Reducer类
        job.setOutputKeyClass(Text.class);  //设置输出数据的关键类
        job.setOutputValueClass(IntWritable.class);    //设置输出值类
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  //文件输入
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // 文件输出
        boolean flag = job.waitForCompletion(true);
        System.out.println("SUCCEED !"+flag);  //任务完成提示
        System.exit(flag ? 0 : 1);
        System.out.println();
    }

}

    原文作者:lehuai
    原文地址: https://www.jianshu.com/p/21d631c8b796
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞