MapReduce Case Study: Analyzing a Mobile Carrier's Logs

1. The idea of distributed computing:

1.1 Basic idea: MapReduce consists of two operations, map and reduce, and these two steps embody the distributed computing idea. You implement a Map function that transforms a set of key/value pairs into new key/value pairs; the new pairs are then sent to a Reduce function, which guarantees that all values sharing the same key end up in the same group.

1.2 Execution flow: when data on HDFS is handed to map tasks it is cut into splits, and one split corresponds to one map task; the number of HDFS blocks and the number of splits are not necessarily the same. Each reduce task writes one output file, and the results are stored in the output directory.

Map tasks run on cluster nodes (one node can run several map tasks, but a single map task cannot span multiple nodes), and the same applies to reduce tasks. The raw data enters map as key/value pairs <k1, v1>.
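As a concrete illustration (assuming Hadoop's default 128 MB block size and the default split size, which equals the block size): a 300 MB log file is stored as 3 blocks, yields 3 splits and therefore 3 map tasks; if the job is configured with 2 reduce tasks, the output directory will contain two result files, part-r-00000 and part-r-00001.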

1.3 Processing steps:

1.3.1 Map task processing

1.1 Read the contents of the input file and parse them into key/value pairs: each line of the input file is parsed into one key/value pair, and the map function is called once per pair.

1.2 Write your own logic to process the input key/value and emit new key/value pairs.

1.3 Partition the output key/value pairs.

1.4 Within each partition, sort and group the data by key; the values of identical keys are collected into one set.

1.5 Reduce the grouped data locally (this optional local reduction is the combiner step).

1.3.2 Reduce task processing

2.1 The outputs of multiple map tasks are copied over the network to different reduce nodes according to their partitions.

2.2 The outputs of multiple map tasks are merged and sorted. Write your own reduce logic to process the input key/values and emit new key/value pairs.

2.3 Save the reduce output to a file on HDFS. <k3, v3> is the result data, <k2, v2> is the intermediate data, and <k1, v1> is the initial data.

The process of routing particular classes of data to particular reduce tasks is called shuffle, i.e. the data distribution phase. In pseudo-distributed mode, map and reduce both run on the first (and only) node; a node typically runs about two map tasks at a time, and the final output is written to HDFS.
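To make the three key/value stages concrete for this traffic example (the phone number and byte counts below are made up for illustration):

<k1, v1> = <0, "13631579850 ... 2481 24681 ...">          (byte offset of the line, one raw log line)
<k2, v2> = <"13631579850", DataBean(up=2481, down=24681)>  (map output, shuffled to a reducer by phone number)
<k3, v3> = <"13631579850", DataBean(up_sum, down_sum)>     (reduce output, written to HDFS)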

2. Business logic:

A mobile carrier's log usually records, among other fields, the phone number, the upstream traffic, and the downstream traffic. The goal is to compute the total traffic generated by each phone number over a given period and to sort the results according to some rule.

First, this large data source is uploaded from the local machine to HDFS, where it is logically split into blocks; the total traffic is then computed by distributed, parallel processing.
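As a small worked example (the numbers are invented for illustration), given tab-separated records such as

13631579850    ...    2481    24681    ...
13631579850    ...     100      200    ...
13826544101    ...     264        0    ...

the job should emit one line per phone number with its upstream, downstream, and total traffic:

13631579850    2581    24881    27462
13826544101     264        0      264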

3. Concrete steps:

3.1 Extend the Mapper class and override the map method

In this step the large file is divided into several splits, each of which corresponds to one map task; each map task extracts the traffic fields for a phone number, for example one beginning with 136.

3.2 Extend the Reducer class and override the reduce method

This step aggregates the data produced by map, for example partitioning and sorting it.

3.3 Create a driver Job that wires up the map and reduce methods

MapReduce code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataCount {

    public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Convert the Text line to a String so it can be split
            String line = value.toString();
            // Split the tab-separated record
            String[] fields = line.split("\t");
            String tel = fields[1];
            long up = Long.parseLong(fields[8]);
            long down = Long.parseLong(fields[9]);
            // Wrap the traffic values in the custom DataBean
            DataBean bean = new DataBean(tel, up, down);
            // Emit <phone number, bean>; this is what the reducer receives
            context.write(new Text(tel), bean);
        }
    }

    public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean> {
        @Override
        protected void reduce(Text key, Iterable<DataBean> values, Context context)
                throws IOException, InterruptedException {
            long up_sum = 0;
            long down_sum = 0;
            // Sum the upstream/downstream traffic of all records for this phone number
            for (DataBean bean : values) {
                up_sum += bean.getUpPayLoad();
                down_sum += bean.getDownPayLoad();
            }
            DataBean bean = new DataBean("", up_sum, down_sum);
            context.write(key, bean);
        }
    }

    public static class Partitioner1 extends Partitioner<Text, DataBean> {
        @Override
        public int getPartition(Text key, DataBean value, int numPartitions) {
            String account = key.toString();
            // The number prefix (e.g. "136") could be used to choose a partition;
            // as written, everything goes to partition 0
            String sub_acc = account.substring(0, 3);
            return 0;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DataCount.class);

        job.setMapperClass(DCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(DCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
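Note that Partitioner1 is defined above but never registered with the job, so the default partitioner is used. To actually route records by phone-number prefix into several output files, the driver would also need something along these lines (a sketch; the number of reduce tasks is an arbitrary example, and getPartition would have to return a different value per prefix):

// inside main(), before waitForCompletion():
job.setPartitionerClass(Partitioner1.class);
job.setNumReduceTasks(4);   // one output file (part-r-0000N) per reduce task

After packaging the classes into a jar, the job could be launched roughly like this (the jar name and HDFS paths are placeholders, not from the original):

hadoop jar datacount.jar DataCount /flow/input /flow/output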

4. Encapsulating the record in an object

Encapsulation code:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataBean implements Writable {

    private String tel;          // phone number
    private long upPayLoad;      // upstream traffic
    private long downPayLoad;    // downstream traffic
    private long totalPayLoad;   // upstream + downstream

    public DataBean() {}

    public DataBean(String tel, long upPayLoad, long downPayLoad) {
        super();
        this.tel = tel;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
        this.totalPayLoad = upPayLoad + downPayLoad;
    }

    @Override
    public String toString() {
        return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
    }

    // Serialize the fields; the order here must match readFields()
    public void write(DataOutput out) throws IOException {
        out.writeUTF(tel);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
        out.writeLong(totalPayLoad);
    }

    // Deserialize the fields in the same order they were written
    public void readFields(DataInput in) throws IOException {
        this.tel = in.readUTF();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
        this.totalPayLoad = in.readLong();
    }

    public String getTel() {
        return tel;
    }

    public void setTel(String tel) {
        this.tel = tel;
    }

    public long getUpPayLoad() {
        return upPayLoad;
    }

    public void setUpPayLoad(long upPayLoad) {
        this.upPayLoad = upPayLoad;
    }

    public long getDownPayLoad() {
        return downPayLoad;
    }

    public void setDownPayLoad(long downPayLoad) {
        this.downPayLoad = downPayLoad;
    }

    public long getTotalPayLoad() {
        return totalPayLoad;
    }

    public void setTotalPayLoad(long totalPayLoad) {
        this.totalPayLoad = totalPayLoad;
    }
}
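Because DataBean travels between map and reduce in serialized form, write() and readFields() must handle the fields in exactly the same order. A quick local round-trip check could look like this (a sketch using Hadoop's DataOutputBuffer/DataInputBuffer helpers; the phone number and amounts are made up):

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class DataBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        DataBean original = new DataBean("13631579850", 100, 200);

        // serialize the bean into a byte buffer
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // deserialize it into a fresh bean
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        DataBean copy = new DataBean();
        copy.readFields(in);

        System.out.println(copy);   // expected: 100  200  300
    }
}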

5. Sorting in the business logic:

Sorting code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// "bean" is a Writable value class analogous to DataBean, with income/expenses
// fields and a set(String, double, double) method (its source is not shown here)
public class paixu extends Mapper<LongWritable, Text, Text, bean> {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(paixu.class);

        job.setMapperClass(paixu.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(bean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(r.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(bean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    private Text k = new Text();
    private bean v = new bean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        String account = fields[0];
        double in = Double.parseDouble(fields[1]);
        double out = Double.parseDouble(fields[2]);
        k.set(account);
        v.set(account, in, out);
        context.write(k, v);
    }

    public static class r extends Reducer<Text, bean, Text, bean> {
        private bean v = new bean();

        @Override
        protected void reduce(Text key, Iterable<bean> value, Context context)
                throws IOException, InterruptedException {
            double insum = 0;
            double outsum = 0;
            // Sum income and expenses for each account
            for (bean o : value) {
                insum += o.getIncome();
                outsum += o.getExpenses();
            }
            v.set("", insum, outsum);
            context.write(key, v);
        }
    }
}
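As written, the job above only aggregates per-account sums; it does not impose a custom ordering. Since MapReduce sorts map output by key, one common way to obtain results ordered by total traffic (not part of the original code) is to emit a WritableComparable as the map output key. The TrafficKey class below is a hypothetical sketch of such a key:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical key type for a sorting pass: because the framework sorts map
// output by key, emitting this as the key yields output ordered by total traffic.
public class TrafficKey implements WritableComparable<TrafficKey> {
    private long total;

    public TrafficKey() {}

    public TrafficKey(long total) {
        this.total = total;
    }

    public long getTotal() {
        return total;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(total);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.total = in.readLong();
    }

    @Override
    public int compareTo(TrafficKey other) {
        // descending: larger totals sort first
        return Long.compare(other.total, this.total);
    }
}

A second MapReduce pass over the aggregated output could then read each line, emit TrafficKey as the map output key, and let the framework's shuffle sort produce the final ordering.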

6. Run results:

Summary: writing MapReduce jobs shows that as long as you get your own three steps right, the framework handles everything else automatically and you do not need to worry about it; the hardest part of those three steps is the business logic. So in MapReduce distributed parallel computation, handling the business logic is what matters most.

    Original author: 起个什么呢称呢
    Original link: https://www.jianshu.com/p/4bd75f2518ad