一.需求描述
利用MapReduce清洗视频网站的原数据,用Hive统计出各种TopN常规指标:
视频观看数 Top10
视频类别热度 Top10
视频观看数 Top20 所属类别包含这 Top20 视频的个数
视频观看数 Top50 所关联视频的所属类别的热度排名
每个类别中的视频热度 Top10,以Music为例
每个类别中视频流量 Top10,以Music为例
上传视频最多的用户 Top10 以及他们上传的视频
每个类别视频观看数 Top10
2.数据源结构说明
数据源1: user.txt
数据样例:
barelypolitical 151 5106
bonk65 89 144
camelcars 26 674
数据样例中的三个字段结构:
上传者用户名 | string |
上传视频数 | int |
朋友数量 | int |
数据源2: video.txt
数据样例:
fQShwYqGqsw lonelygirl15 736 People & Blogs 133 151763 3.01 666 765 fQShwYqGqsw LfAaY1p_2Is 5LELNIVyMqo vW6ZpqXjCE4 vPUAf43vc-Q ZllfQZCc2_M it2d7LaU_TA KGRx8TgZEeU aQWdqI1vd6o kzwa8NBlUeo X3ctuFCCF5k Ble9N2kDiGc R24FONE2CDs IAY5q60CmYY mUd0hcEnHiU 6OUcp6UJ2bA dv0Y_uoHrLc 8YoxhsUMlgA h59nXANN-oo 113yn3sv0eo
数据样例中的字段结构:
视频唯一 id | 11 位字符串 |
视频上传者 | 上传视频的用户名 String |
视频年龄 | 视频上传日期和 2007 年 2 月15 日之间的整数天 |
视频类别 | 上传视频指定的视频分类 |
视频长度 | 整形数字标识的视频长度 |
观看次数 | 视频被浏览的次数 |
视频评分 | 满分 5 分 |
流量 | 视频的流量,整型数字 |
评论数 | 一个视频的整数评论数 |
相关视频 id | 相关视频的 id,最多 20 个 |
上面只是拿出了一两条数据来介绍数据集的结构,在后续项目中要用到的数据集可以自行下载
二.数据清洗
1)数据分析
在video.txt中,视频可以有多个所属分类,每个所属分类用&符号分割,并且分割的两边有空格字符,多个相关视频又用“\t”进行分割。为了分析数据时方便对存在多个子元素的数据进行操作,我们首先进行数据重组清洗操作。
具体做法:将所有的类别用“&”分割,同时去掉两边空格,多个相关视频 id 也使用“&”进行分割,这里看起来将”&”换成”\t”更方便,但是如果这样做就会将视频所属分类分割成不同字段,这样就没有办法进行清洗了
2)注意事项
这里的数据清洗不涉及reduce操作,所以只用map即可,视频的相关视频id可以没有,但是比如评论数必须有值,没有评论即为0,所以如果一条数据的字段缺少,也是脏数据,是要被清洗的
1 package mapreduce.videoETL; 2 3 import java.io.IOException; 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.conf.Configured; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.NullWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 14 import org.apache.hadoop.util.Tool; 15 import org.apache.hadoop.util.ToolRunner; 16 17 public class VideoMapReduce extends Configured implements Tool{ 18 public static class VideoMap extends Mapper<LongWritable, Text, NullWritable, Text> { 19 private Text mapOutputValue = new Text(); 20 @Override 21 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ 22 String line = value.toString(); 23 String[] splits = line.split("\t"); 24 25 //1、过滤不合法数据 26 if(splits.length < 9) return; 27 28 //2、去掉&符号左右两边的空格 29 splits[3] = splits[3].replaceAll(" ", ""); 30 StringBuilder sb = new StringBuilder(); 31 32 //3、\t 换成&符号 33 for(int i = 0; i < splits.length; i++){ 34 sb.append(splits[i]); 35 if(i < 9){ 36 if(i != splits.length - 1){ 37 sb.append("\t"); 38 } 39 }else{ 40 if(i != splits.length - 1){ 41 sb.append("&"); 42 } 43 } 44 } 45 46 String newline = sb.toString(); 47 48 mapOutputValue.set(newline); 49 context.write(NullWritable.get(), mapOutputValue); 50 } 51 } 52 public int run(String[] args) throws Exception { 53 54 Configuration conf = this.getConf(); 55 56 Job job=Job.getInstance(conf); 57 job.setJarByClass(VideoMapReduce.class); 58 59 //指定输入数据的目录 60 Path inpath = new Path(args[0]); 61 FileInputFormat.addInputPath(job, inpath); 62 63 //指定数据计算完成后输出的目录 64 Path outpath = new Path(args[1]); 65 FileOutputFormat.setOutputPath(job, outpath); 66 67 //指定调用哪一个map和reduce方法 68 job.setMapperClass(VideoMap.class); 69 70 //指定map输出键值对类型 71 job.setMapOutputKeyClass(NullWritable.class); 72 job.setMapOutputValueClass(Text.class); 73 74 //指定reduce个数 75 job.setNumReduceTasks(0); 76 77 //提交job任务 78 boolean isSuccess =job.waitForCompletion(true); 79 80 return isSuccess ? 0:1; 81 } 82 public static void main(String[] args) throws Exception { 83 84 Configuration configuration = new Configuration(); 85 86 //指定HDFS地址 87 args=new String[]{ 88 "hdfs://beifeng01/user/beifeng01/mapreduce/input/testdata/videoData/video.txt", 89 "hdfs://beifeng01/user/beifeng01/mapreduce/output" 90 }; 91 92 //Run job 93 int status = ToolRunner.run(configuration, new VideoMapReduce(), args); 94 //关闭 95 System.exit(status); 96 } 97 }
mapreduce任务跑完之后可以在输出目录中查看数据是否清洗成功
创建表
这里总共需要创建4张表,明明只有两个数据文件,为什么要创建4张表呢?因为这里创建的表要使用orc的压缩方式,而不使用默认的textfile的方式,orc的压缩方式要想向表中导入数据需要使用子查询的方式导入,即把从另一张表中查询到的数据插入orc压缩格式的表汇中,所以这里需要四张表,两张textfile类型的表user和video,两张orc类型的表user_orc和video_orc
1.先创建textfile类型的表
1 create table video( 2 videoId string, 3 uploader string, 4 age int, 5 category array<string>, 6 length int, 7 views int, 8 rate float, 9 ratings int, 10 comments int, 11 relatedId array<string>) 12 row format delimited 13 fields terminated by "\t" 14 collection items terminated by "&" 15 stored as textfile;
1 create table user( 2 uploader string, 3 videos int, 4 friends int) 5 row format delimited 6 fields terminated by "\t" 7 stored as textfile;
向两张表中导入数据,从hdfs中导入
load data inpath ‘user表所在hdfs中的位置’ into table user;
load data inpath ‘清洗后的vidoe表所在hdfs中的位置’ into table video;
2.创建两张orc类型的表
1 create table video_orc( 2 videoId string, 3 uploader string, 4 age int, 5 category array<string>, 6 length int, 7 views int, 8 rate float, 9 ratings int, 10 comments int, 11 relatedId array<string>) 12 clustered by (uploader) into 8 buckets 13 row format delimited fields terminated by "\t" 14 collection items terminated by "&" 15 stored as orc;
1 create table user_orc( 2 uploader string, 3 videos int, 4 friends int) 5 clustered by (uploader) into 24 buckets 6 row format delimited 7 fields terminated by "\t" 8 stored as orc;
向两张表中导入数据
1 insert into table user_orc select * from user; 2 insert into table video_orc select * from video;
这时候数据就加载到两张表中了,可以进行简单的查看
1 select * from user_orc limit 10; 2 select * from video_orc limit 10;
三 最终业务实现
1.视频观看数 Top10
使用order by做一个全局排序即可
1 select videoId,uploader,views from video_orc order by views desc limit 20;
2. 视频类别热度 Top10
需求分析: 统计出每个类别有多少个视频,然后显示出视频最多的前10个,我们需要使用group by对视频类别进行聚合,然后使用count()进行统计出每个类别视频个数,最后将视频个数进行排序输出前10个,因为一个视频可能对应多个类别,要想使用group by,需要先将类别进行列转行(展开)
1 select 2 category_name as category, 3 count(t.videoId) as hot 4 from ( 5 select 6 videoId, 7 category_name 8 from 9 video_orc lateral view explode(category) t_catetory as category_name) t 10 group by 11 t.category_name 12 order by hot 13 desc limit 10;
explode是将category列展开,例如category列如果是一个集合,集合中是key-value对,展开后就是key一行,value一行,如果表中有多个字段,就要加上lateral view,t_catetory是虚拟表的名,必须有
3.视频观看数 Top20 所属类别包含这 Top20 视频的个数
需求分析: 先找到观看数最高的 20 个视频所属条目的所有信息,降序排列,把这 20 条信息中的 category 分裂出来(列转行),最后查询视频分类名称和该分类下有多少个 Top20 的视频
1 select 2 category_name as category, 3 count(t2.videoId) as hot_with_views 4 from ( 5 select 6 videoId, 7 category_name 8 from ( 9 select 10 * 11 from 12 video_orc 13 order by 14 views 15 desc limit 16 20) t1 lateral view explode(category) t_catetory as category_name) t2 17 group by 18 category_name 19 order by 20 hot_with_views 21 desc;
4.视频观看数 Top50 所关联视频的所属类别的热度排名
需求分析: 查询出观看数最多的前 50 个视频的所有信息(包含了每个视频对应的关联视频),记为临时表 t1,将找到的 50 条视频信息的相关视频的id列转行,记为临时表 t2,将相关视频的 id 和user_orc 表进行 inner join 操作,按照视频类别进行分组,统计每组视频个数,然后排序
1 select 2 category_name as category, 3 count(t5.videoId) as hot 4 from ( 5 select 6 videoId, 7 category_name 8 from ( 9 select 10 distinct(t2.videoId), 11 t3.category 12 from ( 13 select 14 explode(relatedId) as videoId 15 from ( 16 select 17 * 18 from 19 video_orc 20 order by 21 views 22 desc limit 23 50) t1) t2 24 inner join 25 video_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) 26 t_catetory as category_name) t5 27 group by 28 category_name 29 order by 30 hot 31 desc;
5.每个类别中的视频热度 Top10,以Music为例
需求分析: 先将user_orc表中的category(视频类别) 展开,可以创建一张表用于存放视频类别,然后向表中插入数据,最后统计对应类别(Music)中的视频热度
创建表
1 create table test( 2 videoId string, 3 uploader string, 4 age int, 5 categoryId string, 6 length int, 7 views int, 8 rate float, 9 ratings int, 10 comments int, 11 relatedId array<string>) 12 row format delimited 13 fields terminated by "\t" 14 collection items terminated by "&" 15 stored as orc;
插入数据
1 insert into table test 2 select 3 videoId, 4 uploader, 5 age, 6 categoryId, 7 length, 8 views, 9 rate, 10 ratings, 11 comments, 12 relatedId 13 from 14 video_orc lateral view explode(category) catetory as categoryId;
统计Music类别中的视频热度Top10
1 select 2 videoId, 3 views 4 from 5 test 6 where 7 categoryId = "Music" 8 order by 9 views 10 desc limit 11 10;
6. 每个类别中视频流量 Top10,以Music为例
需求分析: 直接在5中创建的表中按照ratings(流量)排序
1 select 2 videoId, 3 views, 4 ratings 5 from 6 test 7 where 8 categoryId = "Music" 9 order by 10 ratings 11 desc limit 12 10;
7.上传视频最多的用户 Top10 以及他们上传的视频
需求分析: 先找到上传视频最多的 10 个用户的用户信息,通过 uploader 字段与 youtube_orc 表进行 join,得到的信息按照 views 观看次数进行排序即可
1 select 2 t2.videoId, 3 t2.views, 4 t2.ratings, 5 t1.videos, 6 t1.friends 7 from ( 8 select 9 * 10 from 11 user_orc 12 order by 13 videos desc 14 limit 15 10) t1 16 join 17 video_orc t2 18 on 19 t1.uploader = t2.uploader 20 order by 21 views desc 22 limit 23 20;
8.每个类别视频观看数 Top10
需求分析: 先得到 categoryId 展开的表数据,子查询按照 categoryId 进行分区,然后分区内排序,并生成递增数字,该递增数字这一列起名为 rank 列,通过子查询产生的临时表,查询 rank 值小于等于 10 的数据行即可
1 select 2 t1.* 3 from ( 4 select 5 videoId, 6 categoryId, 7 views, 8 row_number() over(partition by categoryId order by views desc) rank from 9 test) t1 10 where 11 rank <= 10;
9.可能出现的问题
JVM堆内存溢出
解决办法: 在 yarn-site.xml 中加入如下代码
<property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>2048</value> </property> <property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>2048</value> </property> <property> <name>yarn.nodemanager.vmem-pmem-ratio</name> <value>2.1</value> </property> <property> <name>mapred.child.java.opts</name> <value>-Xmx1024m</value> </property>