MapReduce之日志清洗与分析
- 本文使用的日志文件如下(如需该文件,可以留言):
- 每行日志的各字段以空格分隔,依次为:访问者主机名、访问者IP、访问时间、访问资源、访问状态(HTTP状态码)、本次访问流量等。
- 根据需求,我们要从日志中统计访问资源编号以及访问流量等信息,其余数据在清洗时丢弃;本文的示例代码统计的是每个资源编号被访问的次数。观察日志格式可知需要进行两次切割:先按空格切分出访问资源字段,再按等号切分出资源编号。
1.依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.3</version>
</dependency>
2.在资源目录(即 resources)下添加 log4j.properties 文件,内容如下:
log4j.rootLogger=FATAL, dest1
log4j.logger.dsaLogging=DEBUG, dsa
log4j.additivity.dsaLogging=false
log4j.appender.dest1=org.apache.log4j.ConsoleAppender
log4j.appender.dest1.layout=org.apache.log4j.PatternLayout
log4j.appender.dest1.layout.ConversionPattern=%-5p:%l: %m%n
log4j.appender.dest1.ImmediateFlush=true
log4j.appender.dsa=org.apache.log4j.RollingFileAppender
log4j.appender.dsa.File=./logs/dsa.log
log4j.appender.dsa.MaxFileSize=2000KB
# Keep up to 5 rolled-over backup files of dsa.log (previously 2)
log4j.appender.dsa.MaxBackupIndex=5
log4j.appender.dsa.layout=org.apache.log4j.PatternLayout
log4j.appender.dsa.layout.ConversionPattern=%l:%d: %m%n
3.编写Mapper类
package com.mr.log;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Cleans one access-log line and emits {@code (resourceId, 1)} for it.
 *
 * <p>Each input line is split on single spaces. Field 6 (0-based) is taken as the accessed
 * resource; it is split again on {@code '='} and element 1 is used as the resource id.
 * Lines that do not have that shape (too few fields, no value after {@code '='}) are skipped
 * and counted under the {@code LogMapper / MALFORMED_LINES} job counter instead of failing
 * the task with an {@code ArrayIndexOutOfBoundsException}.
 */
public class LogMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /** 0-based index of the accessed-resource field after splitting a line on spaces. */
    private static final int RESOURCE_FIELD_INDEX = 6;
    /** Index of the id after splitting the resource field on '='. */
    private static final int ID_PART_INDEX = 1;

    /** Reused output key (the resource id); Hadoop serializes it on every write. */
    Text key2 = new Text();
    /** Constant output value: every kept line counts as one access. */
    IntWritable values2 = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // First split: whitespace-separated log fields.
        String[] fields = value.toString().split(" ");
        if (fields.length <= RESOURCE_FIELD_INDEX) {
            countMalformed(context);
            return;
        }

        // Second split: "<prefix>=<id>" inside the resource field.
        String[] parts = fields[RESOURCE_FIELD_INDEX].split("=");
        if (parts.length <= ID_PART_INDEX || parts[ID_PART_INDEX].isEmpty()) {
            countMalformed(context);
            return;
        }

        key2.set(parts[ID_PART_INDEX]);
        context.write(key2, values2);
    }

    /** Records a skipped (unparseable) line in a job counter so cleaning losses stay visible. */
    private void countMalformed(Context context) {
        context.getCounter("LogMapper", "MALFORMED_LINES").increment(1);
    }
}
4.编写Reducer类
package com.mr.log;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Sums the per-line access counts emitted for each resource id.
 *
 * <p>Output is {@code (resourceId, totalAccesses)}.
 */
public class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /** Reused output value; Hadoop serializes it on each write, so reusing it is safe. */
    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable partial : values) {
            count += partial.get();
        }
        total.set(count);
        context.write(key, total);
    }
}
5.编写Job类
package com.mr.log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the access-log cleaning job: counts accesses per resource id.
 *
 * <p>Usage: {@code LogJob [inputPath [outputPath]]}. When arguments are omitted, the original
 * local Windows paths are used. The process exit code is 0 on job success and 1 on failure.
 */
public class LogJob {
    /** Default input file, used when no first argument is given. */
    private static final String DEFAULT_INPUT = "D:\\HadoopInputText\\data\\log\\access.log";
    /** Default output directory, used when no second argument is given; it must not exist yet. */
    private static final String DEFAULT_OUTPUT = "D:\\HadoopOutputTest\\logout\\";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(LogJob.class);

        job.setMapperClass(LogMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Summing is associative and commutative, so the reducer can double as a combiner
        // to pre-aggregate counts on the map side and shrink shuffle traffic.
        job.setCombinerClass(LogReducer.class);
        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // Propagate the job result so scripts and schedulers can detect failures.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
6.运行