Installing Hadoop in distributed mode
References: https://www.jianshu.com/p/117243649c18 and https://www.jianshu.com/p/21c992596497
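The two posts cover the full multi-node setup (JDK, passwordless SSH, and the XML config files). As a rough sketch of the final steps on the master once those files are in place — the property values below are illustrative assumptions for a typical 2.7.x cluster, not taken from the posts:
# core-site.xml : fs.defaultFS = hdfs://master:9000   (assumed value)
# hdfs-site.xml : dfs.replication = 3                 (matches the replication factor shown in the listings below)
# slaves file   : slave01, slave02
# format the NameNode once, then start the daemons as in the "Starting the cluster" section below
hdfs namenode -format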
WordCount
The code follows the official documentation:
package documentation;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
/**
* MAP Class
*/
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
// <Object, Text, Text, IntWritable>: map input key/value types (Object, Text) and output key/value types (Text, IntWritable)
// output value: Hadoop's writable wrapper around int, fixed at 1
private final static IntWritable one = new IntWritable(1);
// output key: Hadoop's writable wrapper around String
private Text word = new Text();
/**
* map() has no return value; output is emitted through the context parameter, and the goal is to produce (word, 1) pairs
* @param key input key (unused here)
* @param value one line of the input text
* @param context the map output is written here instead of being returned
* @throws IOException
* @throws InterruptedException
*/
@Override
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
// tokenize the line into words
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one); // emit one (word, 1) pair per token
}
}
}
/**
* REDUCE Class
*/
//<k2, v2> -> reduce -> <k3, v3> (output)
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
// <Text, IntWritable, Text, IntWritable>: reduce input key/value types and output key/value types
//value
private IntWritable result = new IntWritable();
/**
* As in map(), output goes through context; all values that share a key are summed up
* @param key the word
* @param values all counts emitted for this word by the mappers (and combiner)
* @param context the reduce output is written here
* @throws IOException
* @throws InterruptedException
*/
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
//(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
For a line-by-line explanation, see the Walk-through section of http://hadoop.apache.org/docs/r2.7.6/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html.
job.setCombinerClass(IntSumReducer.class);
The combiner is usually the same class as the reducer. It runs on each map task's output, after the output has been sorted on the keys, and performs local aggregation, which cuts down the amount of data transferred from the mapper to the reducer. The effect is visible in the job counters further down: Combine input records=35 but Combine output records=7, so only 7 key/value pairs are shuffled instead of 35.
Packaging the project with Maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.learning</groupId>
<artifactId>hadoop</artifactId>
<version>1.0-SNAPSHOT</version>
<repositories>
<repository>
<id>nexus-aliyun</id>
<name>nexus-aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<!--<version>3.1</version>-->
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
<!--<compilerArguments>-->
<!--<extdirs>${project.basedir}/lib</extdirs>--> <!-- points the compiler at an external lib directory -->
<!--</compilerArguments>-->
</configuration>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<!-- ${project.build.directory} is a built-in Maven property; it defaults to target -->
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<!-- whether to exclude transitive dependencies -->
<excludeTransitive>false</excludeTransitive>
<!-- whether to strip version numbers from the copied jar file names -->
<stripVersion>false</stripVersion>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>2.5</version>
<executions>
<execution>
<id>copy-xmls</id>
<phase>process-sources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/</outputDirectory>
<resources>
<resource>
<directory>${basedir}/</directory>
<!--<directory>data/</directory>-->
<!--<includes>-->
<!--<include>data/</include>-->
<!--<include>company_word_id/</include>-->
<!--<include>src/main/resources/application.properties</include>-->
<!--<include>src/main/resources/constant.properties</include>-->
<!--<include>src/main/resources/dbConfig.properties</include>-->
<!--<include>src/main/resources/synonym_words.csv</include>-->
<!--<include>src/main/resources/标签匹配规则20180528.xlsx</include>-->
<!--</includes>-->
<!--<filtering>true</filtering>-->
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<!--<version>2.4</version>-->
<configuration>
<archive>
<!-- do not include pom.xml and pom.properties in the generated jar -->
<!--<addMavenDescriptor>false</addMavenDescriptor>-->
<manifest>
<!-- tell maven-jar-plugin to add a Class-Path entry to MANIFEST.MF listing all dependencies -->
<addClasspath>true</addClasspath>
<!-- the dependencies are expected to sit in the lib/ folder next to the jar -->
<classpathPrefix>lib/</classpathPrefix>
<!-- main class to run when the jar is executed without an explicit class name -->
<mainClass>documentation.WordCount</mainClass>
</manifest>
</archive>
<!-- exclude files that should not be packaged into the jar -->
<!--<excludes>-->
<!--<exclude>${project.basedir}/xml/*</exclude>-->
<!--</excludes>-->
</configuration>
</plugin>
</plugins>
</build>
</project>
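With this pom, packaging is the standard Maven lifecycle: maven-dependency-plugin copies the runtime jars into target/lib and maven-jar-plugin writes a matching Class-Path into the manifest, so the build output can be shipped to the cluster as-is. A sketch — the destination host and path are guesses that match the listing further down:
# on the development machine
mvn clean package
# target/ now contains hadoop-1.0-SNAPSHOT.jar plus lib/ with all dependencies; copy both to the node that submits the job
scp -r target/hadoop-1.0-SNAPSHOT.jar target/lib root@master:~/jars/com.learning.hadoop/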
Starting the cluster
[root@master ~]# start-dfs.sh
Starting namenodes on [master]
master: Warning: Permanently added 'master,172.16.21.220' (ECDSA) to the list of known hosts.
master: starting namenode, logging to /opt/hadoop-2.7.3/logs/hadoop-root-namenode-master.out
slave02: Warning: Permanently added 'slave02,172.16.21.222' (ECDSA) to the list of known hosts.
slave01: Warning: Permanently added 'slave01,172.16.21.221' (ECDSA) to the list of known hosts.
slave02: starting datanode, logging to /opt/hadoop-2.7.3/logs/hadoop-root-datanode-slave02.out
slave01: starting datanode, logging to /opt/hadoop-2.7.3/logs/hadoop-root-datanode-slave01.out
Starting secondary namenodes [master]
master: Warning: Permanently added 'master,172.16.21.220' (ECDSA) to the list of known hosts.
master: starting secondarynamenode, logging to /opt/hadoop-2.7.3/logs/hadoop-root-secondarynamenode-master.out
[root@master ~]# start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to /opt/hadoop-2.7.3/logs/yarn-root-resourcemanager-master.out
slave01: Warning: Permanently added 'slave01,172.16.21.221' (ECDSA) to the list of known hosts.
slave02: Warning: Permanently added 'slave02,172.16.21.222' (ECDSA) to the list of known hosts.
slave01: starting nodemanager, logging to /opt/hadoop-2.7.3/logs/yarn-root-nodemanager-slave01.out
slave02: starting nodemanager, logging to /opt/hadoop-2.7.3/logs/yarn-root-nodemanager-slave02.out
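A quick way to confirm the daemons actually came up (expected process names for this master/two-slave layout; not part of the original session):
# on master: NameNode, SecondaryNameNode and ResourceManager should be listed
jps
# on slave01 / slave02: DataNode and NodeManager should be listed
jps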
Create /hdfsRoot/data under the HDFS root
[root@master ~]# hadoop fs -mkdir -p /hdfsRoot/data
[root@master ~]# hadoop fs -ls /hdfsRoot/
Found 1 items
drwxr-xr-x - root supergroup 0 2018-08-17 23:40 /hdfsRoot/data
# test data
[root@master Data]# gedit testwordcount.data
[root@master Data]# more testwordcount.data
d b s a k d s a k d s a k h d s a l k d h s a l h d h s a l h d k s a
# copy the input file to HDFS
[root@master Data]# hadoop fs -copyFromLocal testwordcount.data /hdfsRoot/data/
[root@master Data]# hadoop fs -ls /hdfsRoot/data/
Found 3 items
-rw-r--r-- 3 root supergroup 71 2018-08-19 22:33 /hdfsRoot/data/testwordcount.data
-rw-r--r-- 3 root supergroup 2171334 2018-08-17 23:23 /hdfsRoot/data/xiyouji.txt
drwxr-xr-x - root supergroup 0 2018-08-17 23:41 /hdfsRoot/data/xiyouji_wordcount.txt
Running the packaged jar
[root@master jars]# ls
com.learning.hadoop
[root@master jars]# cd com.learning.hadoop/
[root@master com.learning.hadoop]# ls
classes hadoop-1.0-SNAPSHOT.jar lib src
[root@master com.learning.hadoop]# hadoop jar hadoop-1.0-SNAPSHOT.jar /hdfsRoot/data/testwordcount.data /hdfsRoot/data/testwordcount.stat
18/08/19 22:38:35 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.21.220:8032
18/08/19 22:38:37 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/08/19 22:38:38 INFO input.FileInputFormat: Total input paths to process : 1
18/08/19 22:38:38 INFO mapreduce.JobSubmitter: number of splits:1
18/08/19 22:38:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1534688585766_0001
18/08/19 22:38:40 INFO impl.YarnClientImpl: Submitted application application_1534688585766_0001
18/08/19 22:38:41 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1534688585766_0001/
18/08/19 22:38:41 INFO mapreduce.Job: Running job: job_1534688585766_0001
18/08/19 22:39:11 INFO mapreduce.Job: Job job_1534688585766_0001 running in uber mode : false
18/08/19 22:39:11 INFO mapreduce.Job: map 0% reduce 0%
18/08/19 22:39:29 INFO mapreduce.Job: map 100% reduce 0%
18/08/19 22:39:48 INFO mapreduce.Job: map 100% reduce 100%
18/08/19 22:39:50 INFO mapreduce.Job: Job job_1534688585766_0001 completed successfully
18/08/19 22:39:50 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=62
FILE: Number of bytes written=237129
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=187
HDFS: Number of bytes written=28
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=15238
Total time spent by all reduces in occupied slots (ms)=15465
Total time spent by all map tasks (ms)=15238
Total time spent by all reduce tasks (ms)=15465
Total vcore-milliseconds taken by all map tasks=15238
Total vcore-milliseconds taken by all reduce tasks=15465
Total megabyte-milliseconds taken by all map tasks=15603712
Total megabyte-milliseconds taken by all reduce tasks=15836160
Map-Reduce Framework
Map input records=1
Map output records=35
Map output bytes=210
Map output materialized bytes=62
Input split bytes=116
Combine input records=35
Combine output records=7
Reduce input groups=7
Reduce shuffle bytes=62
Reduce input records=7
Reduce output records=7
Spilled Records=14
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=1222
CPU time spent (ms)=4800
Physical memory (bytes) snapshot=306180096
Virtual memory (bytes) snapshot=4159737856
Total committed heap usage (bytes)=165810176
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=71
File Output Format Counters
Bytes Written=28
[root@master com.learning.hadoop]#
Viewing the word-count results
[root@master com.learning.hadoop]# hadoop fs -copyToLocal /hdfsRoot/data/testwordcount.stat ~/Data/
[root@master com.learning.hadoop]# cd ~/Data/
[root@master Data]# ls
ncdc ncdc.sh node.csv relation.csv testwordcount.data testwordcount.stat xiyouji.txt xiyoujiwordcout.txt
[root@master Data]# cd testwordcount.stat/
[root@master testwordcount.stat]# ls
part-r-00000 _SUCCESS
[root@master testwordcount.stat]# more part-r-00000
a 7
b 1
d 7
h 5
k 5
l 3
s 7
[root@master testwordcount.stat]#
Hadoop commands:
http://hadoop.apache.org/docs/r2.7.6/hadoop-project-dist/hadoop-common/CommandsManual.html
HDFS commands:
http://hadoop.apache.org/docs/r2.7.6/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html
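Two commands from those manuals that come up repeatedly in this workflow (added here for convenience):
# a job fails if its output directory already exists, so remove the old result before re-running
hadoop fs -rm -r /hdfsRoot/data/testwordcount.stat
# print the result directly from HDFS instead of copying it to the local filesystem first
hadoop fs -cat /hdfsRoot/data/testwordcount.stat/part-r-00000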
WordCount2
Code (the second example from the official tutorial; it adds case-insensitive counting via wordcount.case.sensitive, a skip-pattern file shipped through the distributed cache, and a custom word counter):
package documentation;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;
public class WordCount2 {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
// custom counter that tracks the total number of input words
enum CountersEnum { INPUT_WORDS }
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private boolean caseSensitive;
private Set<String> patternsToSkip = new HashSet<String>();
private Configuration conf;
private BufferedReader fis;
@Override
public void setup(Context context) throws IOException,
InterruptedException {
conf = context.getConfiguration();
caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
// only look for cached pattern files when -skip was given (main() sets this flag); defaulting to false avoids a NullPointerException when no cache files exist
if (conf.getBoolean("wordcount.skip.patterns", false)) {
URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
for (URI patternsURI : patternsURIs) {
Path patternsPath = new Path(patternsURI.getPath());
String patternsFileName = patternsPath.getName();
parseSkipFile(patternsFileName);
}
}
}
private void parseSkipFile(String fileName) {
try {
fis = new BufferedReader(new FileReader(fileName));
String pattern = null;
while ((pattern = fis.readLine()) != null) {
patternsToSkip.add(pattern);
}
} catch (IOException ioe) {
System.err.println("Caught exception while parsing the cached file '"
+ StringUtils.stringifyException(ioe));
}
}
@Override
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
// lower-case the line unless case-sensitive counting was requested
String line = (caseSensitive) ?
value.toString() : value.toString().toLowerCase();
// strip every pattern that was loaded from the skip file
for (String pattern : patternsToSkip) {
line = line.replaceAll(pattern, "");
}
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
// bump the custom counter once per emitted word
Counter counter = context.getCounter(CountersEnum.class.getName(),
CountersEnum.INPUT_WORDS.toString());
counter.increment(1);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// GenericOptionsParser consumes the generic options (-D, -files, ...); whatever is left over belongs to the application
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();
// expect either <in> <out> or <in> <out> -skip <file>
if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount2.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
List<String> otherArgs = new ArrayList<String>();
for (int i=0; i < remainingArgs.length; ++i) {
if ("-skip".equals(remainingArgs[i])) {
// ship the pattern file through the distributed cache and turn skipping on
job.addCacheFile(new Path(remainingArgs[++i]).toUri());
job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
} else {
otherArgs.add(remainingArgs[i]);
}
}
FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Run
[root@master ~]# echo Hello World, Bye World! >> file01.txt
[root@master ~]# echo Hello Hadoop, Goodbye to hadoop. >> file02.txt
[root@master ~]# hadoop fs -mkdir /hdfsRoot/wordcount/input
[root@master ~]# hadoop fs -mkdir /hdfsRoot/wordcount/output
[root@master ~]# hadoop fs -put file* /hdfsRoot/wordcount/input
[root@master ~]# hadoop fs -ls /hdfsRoot/wordcount/input
Found 2 items
-rw-r--r-- 3 root supergroup 24 2018-09-02 23:39 /hdfsRoot/wordcount/input/file01.txt
-rw-r--r-- 3 root supergroup 33 2018-09-02 23:39 /hdfsRoot/wordcount/input/file02.txt
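The session above only stages the input. Going by the official WordCount2 walk-through, a run with a skip-pattern file would look roughly like the sketch below. Two caveats: the pom above hard-codes documentation.WordCount as the manifest Main-Class, so the jar has to be rebuilt with <mainClass> pointing at documentation.WordCount2 (hadoop jar uses the manifest class when one is present), and the job's output directory must not exist yet, hence the new subdirectory:
# pattern file with the same entries as the official tutorial: punctuation plus the word "to"
cat > patterns.txt <<'EOF'
\.
\,
\!
to
EOF
hadoop fs -put patterns.txt /hdfsRoot/wordcount/
# case-insensitive count, skipping the patterns shipped through the distributed cache
hadoop jar hadoop-1.0-SNAPSHOT.jar \
    -Dwordcount.case.sensitive=false \
    /hdfsRoot/wordcount/input /hdfsRoot/wordcount/output/run1 \
    -skip /hdfsRoot/wordcount/patterns.txt
hadoop fs -cat /hdfsRoot/wordcount/output/run1/part-r-00000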