java客户端&开发环境搭建
win7下开发环境配置
1 先从官网下载hadoop,然后配置HADOOP_HOME环境变量.
2 用csdn下的包替换HADOOP_HOME里的bin目录
此文件已经存于网盘
要注意版本对应.
这个包是操作系统依赖的文件. 在linux 下没毛病,win7很蛋疼.
hdfs上传文件
其他的都差不多,照着写就行.
package com.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
 * Client-side test that uploads a local file to HDFS.
 *
 * <p>Each test gets its own {@link FileSystem} handle, opened in {@link #init()} against
 * {@code hdfs://ubuntu:9000} with the user name {@code zb}.
 *
 * Created by v_zhangbing on 2017/7/4.
 */
public class HdfsClientTest {
    private Configuration conf;
    private FileSystem fs;

    /**
     * Creates the configuration and opens the HDFS handle used by the test.
     *
     * @throws URISyntaxException   if the NameNode URI literal is malformed
     * @throws IOException          if the file system cannot be obtained
     * @throws InterruptedException if obtaining the file system is interrupted
     */
    @Before
    public void init() throws URISyntaxException, IOException, InterruptedException {
        conf = new Configuration();
        // The target file system is given explicitly by the URI below, so fs.defaultFS is not set.
        //conf.set("fs.defaultFS","hdfs://ubuntu:9000");
        fs = FileSystem.get(new URI("hdfs://ubuntu:9000"), conf, "zb");
    }

    /**
     * Copies a local file to {@code /java/aaa} on HDFS.
     *
     * @throws IOException if the copy or the close fails
     */
    @Test
    public void testUploadFile() throws IOException {
        try {
            fs.copyFromLocalFile(new Path("C:/Users/v_zhangbing/Downloads/aaa"), new Path("/java/aaa"));
        } finally {
            // Close even when the upload fails so the handle is not leaked.
            fs.close();
        }
    }
}
MapReduce本地调试
本地运行最重要的是环境的配置.
1 maven引用的hadoop版本要和本地Hadoop版本一致.
2 网上下载windows版本的工具包(就是bin下的几个文件)替换/hadoop/bin. 这个我的网盘存了一份, 另外网上都能找到.
maven:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>hadoop</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<!-- All three Hadoop artifacts below are pinned to the same version (2.6.5). -->
<dependencies>
<!-- Hadoop distributed file system (HDFS) library -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.5</version>
</dependency>
<!-- Hadoop common library -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.5</version>
</dependency>
<!-- Hadoop client library; see https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
</dependencies>
</project>
conf中设置本地模式
/**
 * Word-count job driver configured for local debugging: the job runs with the
 * {@code local} MapReduce framework against the local file system ({@code file:///}).
 */
public class WordcountDriver {
    /**
     * Configures and runs the word-count job, then exits with a status code.
     *
     * @param args {@code args[0]} is the input directory, {@code args[1]} is the output directory
     * @throws IOException            if the job cannot be created or its I/O fails
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: WordcountDriver <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        // Run MapReduce locally for debugging, reading and writing the local file system.
        conf.set("mapreduce.framework.name","local");
        conf.set("fs.defaultFS","file:///");
        Job job = Job.getInstance(conf);

        // Locate the job jar via the class it contains.
        job.setJarByClass(WordcountDriver.class);

        // Mapper and reducer implementations for this job.
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // Key/value types emitted by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Key/value types of the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Directory holding the job's input files.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Directory the job writes its results to.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // waitForCompletion is used instead of job.submit() so the result is available for the exit code.
        boolean result = job.waitForCompletion(true);

        // Exit code reflects the job result: 0 on success, 1 on failure.
        System.exit(result ? 0 : 1);
    }
}
WordCount-单词统计程序
统计单词出现次数的MapReduce程序
maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>hadoop</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<!-- All three Hadoop artifacts below are pinned to the same version (2.6.5). -->
<dependencies>
<!-- Hadoop distributed file system (HDFS) library -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.5</version>
</dependency>
<!-- Hadoop common library -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.5</version>
</dependency>
<!-- Hadoop client library; see https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
</dependencies>
</project>
WordcountMapper
package wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper of the word-count MapReduce program.
 *
 * <p>Meaning of the generic parameters:
 * <ul>
 *   <li>KeyIn: by default the offset of the line handed over by the framework. Hadoop uses its
 *       own compact serialization, hence {@link LongWritable} rather than {@code Long}.</li>
 *   <li>ValueIn: by default the content of that line; {@link Text} rather than {@code String}
 *       for the same reason.</li>
 *   <li>KeyOut: key of the record produced by the user logic, here the word ({@link Text}).</li>
 *   <li>ValueOut: value of the record produced by the user logic, here the occurrence count
 *       ({@link IntWritable}).</li>
 * </ul>
 *
 * Created by zb on 2017/7/13.
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /** Constant count emitted for every word occurrence; never mutated. */
    private static final IntWritable ONE = new IntWritable(1);

    /** Reused output key, refilled per word to avoid one allocation per record. */
    private final Text word = new Text();

    /**
     * Emits {@code <word, 1>} for every word of one input line.
     *
     * <p>The framework calls this method once per input line. Words are separated by runs of
     * whitespace; empty tokens (blank lines, leading or repeated separators) are skipped so
     * that the empty string is never counted as a word.
     *
     * @param key     offset of the line (unused)
     * @param value   content of the line
     * @param context sink for the emitted records
     * @throws IOException          if writing a record fails
     * @throws InterruptedException if writing a record is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Convert the line handed over by the map task to a String.
        String line = value.toString();

        // Split on whitespace runs rather than a single space.
        for (String token : line.split("\\s+")) {
            if (token.isEmpty()) {
                // A leading separator or a blank line yields an empty token.
                continue;
            }
            // Word as key, count as value; identical keys are routed to the same reduce task.
            word.set(token);
            context.write(word, ONE);
        }
    }
}
WordcountReducer
package wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer of the word-count program.
 *
 * <p>KeyIn/ValueIn match the mapper's KeyOut/ValueOut. KeyOut is the word and ValueOut is its
 * total number of occurrences.
 *
 * Created by zb on 2017/7/13.
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * Sums the counts received for one word and emits {@code <word, total>}.
     *
     * <p>The values are added up rather than merely counted, so the total stays correct when
     * an incoming value is a partial count greater than 1 (for example after a combiner).
     *
     * @param key     the word shared by this group of key/value pairs
     * @param values  the counts emitted for that word
     * @param context sink for the emitted record
     * @throws IOException          if writing the record fails
     * @throws InterruptedException if writing the record is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
WordcountDriver
package wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Acts as a client of the YARN cluster: it bundles the run parameters of the map/reduce
 * program, names the jar, and submits the job.
 *
 * Created by zb on 2017/7/14.
 */
public class WordcountDriver {
    /**
     * Configures and runs the word-count job, then exits with a status code.
     *
     * @param args {@code args[0]} is the input directory, {@code args[1]} is the output directory
     * @throws IOException            if the job cannot be created or its I/O fails
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: WordcountDriver <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Locate the job jar via the class it contains.
        job.setJarByClass(WordcountDriver.class);

        // Mapper and reducer implementations for this job.
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // Key/value types emitted by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Key/value types of the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Directory holding the job's input files.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Directory the job writes its results to.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // waitForCompletion is used instead of job.submit() so the result is available for the exit code.
        boolean result = job.waitForCompletion(true);

        // Exit code reflects the job result: 0 on success, 1 on failure.
        System.exit(result ? 0 : 1);
    }
}
程序的启动命令
hadoop jar hadoop-1.0-SNAPSHOT.jar wordcount.WordcountDriver /wordcount/input /wordcount/output
后面跟上运行的主类,2个参数
hadoop jar 其实就是普通的 java -jar, 只是额外把HADOOP_HOME下的jar包加入了classpath而已。