运行mapreduce - java.lang.InterruptedException

错误日志:

 

2018-11-19 05:23:51,686 WARN  [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-11-19 05:23:52,595 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1181)) - session.id is deprecated. Instead, use dfs.metrics.session-id
2018-11-19 05:23:52,596 INFO  [main] jvm.JvmMetrics (JvmMetrics.java:init(79)) - Initializing JVM Metrics with processName=JobTracker, sessionId=
2018-11-19 05:23:53,215 WARN  [main] mapreduce.JobResourceUploader (JobResourceUploader.java:uploadFiles(64)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2018-11-19 05:23:53,289 INFO  [main] input.FileInputFormat (FileInputFormat.java:listStatus(289)) - Total input files to process : 1
2018-11-19 05:23:53,375 INFO  [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(200)) - number of splits:1
2018-11-19 05:23:53,684 INFO  [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(289)) - Submitting tokens for job: job_local2033293629_0001
2018-11-19 05:23:54,051 INFO  [main] mapreduce.Job (Job.java:submit(1345)) - The url to track the job: http://localhost:8080/
2018-11-19 05:23:54,052 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1390)) - Running job: job_local2033293629_0001
2018-11-19 05:23:54,053 INFO  [Thread-23] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(498)) - OutputCommitter set in config null
2018-11-19 05:23:54,059 INFO  [Thread-23] output.FileOutputCommitter (FileOutputCommitter.java:<init>(123)) - File Output Committer Algorithm version is 1
2018-11-19 05:23:54,061 INFO  [Thread-23] output.FileOutputCommitter (FileOutputCommitter.java:<init>(138)) - FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2018-11-19 05:23:54,070 INFO  [Thread-23] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(516)) - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2018-11-19 05:23:54,156 INFO  [Thread-23] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(475)) - Waiting for map tasks
2018-11-19 05:23:54,157 INFO  [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(251)) - Starting task: attempt_local2033293629_0001_m_000000_0
2018-11-19 05:23:54,185 INFO  [LocalJobRunner Map Task Executor #0] output.FileOutputCommitter (FileOutputCommitter.java:<init>(123)) - File Output Committer Algorithm version is 1
2018-11-19 05:23:54,187 INFO  [LocalJobRunner Map Task Executor #0] output.FileOutputCommitter (FileOutputCommitter.java:<init>(138)) - FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2018-11-19 05:23:54,219 INFO  [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:initialize(619)) -  Using ResourceCalculatorProcessTree : [ ]
2018-11-19 05:23:54,224 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:runNewMapper(756)) - Processing split: hdfs://hadoop1:9000/Input/test.txt:0+1149413
2018-11-19 05:23:54,303 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:setEquator(1205)) - (EQUATOR) 0 kvi 26214396(104857584)
2018-11-19 05:23:54,303 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(998)) - mapreduce.task.io.sort.mb: 100
2018-11-19 05:23:54,304 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(999)) - soft limit at 83886080
2018-11-19 05:23:54,304 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(1000)) - bufstart = 0; bufvoid = 104857600
2018-11-19 05:23:54,304 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(1001)) - kvstart = 26214396; length = 6553600
2018-11-19 05:23:54,305 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:createSortingCollector(403)) - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2018-11-19 05:23:54,983 INFO  [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(618)) - 
2018-11-19 05:23:54,987 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1462)) - Starting flush of map output
2018-11-19 05:23:54,987 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1484)) - Spilling map output
2018-11-19 05:23:54,987 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1485)) - bufstart = 0; bufend = 1574408; bufvoid = 104857600
2018-11-19 05:23:54,988 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1487)) - kvstart = 26214396(104857584); kvend = 25789404(103157616); length = 424993/6553600
2018-11-19 05:23:55,054 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1411)) - Job job_local2033293629_0001 running in uber mode : false
2018-11-19 05:23:55,055 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1418)) -  map 0% reduce 0%
2018-11-19 05:23:55,483 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:sortAndSpill(1669)) - Finished spill 0
2018-11-19 05:23:55,496 INFO  [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:done(1099)) - Task:attempt_local2033293629_0001_m_000000_0 is done. And is in the process of committing
2018-11-19 05:23:55,516 INFO  [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(618)) - map
2018-11-19 05:23:55,516 INFO  [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:sendDone(1219)) - Task 'attempt_local2033293629_0001_m_000000_0' done.
2018-11-19 05:23:55,516 INFO  [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(276)) - Finishing task: attempt_local2033293629_0001_m_000000_0
2018-11-19 05:23:55,517 INFO  [Thread-23] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(483)) - map task executor complete.
2018-11-19 05:23:55,518 INFO  [Thread-23] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(475)) - Waiting for reduce tasks
2018-11-19 05:23:55,518 INFO  [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(329)) - Starting task: attempt_local2033293629_0001_r_000000_0
2018-11-19 05:23:55,530 INFO  [pool-7-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:<init>(123)) - File Output Committer Algorithm version is 1
2018-11-19 05:23:55,530 INFO  [pool-7-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:<init>(138)) - FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2018-11-19 05:23:55,531 INFO  [pool-7-thread-1] mapred.Task (Task.java:initialize(619)) -  Using ResourceCalculatorProcessTree : [ ]
2018-11-19 05:23:55,542 INFO  [pool-7-thread-1] mapred.ReduceTask (ReduceTask.java:run(362)) - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@2de8a214
2018-11-19 05:23:55,564 INFO  [pool-7-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:<init>(206)) - MergerManager: memoryLimit=678356544, maxSingleShuffleLimit=169589136, mergeThreshold=447715328, ioSortFactor=10, memToMemMergeOutputsThreshold=10
2018-11-19 05:23:55,574 INFO  [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(61)) - attempt_local2033293629_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
2018-11-19 05:23:55,690 INFO  [localfetcher#1] reduce.LocalFetcher (LocalFetcher.java:copyMapOutput(145)) - localfetcher#1 about to shuffle output of map attempt_local2033293629_0001_m_000000_0 decomp: 569824 len: 569828 to MEMORY
2018-11-19 05:23:55,702 INFO  [localfetcher#1] reduce.InMemoryMapOutput (InMemoryMapOutput.java:doShuffle(93)) - Read 569824 bytes from map-output for attempt_local2033293629_0001_m_000000_0
2018-11-19 05:23:55,706 INFO  [localfetcher#1] reduce.MergeManagerImpl (MergeManagerImpl.java:closeInMemoryFile(321)) - closeInMemoryFile -> map-output of size: 569824, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->569824
2018-11-19 05:23:55,707 INFO  [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(76)) - EventFetcher is interrupted.. Returning
2018-11-19 05:23:55,708 INFO  [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(618)) - 1 / 1 copied.
2018-11-19 05:23:55,708 INFO  [pool-7-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(693)) - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
2018-11-19 05:23:55,722 INFO  [pool-7-thread-1] mapred.Merger (Merger.java:merge(606)) - Merging 1 sorted segments
2018-11-19 05:23:55,729 INFO  [pool-7-thread-1] mapred.Merger (Merger.java:merge(705)) - Down to the last merge-pass, with 1 segments left of total size: 569809 bytes
2018-11-19 05:23:55,838 INFO  [pool-7-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(760)) - Merged 1 segments, 569824 bytes to disk to satisfy reduce memory limit
2018-11-19 05:23:55,844 INFO  [pool-7-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(790)) - Merging 1 files, 569828 bytes from disk
2018-11-19 05:23:55,848 INFO  [pool-7-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(805)) - Merging 0 segments, 0 bytes from memory into reduce
2018-11-19 05:23:55,850 INFO  [pool-7-thread-1] mapred.Merger (Merger.java:merge(606)) - Merging 1 sorted segments
2018-11-19 05:23:55,852 INFO  [pool-7-thread-1] mapred.Merger (Merger.java:merge(705)) - Down to the last merge-pass, with 1 segments left of total size: 569809 bytes
2018-11-19 05:23:55,857 INFO  [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(618)) - 1 / 1 copied.
2018-11-19 05:23:55,901 INFO  [pool-7-thread-1] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1181)) - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
2018-11-19 05:23:56,056 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1418)) -  map 100% reduce 0%
2018-11-19 05:23:56,084 INFO  [pool-7-thread-1] mapred.Task (Task.java:done(1099)) - Task:attempt_local2033293629_0001_r_000000_0 is done. And is in the process of committing
2018-11-19 05:23:56,089 INFO  [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(618)) - 1 / 1 copied.
2018-11-19 05:23:56,091 INFO  [pool-7-thread-1] mapred.Task (Task.java:commit(1260)) - Task attempt_local2033293629_0001_r_000000_0 is allowed to commit now
2018-11-19 05:23:56,114 INFO  [pool-7-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:commitTask(582)) - Saved output of task 'attempt_local2033293629_0001_r_000000_0' to hdfs://hadoop1:9000/Output/_temporary/0/task_local2033293629_0001_r_000000
2018-11-19 05:23:56,120 INFO  [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(618)) - reduce > reduce
2018-11-19 05:23:56,120 INFO  [pool-7-thread-1] mapred.Task (Task.java:sendDone(1219)) - Task 'attempt_local2033293629_0001_r_000000_0' done.
2018-11-19 05:23:56,120 INFO  [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(352)) - Finishing task: attempt_local2033293629_0001_r_000000_0
2018-11-19 05:23:56,120 INFO  [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(329)) - Starting task: attempt_local2033293629_0001_r_000001_0
2018-11-19 05:23:56,127 INFO  [pool-7-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:<init>(123)) - File Output Committer Algorithm version is 1
2018-11-19 05:23:56,127 INFO  [pool-7-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:<init>(138)) - FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2018-11-19 05:23:56,128 INFO  [pool-7-thread-1] mapred.Task (Task.java:initialize(619)) -  Using ResourceCalculatorProcessTree : [ ]
2018-11-19 05:23:56,128 INFO  [pool-7-thread-1] mapred.ReduceTask (ReduceTask.java:run(362)) - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@2bfb9fd5
2018-11-19 05:23:56,133 INFO  [pool-7-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:<init>(206)) - MergerManager: memoryLimit=678356544, maxSingleShuffleLimit=169589136, mergeThreshold=447715328, ioSortFactor=10, memToMemMergeOutputsThreshold=10
2018-11-19 05:23:56,138 INFO  [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(61)) - attempt_local2033293629_0001_r_000001_0 Thread started: EventFetcher for fetching Map Completion Events
2018-11-19 05:23:56,151 INFO  [localfetcher#2] reduce.LocalFetcher (LocalFetcher.java:copyMapOutput(145)) - localfetcher#2 about to shuffle output of map attempt_local2033293629_0001_m_000000_0 decomp: 1217086 len: 1217090 to MEMORY
2018-11-19 05:23:56,153 INFO  [localfetcher#2] reduce.InMemoryMapOutput (InMemoryMapOutput.java:doShuffle(93)) - Read 1217086 bytes from map-output for attempt_local2033293629_0001_m_000000_0
2018-11-19 05:23:56,157 INFO  [localfetcher#2] reduce.MergeManagerImpl (MergeManagerImpl.java:closeInMemoryFile(321)) - closeInMemoryFile -> map-output of size: 1217086, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->1217086
2018-11-19 05:23:56,157 INFO  [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(76)) - EventFetcher is interrupted.. Returning
2018-11-19 05:23:56,158 INFO  [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(618)) - 1 / 1 copied.
2018-11-19 05:23:56,158 INFO  [pool-7-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(693)) - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
2018-11-19 05:23:56,160 INFO  [pool-7-thread-1] mapred.Merger (Merger.java:merge(606)) - Merging 1 sorted segments
2018-11-19 05:23:56,161 INFO  [pool-7-thread-1] mapred.Merger (Merger.java:merge(705)) - Down to the last merge-pass, with 1 segments left of total size: 1217075 bytes
2018-11-19 05:23:56,303 INFO  [pool-7-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(760)) - Merged 1 segments, 1217086 bytes to disk to satisfy reduce memory limit
2018-11-19 05:23:56,305 INFO  [pool-7-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(790)) - Merging 1 files, 1217090 bytes from disk
2018-11-19 05:23:56,307 INFO  [pool-7-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(805)) - Merging 0 segments, 0 bytes from memory into reduce
2018-11-19 05:23:56,307 INFO  [pool-7-thread-1] mapred.Merger (Merger.java:merge(606)) - Merging 1 sorted segments
2018-11-19 05:23:56,308 INFO  [pool-7-thread-1] mapred.Merger (Merger.java:merge(705)) - Down to the last merge-pass, with 1 segments left of total size: 1217075 bytes
2018-11-19 05:23:56,313 INFO  [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(618)) - 1 / 1 copied.
2018-11-19 05:23:56,529 WARN  [DataStreamer for file /Output/_temporary/0/_temporary/attempt_local2033293629_0001_r_000001_0/part-r-00001] hdfs.DataStreamer (DataStreamer.java:closeResponder(929)) - Caught exception
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1252)
    at java.lang.Thread.join(Thread.java:1326)
    at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:927)
    at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:578)
    at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:755)
2018-11-19 05:23:56,531 INFO  [pool-7-thread-1] mapred.Task (Task.java:done(1099)) - Task:attempt_local2033293629_0001_r_000001_0 is done. And is in the process of committing
2018-11-19 05:23:56,534 INFO  [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(618)) - 1 / 1 copied.
2018-11-19 05:23:56,534 INFO  [pool-7-thread-1] mapred.Task (Task.java:commit(1260)) - Task attempt_local2033293629_0001_r_000001_0 is allowed to commit now
2018-11-19 05:23:56,543 INFO  [pool-7-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:commitTask(582)) - Saved output of task 'attempt_local2033293629_0001_r_000001_0' to hdfs://hadoop1:9000/Output/_temporary/0/task_local2033293629_0001_r_000001
2018-11-19 05:23:56,548 INFO  [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(618)) - reduce > reduce
2018-11-19 05:23:56,549 INFO  [pool-7-thread-1] mapred.Task (Task.java:sendDone(1219)) - Task 'attempt_local2033293629_0001_r_000001_0' done.
2018-11-19 05:23:56,549 INFO  [pool-7-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(352)) - Finishing task: attempt_local2033293629_0001_r_000001_0
2018-11-19 05:23:56,549 INFO  [Thread-23] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(483)) - reduce task executor complete.
2018-11-19 05:23:57,057 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1418)) -  map 100% reduce 100%
2018-11-19 05:23:57,058 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1429)) - Job job_local2033293629_0001 completed successfully
2018-11-19 05:23:57,089 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1436)) - Counters: 35
    File System Counters
        FILE: Number of bytes read=4750713
        FILE: Number of bytes written=8708886
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=3448239
        HDFS: Number of bytes written=472
        HDFS: Number of read operations=27
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=12
    Map-Reduce Framework
        Map input records=101421
        Map output records=106249
        Map output bytes=1574408
        Map output materialized bytes=1786918
        Input split bytes=99
        Combine input records=0
        Combine output records=0
        Reduce input groups=23
        Reduce shuffle bytes=1786918
        Reduce input records=106249
        Reduce output records=23
        Spilled Records=212498
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=37
        Total committed heap usage (bytes)=497233920
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=1149413
    File Output Format Counters 
        Bytes Written=361
OK

 

错误代码:

package WordCount;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMain {

    public static final String HADOOP_ROOT_PATH = "hdfs://hadoop1:9000";
    public static final String HADOOP_INPUT_PATH = "hdfs://hadoop1:9000/Input";
    public static final String HADOOP_OUTPUT_PATH = "hdfs://hadoop1:9000/Output";

    public static void main(String[] args) throws IOException,
            URISyntaxException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        // 1、设置job运行时要访问的默认文件系统
        //conf.set("fs.defaultFS", HADOOP_ROOT_PATH);
        // 2、设置job提交到哪去运行
        //conf.set("mapreduce.framework.name", "yarn");
        //conf.set("yarn.resourcemanager.hostname", "hadoop1");
        // 3、如果要从windows系统上运行这个job提交客户端程序,则需要加这个跨平台提交的参数
        //conf.set("mapreduce.app-submission.cross-platform", "true");

        Job job = Job.getInstance(conf);

        // 1、封装参数:jar包所在的位置
        job.setJar("/home/hadoop/wordcount.jar");
        //job.setJarByClass(WordCountMain.class);

        // 2、封装参数: 本次job所要调用的Mapper实现类、Reducer实现类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 3、封装参数:本次job的Mapper实现类、Reducer实现类产生的结果数据的key、value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 4、封装参数:本次job要处理的输入数据集所在路径、最终结果的输出路径
        Path output = new Path(HADOOP_OUTPUT_PATH);
        FileSystem fs = FileSystem.get(new URI(HADOOP_ROOT_PATH), conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileInputFormat.setInputPaths(job, new Path(HADOOP_INPUT_PATH));
        FileOutputFormat.setOutputPath(job, output); // 注意:输出路径必须不存在

        // 5、封装参数:想要启动的reduce task的数量
        job.setNumReduceTasks(2);

        // 6、提交job给yarn
        boolean res = job.waitForCompletion(true);
        System.out.println("OK");
        System.exit(res ? 0 : -1);

    }

}

 

 

修改方式添加 job 提交的地址

开放以下代码

// 2、设置job提交到哪去运行
        conf.set("mapreduce.framework.name", "yarn");

 

 

 

 
    原文作者:MapReduce
    原文地址: https://www.cnblogs.com/liuyongpingblog/p/9985921.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞