hadoop-java客户端搭建&WordCount

java客户端&开发环境搭建

win7下开发环境配置

1 先官网下hadop,然后配置HADOOP_HOME.

2 用csdn下的包替换HADOOP_HOME里的bin目录
此文件已经存于网盘
要注意版本对应.
这个包是操作系统依赖的文件. 在linux 下没毛病,win7很蛋疼.

hdfs上传文件

其他的都差不多,照着写就行.

package com.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * Created by v_zhangbing on 2017/7/4.
 */
public class HdfsClientTest {

    private Configuration conf;
    private FileSystem fs;

    @Before
    public void init() throws URISyntaxException, IOException, InterruptedException {
        conf = new Configuration();
        //conf.set("fs.defaultFS","hdfs://ubuntu:9000");

        fs = FileSystem.get(new URI("hdfs://ubuntu:9000"), conf, "zb");
    }

    @Test
    public void testUploadFile() throws IOException {
        fs.copyFromLocalFile(new Path("C:/Users/v_zhangbing/Downloads/aaa"), new Path("/java/aaa"));
        fs.close();
    }

    
}

MapReduce本地调试

本地运行最重要的是环境的配置.
1 maven引用的hadoop版本要和本地Hadoop版本一致.
2 网上下载windows版本的工具包(就是bin下的几个文件)替换/hadoop/bin. 这个我的网盘存了一份, 另外网上都能找到.

maven:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>hadoop</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>


    <dependencies>
        <!-- hadoop 分布式文件系统类库 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.5</version>
        </dependency>

        <!-- hadoop 公共类库 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.5</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.5</version>
        </dependency>
    </dependencies>


</project>

conf中设置本地模式

public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // 本地调试MapReduce
        conf.set("mapreduce.framework.name","local");
        conf.set("fs.defaultFS","file:///");

        Job job = Job.getInstance(conf);

        // 指定本程序的jar包所在本地路径
        job.setJarByClass(WordcountDriver.class);

        // 指定本业务用的mapper reducer 类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 指定最终输出数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 指定job的输入源文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job的输出结果所在目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // job.submit();
        boolean result = job.waitForCompletion(true);

        // 根据处理结果给程序设定退出码
        System.exit(result ? 0 : 1);

    }
}

WordCount-单词统计程序

统计单词次数的mapreducer程序

maven

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>hadoop</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>


    <dependencies>
        <!-- hadoop 分布式文件系统类库 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.5</version>
        </dependency>

        <!-- hadoop 公共类库 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.5</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.5</version>
        </dependency>
    </dependencies>


</project>

WordcountMapper

package wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 一个单词次数统计的MapReduce程序。
 * 泛型的含义如下:
 *
 * KeyIn: 默认情况下是mr框架所读到的第一行文本的偏移量, Long.
 * 但是在hadoop中有自己更精简的序列化接口, 所以不用Long, 而用LongWritable
 * ValueIn: 默认情况是时mr框架读到的一行文本内容, String 同上用Text
 *
 * KeyOut: 是用户自定义逻辑处理完之后输出数据中的Key, 在此处是单词, String
 * ValueOut: 是用户自定义逻辑处理完之后输出数据中的, 在此处是单次次数, Integer
 *
 * Created by zb on 2017/7/13.
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * map阶段的业务逻辑就写在自定义的map()中
     * mapTask会对每一行输入数据调用一次我们的map()
     *
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 将mapTask传给我们的的文本内容先转换成String
        String line = value.toString();
        // 根据空格切分单词
        String[] words = line.split(" ");

        // 将单词输出为<单词, 1>
        for (String word : words) {
            // 把单词作为key 次数作为value 分发给reduce, 相同的key会给到同一个reduceTask
            context.write(new Text(word), new IntWritable(1));
        }

    }
}

WordcountReducer

package wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * 单词统计的ReducerTask,泛型参数如下:
 *
 * KeyIn ValueIn 对应Mapper输出的KeyOut ValueOut
 *
 * KeyOut ValueOut 是自定义Reducer逻辑处理结构的输出
 * KeyOut是单词 ValueOut是次数
 *
 * Created by zb on 2017/7/13.
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * @param key     是一组单词相同的KV对的key
     * @param values  是值得集合
     * @param context 上下文
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;

        for (IntWritable value : values) {
            count++;
        }

        context.write(key,new IntWritable(count));

    }
}

WordcountDriver

package wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 相当于一个yarn集群的客户端, 要再次封装我们map reduce程序的运行参数,指定jar包,最后提交给yarn
 *
 * Created by zb on 2017/7/14.
 */
public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 指定本程序的jar包所在本地路径
        job.setJarByClass(WordcountDriver.class);

        // 指定本业务用的mapper reducer 类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 指定最终输出数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 指定job的输入源文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job的输出结果所在目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // job.submit();
        boolean result = job.waitForCompletion(true);

        // 根据处理结果给程序设定退出码
        System.exit(result ? 0 : 1);

    }
}

程序的启动命令

hadoop jar hadoop-1.0-SNAPSHOT.jar wordcount.WordcountDriver /wordcount/input /wordcount/output

后面跟上运行的主类,2个参数
hadoop har 其实就是普通的 java -jar 附加Hadoop_Home下的jar包而已。

点赞