什么是 Join
Join,翻译过来是 加入、连接、结合的意思。
而在数据处理中,join 是对表的操作。表是数据存储的一种形式,就像 excel 中的表一样。
我们为了想得到想要的结果,需要分析多张表,而 把两张 或更多的表 进行结合,这样的操作 就叫 Join。
那 在 MapReduce 中的 Join 就是指上面的操作,只不过可能不是处理的表,而是文件,或者是从表存储的介质 比如 MySql、Hbase 中读取的数据。
举个 MapReduce 中 使用 Join 的例子:
比如我们有两个文件,分别存储 订单信息:products.txt,和 商品信息:orders.txt ,详细数据如下:
products.txt:
//商品ID,商品名称,商品类型(数字表示,我们假设有一个数字和具体类型的映射)
p0001,xiaomi,001
p0002,chuizi,001
orders.txt:
//订单号,时间,商品id,购买数量
1001,20170710,p0001,1
1002,20170710,p0001,3
1003,20170710,p0001,3
1004,20170710,p0002,1
我们想象有多个商品,并有海量的订单信息,并且存储在多个 HDFS 块中。
如果我们想统计 每个商品的 购买数量,即这样的形式:
xiaomi,7
chuizi,1
该怎么处理?
我们分析上面我们想要的结果,商品名称和销量,这两个属性分别存放到不同的文件中,那我们就要考虑 在一个地方(mapper)读取这两个文件的数据,并把数据在一个地方(reducer)进行结合。这就是 MapReduce 中的 Join 了。
我们用代码实现上面的过程 (只写出最主要的代码):
Mapper:
Text outKey = new Text();
Text outValue = new Text();
StringBuilder sb = new StringBuilder();
protected void map {
String[] split = value.toString().split(",");
//两个文件 在一个 mapper 中处理
if(name.equals("products.txt")) {
//取商品ID 作为 输出key 和 商品名称 作为 输出value,即 第0、1 的数据
outKey.set(split[0]);
outValue.set("product#" + split[1]);
context.write(outKey, outValue);
} else {
//取商品ID 作为 输出key 和 购买数量 作为 输出value,即 第2、3 的数据
outKey.set(split[2]);
outValue.set("order#" + split[3]);
context.write(outKey, outValue);
}
}
Reducer:
//用来存放:商品ID、商品名称
List<String> productsList = new ArrayList<>();
//用来存放:商品ID、购买数量
List<String> ordersList = new ArrayList<>();
Text outValue = new Text();
protected void reduce {
for (Text text : values) {
String value = text.toString();
if(value.startsWith("product#")) {
productsList.add(value.split("#")[1]); //取出 商品名称
} else if(value.startsWith("order#")){
ordersList.add(text.toString().split("#")[1]); //取出商品的销量
}
}
int totalOrders = 0;
for (int i=0; i < productsList.size(); i++) {
for (int j=0; j < ordersList.size(); j++) {
totalOrders += ordersList.get(j);
}
outValue.set(productsList.get(i) + "\t" + totalOrders );
//最后的输出是:商品ID、商品名称、购买数量
context.write(key, outValue);
}
}
上面的代码即是 Join 的过程。
下面我们说下 Map Join。
什么是 Map Join
Map Join 是指 Join 发生在 MapReduce 的 Map 阶段,而我们通常在 MapReduce 中把两张或多个表结合,是在 Reduce 端处理。
为什么要使用 Map Join
我们假设 某些商品卖的特别好,比如小米手机,产生了大量的订单,数据量特别大;而某些商品销量惨淡,比如锤子手机。那么某一个 Reduce Task 处理了大量数据,某个 Reduce Task 可能只处理几条数据,即产生了数据倾斜的问题。
而 我们上面举的例子,商品相对于订单 的数据量来说,是非常小的,可能一个商城 有几百种商品,而订单量可能达到上千万。
而 Map Join,适用于: 合并两个表数据,但有数据倾斜的问题,且一张表的数据量很小,另一张表数据量很大的情况。
如何实现 Map Join
我们可以通过 DistributedCache 来实现。
DistributedCache 是一个提供给Map/Reduce框架的工具,用来缓存指定的文件到 每一个 slave 节点上。
我们通过 DistributedCache 把小文件发给每一个 Mapper,在 每一个 Mapper 中实现 上面 Reducer 的功能,这样发送到 Reducer 中的数据已经是聚合过的数据,数据量大大减少,也就解决了数据倾斜的问题。
代码流程:
- 在 Main 方法中调用 DistributedCache 的Api 把小文件保存起来。
- 在 Mapper 的 setup() 中通过 DistributedCache 的 Api 来获取小文件,并保存到 HashMap 中。
Mapper 中有一个 setup() 方法,在 map() 之前执行,通常做一些初始化工作。 - 在 map() 中 从HasnMap 中读取数据,并读取大文件,把两个文件进行合并。
- 在 Reducer 中进一步处理。
看一下具体代码:
Main:
public static void main(String[] args) {
...
DistributedCache.addCacheFile(new Path("...").toUri() , conf);
}
Mapper 中的 setup():
private static Map<String,String> productMap = new HashMap<>();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//Path[] files = context.getLocalCacheFiles();
Path[] files = DistributedCache.getLocalCacheFiles(conf);
String strPath = files[0].toUri().toString();
BufferedReader br = new BufferedReader(new FileReader(strPath));
while((readLine = br.readLine()) != null) {
String[] split = readLine.split(",");
String productId = split[0];
String productName = split[1];
productMap.put(productId, productName);
}
}
Mapper 中的 map():
@Override
protected void map {
//读取的 orders.txt 中的数据
String[] split = value.toString().split(",");
String productId = split[2];
String saleSum = split[3];
String productName = productMap.get(productId);
outKey.set(productId);
outValue.set(productName + "\t" + saleSum);
context.write(outKey, outValue);
}
这样就完成了。