大数据学习——mapreduce解析json电影评分数据

需求

1、对原始json数据进行解析,变成普通文本数据

2、求出每个人评分最高的3部电影

3、求出被评分次数最多的3部电影

 

数据

https://pan.baidu.com/s/1gPsQXVYSQEZ2OYek4HxK6A

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the JsonToText MapReduce job. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cyf</groupId>
    <artifactId>MapReduceCases</artifactId>
    <packaging>jar</packaging>
    <version>1.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>
    <dependencies>
        <!-- Hadoop 2.6.4 client-side libraries used by the job code. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.4</version>
        </dependency>

        <!-- JSON parsing of the raw rating records. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.40</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.36</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Packages the project classes together with their dependencies into a single jar. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <!-- Keep the plain artifact name; do not append the assembly id to the jar name. -->
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>cn.itcast.mapreduce.json.JsonToText</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <!-- "single" is the goal meant for lifecycle-bound executions;
                                 the former "assembly" goal is deprecated and absent from newer plugin versions. -->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
package cn.itcast.mapreduce.json;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;

public class OriginBean implements WritableComparable<OriginBean> {

    private Long movie;

    private Long rate;

    private Long timeStamp;

    private Long uid;


    public Long getMovie() {
        return movie;
    }

    public void setMovie(Long movie) {
        this.movie = movie;
    }

    public Long getRate() {
        return rate;
    }

    public void setRate(Long rate) {
        this.rate = rate;
    }

    public Long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(Long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public Long getUid() {
        return uid;
    }

    public void setUid(Long uid) {
        this.uid = uid;
    }

    public OriginBean(Long movie, Long rate, Long timeStamp, Long uid) {
        this.movie = movie;
        this.rate = rate;
        this.timeStamp = timeStamp;
        this.uid = uid;
    }

    public OriginBean() {
        // TODO Auto-generated constructor stub
    }

    public int compareTo(OriginBean o) {
        return this.movie.compareTo(o.movie);
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(movie);
        out.writeLong(rate);
        out.writeLong(timeStamp);
        out.writeLong(uid);
    }


    public void readFields(DataInput in) throws IOException {
        this.movie = in.readLong();
        this.rate = in.readLong();
        this.timeStamp = in.readLong();
        this.uid = in.readLong();
    }

    @Override
    public String toString() {
        return this.movie + "\t" + this.rate + "\t" + this.timeStamp + "\t" + this.uid;
    }

}
package cn.itcast.mapreduce.json;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class JsonToText {

    static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // Bean bean = mapper.readValue(value.toString(), Bean.class);

            JSONObject valueJson = JSON.parseObject(value.toString());

            Long movie = valueJson.getLong("movie");

            OriginBean bean = new OriginBean(movie, valueJson.getLong("rate"), valueJson.getLong("timeStamp"), valueJson.getLong("uid"));
            k.set(bean.toString());
            context.write(k, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
//16777216/1024/1024=16 (62.5M/16)4个切片,启动4个maptask,处理结果4个文件
conf.set("mapreduce.input.fileinputformat.split.maxsize", "16777216"); Job job = Job.getInstance(conf); // job.setJarByClass(JsonToText.class); //告诉框架,我们的程序所在jar包的位置 job.setJar("/root/JsonToText.jar"); job.setMapperClass(MyMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setNumReduceTasks(0); FileInputFormat.setInputPaths(job, new Path("/json/input")); FileOutputFormat.setOutputPath(job, new Path("/json/output")); // FileInputFormat.setInputPaths(job, new Path(args[0])); // FileOutputFormat.setOutputPath(job, new Path(args[1]));  job.waitForCompletion(true); } }

 

创建文件夹 并上传数据

hadoop fs -mkdir -p /json/input

hadoop fs -put rating.json /json/input

 

运行

hadoop jar JsonToText.jar cn.itcast.mapreduce.json.JsonToText

 

运行结果

https://pan.baidu.com/s/1ayrpl7w8Dlzpc7TRZIO94w

 

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the MovieRateSum MapReduce job. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cyf</groupId>
    <artifactId>MapReduceCases</artifactId>
    <packaging>jar</packaging>
    <version>1.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>
    <dependencies>
        <!-- Hadoop 2.6.4 client-side libraries used by the job code. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.4</version>
        </dependency>

        <!-- JSON parsing of the raw rating records. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.40</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.36</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Packages the project classes together with their dependencies into a single jar. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <!-- Keep the plain artifact name; do not append the assembly id to the jar name. -->
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>cn.itcast.mapreduce.json.MovieRateSum</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <!-- "single" is the goal meant for lifecycle-bound executions;
                                 the former "assembly" goal is deprecated and absent from newer plugin versions. -->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
package cn.itcast.mapreduce.json;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class ResultBean implements WritableComparable<ResultBean> {

    private Long movie;
    private Long sumRate;

    public void setSumRate(long sumRate) {
        this.sumRate = sumRate;
    }

    public Long getMovie() {
        return movie;
    }

    public void setMovie(Long movie) {
        this.movie = movie;
    }


    public ResultBean(Long movie, Long sumRate) {
        this.movie = movie;
        this.sumRate = sumRate;
    }

    public ResultBean() {
        // TODO Auto-generated constructor stub
    }

    public int compareTo(ResultBean o) {
        if (this.movie - o.movie != 0) {
            return (int) (this.movie - o.movie);
        }
        return (int) (o.sumRate - this.sumRate);
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(movie);
        out.writeLong(sumRate);
    }


    public ResultBean(Long sumRate) {
        super();
        this.sumRate = sumRate;
    }

    public void readFields(DataInput in) throws IOException {
        this.movie = in.readLong();
        this.sumRate = in.readLong();
    }

    @Override
    public String toString() {
        //return movie + "\t" + sumRate;
        return movie + "\t" + sumRate;
    }

}
package cn.itcast.mapreduce.json;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

/**
 * MapReduce job that sums the {@code rate} values of all rating records per
 * movie and writes one {@link ResultBean} per movie to a sequence file.
 */
public class MovieRateSum {

    /**
     * Turns each input line into an {@link OriginBean} keyed by its movie id.
     *
     * <p>Two line formats are accepted, because the job's configured input
     * directory ({@code /json/output}) holds the tab-separated text written by
     * {@code JsonToText}, not raw JSON:
     * <ul>
     *   <li>a JSON object with keys {@code movie}, {@code rate}, {@code timeStamp}, {@code uid};</li>
     *   <li>four tab-separated numbers in the order movie, rate, timeStamp, uid
     *       (the {@link OriginBean#toString()} layout).</li>
     * </ul>
     */
    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, OriginBean> {

        // Output key instance, reused for every record.
        private final LongWritable movieKey = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();

            OriginBean bean = line.trim().startsWith("{") ? fromJson(line) : fromTabSeparated(line);

            movieKey.set(bean.getMovie());
            context.write(movieKey, bean);
        }

        /** Builds a bean from a JSON rating record. */
        private static OriginBean fromJson(String line) {
            JSONObject valueJson = JSON.parseObject(line);
            return new OriginBean(
                    valueJson.getLong("movie"),
                    valueJson.getLong("rate"),
                    valueJson.getLong("timeStamp"),
                    valueJson.getLong("uid"));
        }

        /**
         * Builds a bean from a tab-separated line: movie, rate, timeStamp, uid.
         *
         * @throws IOException if the line does not have exactly four numeric fields
         */
        private static OriginBean fromTabSeparated(String line) throws IOException {
            String[] fields = line.split("\t");
            if (fields.length != 4) {
                throw new IOException("Malformed rating record (expected JSON or 4 tab-separated fields): " + line);
            }
            try {
                return new OriginBean(
                        Long.valueOf(fields[0].trim()),
                        Long.valueOf(fields[1].trim()),
                        Long.valueOf(fields[2].trim()),
                        Long.valueOf(fields[3].trim()));
            } catch (NumberFormatException e) {
                throw new IOException("Non-numeric field in rating record: " + line, e);
            }
        }
    }

    /** Adds up the {@code rate} of every record for one movie and emits a single {@link ResultBean}. */
    static class MyReduce extends Reducer<LongWritable, OriginBean, ResultBean, NullWritable> {

        @Override
        protected void reduce(LongWritable movie, Iterable<OriginBean> beans, Context context) throws IOException, InterruptedException {

            long sum = 0L;

            for (OriginBean bean : beans) {
                sum += bean.getRate();
            }

            ResultBean result = new ResultBean();
            result.setMovie(movie.get());
            result.setSumRate(sum);
            context.write(result, NullWritable.get());
        }

    }

    /**
     * Configures and runs the job, reading {@code /json/output} and writing a
     * sequence file to {@code /json/output-seq}. The process exits with status 0
     * when the job succeeds and 1 otherwise.
     *
     * @param args ignored; input and output paths are fixed
     */
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        // Tell the framework where the jar containing this program lives.
        job.setJar("/root/MovieRateSum.jar");
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(OriginBean.class);

        job.setOutputKeyClass(ResultBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("/json/output"));
        FileOutputFormat.setOutputPath(job, new Path("/json/output-seq"));

        // Surface job failure to the calling shell instead of always exiting 0.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }

}

 

    原文作者:MapReduce
    原文地址: https://www.cnblogs.com/feifeicui/p/10222454.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞