java – 带有递归Map的Hadoop MapReduce

我需要在
Java中执行一个需要自动递归的MapReduce应用程序,这意味着对于处理的每行输入文件,它必须检查条件的输入/映射条目的所有行,并由函数验证.或者,换句话说,Reducer应该为每个接收的对(密钥,值)调用/读取所有Map.

在Hadoop框架上实现此功能的最佳方法是什么?

我可以通过读取输入n次或将输入加载到hashmap中以编程方式执行此操作,但我认为可以在MapReduce范例中完成所有操作.

感谢您的帮助/提示!

编辑:更多细节,我有(作为其他工作的结果)一个问题空间的分区列表(索引,计数),并希望作为输出(索引,sumOfNearestNeighborsCounts),所以对于每个索引,我想再次访问地图,并为每个NearestNeighbor索引总和出现次数. (另见Costi Ciudatu评论) 最佳答案 对于每个索引键,您需要发出所有可能的邻居索引(您应该能够以数学方式生成).

那么,让我们来看一个简单的(线性)例子.你有一个{I1,I2,I3,I4}的一维空间.邻居将简单地表示“前一个或下一个元素”:I1与I2相邻但不与I3相邻.

对于每个到达映射器的索引,为该索引的每个可能的邻居发出一个键(包括它自己! – 我们将定义每个索引都是其自身的可能邻居,但是对于计数具有特殊且荒谬的负值,我’我会解释原因):

<I1, count(I1)> -> <I0, count(I1)>
                -> <I1, -1>
                -> <I2, count(I1)>

<I2, count(I2)> -> <I1, count(I2)>
                -> <I2, -1>
                -> <I3, count(I2)>

现在在reducer中,您将获得每个键的以下值:

I0: [ count(I1) ]
I1: [ count(I2), -1 ]
I2: [ count(I1), -1, count(I3) ]
...

在你的reducer中,迭代邻居的所有值,如下所示:

boolean doesExist = false;
int sum = 0;
for (IntWritable value : values) {
    int count = value.get();
    if (count < 0) {
        doesExist = true;
    } else {
        sum += count;
    }
}
if (doesExist) {
    context.write(key, new IntWritable(sum));
}

这样你将排除(在上面的例子中)I0和I4,它们不存在,并且它们的列表中没有负值.

现在,为了更接近您的用例,如果您在迭代期间也需要实际索引值(而不仅仅是所有邻居的计数),您可以执行以下操作:

而不是从映射器中发出简单数字,而是输出一些包含索引及其计数的包装器bean.通过这种方式,您可以根据某些业务约束或其他方式排除某些邻居,但是您将始终只使用每个给定索引的(可能的)邻居列表,而不是整个输入集:

<I1, count(I1)> -> <I0, {I1, count(I1)}>
                -> <I1, {I1, count(I1)}>
                -> <I2, {I1, count(I1)}>
... and so on

现在,在减速机中你会得到:

I0: [ {I1, count(I1)} ]
I1: [ {I1, count(I1)}, {I2, count(I2)} ]
I2: [ {I1, count(I1)}, {I2, count(I2)}, {I3, count(I3)} ]

您可以注意到,您不再需要人工-1计数,因为对于doExist测试,您现在可以检查值列表中的任何包装bean是否与键索引具有相同的索引.

即使可能的邻居数量随着维度的数量呈指数增长(正如您已经提到的),我会说这种方法仍然比读取每个键/值对的整个输入要好得多,并且它更适合于map / reduce范例.

点赞