MapReduce Map端 join 的一个例子

什么是 Join

Join,翻译过来是 加入、连接、结合的意思。
而在数据处理中,join 是对表的操作。表是数据存储的一种形式,就像 excel 中的表一样。
我们为了想得到想要的结果,需要分析多张表,而 把两张 或更多的表 进行结合,这样的操作 就叫 Join。

那 在 MapReduce 中的 Join 就是指上面的操作,只不过可能不是处理的表,而是文件,或者是从表存储的介质 比如 MySql、Hbase 中读取的数据。

举个 MapReduce 中 使用 Join 的例子:

比如我们有两个文件,分别存储 订单信息:products.txt,和 商品信息:orders.txt ,详细数据如下:

products.txt:

//商品ID,商品名称,商品类型(数字表示,我们假设有一个数字和具体类型的映射)
p0001,xiaomi,001
p0002,chuizi,001

orders.txt:

//订单号,时间,商品id,购买数量 
1001,20170710,p0001,1 
1002,20170710,p0001,3 
1003,20170710,p0001,3 
1004,20170710,p0002,1

我们想象有多个商品,并有海量的订单信息,并且存储在多个 HDFS 块中。

如果我们想统计 每个商品的 购买数量,即这样的形式:

xiaomi,7
chuizi,1

该怎么处理?

我们分析上面我们想要的结果,商品名称和销量,这两个属性分别存放到不同的文件中,那我们就要考虑 在一个地方(mapper)读取这两个文件的数据,并把数据在一个地方(reducer)进行结合。这就是 MapReduce 中的 Join 了。

我们用代码实现上面的过程 (只写出最主要的代码):
Mapper:

Text outKey = new Text();
Text outValue = new Text();
StringBuilder sb = new StringBuilder();

protected void map {
    String[] split = value.toString().split(",");
    
    //两个文件 在一个 mapper 中处理
    if(name.equals("products.txt")) {
    
        //取商品ID 作为 输出key 和 商品名称 作为 输出value,即 第0、1 的数据
        outKey.set(split[0]);
        outValue.set("product#" + split[1]);
        context.write(outKey, outValue);
        
    } else {
        //取商品ID 作为 输出key 和 购买数量 作为 输出value,即 第2、3 的数据
        outKey.set(split[2]);
        outValue.set("order#" + split[3]);
        context.write(outKey, outValue);
    }
}

Reducer:

//用来存放:商品ID、商品名称
List<String> productsList = new ArrayList<>();

//用来存放:商品ID、购买数量
List<String> ordersList = new ArrayList<>();

Text outValue = new Text();

protected void reduce {

    for (Text text : values) {
        String value = text.toString();
        
        if(value.startsWith("product#")) {
            productsList.add(value.split("#")[1]); //取出 商品名称
            
        } else if(value.startsWith("order#")){
            ordersList.add(text.toString().split("#")[1]); //取出商品的销量
        }
    }
    int totalOrders = 0;
    for (int i=0; i < productsList.size(); i++) {
        for (int j=0; j < ordersList.size(); j++) {
            totalOrders += ordersList.get(j);
        }
        outValue.set(productsList.get(i) + "\t" + totalOrders );
        //最后的输出是:商品ID、商品名称、购买数量
        context.write(key, outValue);
    }
}

上面的代码即是 Join 的过程。

下面我们说下 Map Join。

什么是 Map Join

Map Join 是指 Join 发生在 MapReduce 的 Map 阶段,而我们通常在 MapReduce 中把两张或多个表结合,是在 Reduce 端处理。

为什么要使用 Map Join

我们假设 某些商品卖的特别好,比如小米手机,产生了大量的订单,数据量特别大;而某些商品销量惨淡,比如锤子手机。那么某一个 Reduce Task 处理了大量数据,某个 Reduce Task 可能只处理几条数据,即产生了数据倾斜的问题。

而 我们上面举的例子,商品相对于订单 的数据量来说,是非常小的,可能一个商城 有几百种商品,而订单量可能达到上千万。

而 Map Join,适用于: 合并两个表数据,但有数据倾斜的问题,且一张表的数据量很小,另一张表数据量很大的情况。

如何实现 Map Join

我们可以通过 DistributedCache 来实现。
DistributedCache 是一个提供给Map/Reduce框架的工具,用来缓存指定的文件到 每一个 slave 节点上。

我们通过 DistributedCache 把小文件发给每一个 Mapper,在 每一个 Mapper 中实现 上面 Reducer 的功能,这样发送到 Reducer 中的数据已经是聚合过的数据,数据量大大减少,也就解决了数据倾斜的问题。

代码流程:

  1. 在 Main 方法中调用 DistributedCache 的Api 把小文件保存起来。
  2. 在 Mapper 的 setup() 中通过 DistributedCache 的 Api 来获取小文件,并保存到 HashMap 中。
    Mapper 中有一个 setup() 方法,在 map() 之前执行,通常做一些初始化工作。
  3. 在 map() 中 从HasnMap 中读取数据,并读取大文件,把两个文件进行合并。
  4. 在 Reducer 中进一步处理。
看一下具体代码:

Main:

public static void main(String[] args) {
    ...
    DistributedCache.addCacheFile(new Path("...").toUri() , conf);
}

Mapper 中的 setup():

private static Map<String,String> productMap =  new HashMap<>();

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    //Path[] files = context.getLocalCacheFiles();
    Path[] files = DistributedCache.getLocalCacheFiles(conf);
    
    String strPath = files[0].toUri().toString();
    
    BufferedReader br = new BufferedReader(new FileReader(strPath));
    
    while((readLine = br.readLine()) != null) {
        String[] split = readLine.split(",");
        String productId = split[0];
        String productName = split[1];

        productMap.put(productId, productName);
    }
}

Mapper 中的 map():

@Override
protected void map {
    //读取的 orders.txt 中的数据
    String[] split = value.toString().split(",");

    String productId = split[2];
    String saleSum = split[3];

    String productName = productMap.get(productId);

    outKey.set(productId);
    outValue.set(productName + "\t" + saleSum);

    context.write(outKey, outValue);
}

这样就完成了。

    原文作者:博弈史密斯
    原文地址: https://www.jianshu.com/p/432a7a51d74a
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞