Spark自定义分区(Partitioner)

基于优化和数据的有序性等问题考虑,某个设备的日志数据分到指定的计算节点,减少数据的网络传输

我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略,这两种分区策略在很多情况下都适合我们的场景。但是有些情况下,Spark内部不能符合咱们的需求,这时候我们就可以自定义分区策略。为此,Spark提供了相应的接口,我们只需要扩展Partitioner抽象类,然后实现里面的三个方法:

01packageorg.apache.spark

02 

03/**

04 * An object that defines how the elements in a key-value pair RDD are partitioned by key.

05 * Maps each key to a partition ID, from 0 to `numPartitions – 1`.

06 */

07abstractclassPartitioner extendsSerializable {

08  defnumPartitions:Int

09  defgetPartition(key:Any):Int

10}

def numPartitions: Int:这个方法需要返回你想要创建分区的个数;

def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1;

equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。

假如我们想把来自同一个域名的URL放到一台节点上,比如:http://www.iteblog.comhttp://www.iteblog.com/archives/1368,如果你使用HashPartitioner,这两个URL的Hash值可能不一样,这就使得这两个URL被放到不同的节点上。所以这种情况下我们就需要自定义我们的分区策略,可以如下实现:

01packagecom.iteblog.utils

02 

03importorg.apache.spark.Partitioner

04 

05/**

06 * User: 过往记忆

07 * Date: 2015-05-21

08 * Time: 下午23:34

09 * bolg:http://www.iteblog.com

10 * 本文地址:http://www.iteblog.com/archives/1368

11 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货

12 * 过往记忆博客微信公共帐号:iteblog_hadoop

13 */

14 

15classIteblogPartitioner(numParts:Int) extendsPartitioner {

16  overridedefnumPartitions:Int =numParts

17 

18  overridedefgetPartition(key:Any):Int ={

19    valdomain =newjava.net.URL(key.toString).getHost()

20    valcode =(domain.hashCode %numPartitions)

21    if(code < 0) {

22      code + numPartitions

23    } else{

24      code

25    }

26  }

27 

28  overridedefequals(other:Any):Boolean =other match{

29    caseiteblog:IteblogPartitioner =>

30      iteblog.numPartitions ==numPartitions

31    case_=>

32      false

33  }

34 

35  overridedefhashCode:Int =numPartitions

36}

因为hashCode值可能为负数,所以我们需要对他进行处理。然后我们就可以在partitionBy()方法里面使用我们的分区:

1iteblog.partitionBy(newIteblogPartitioner(20))

类似的,在Java中定义自己的分区策略和Scala类似,只需要继承org.apache.spark.Partitioner,并实现其中的方法即可。

在Python中,你不需要扩展Partitioner类,我们只需要对iteblog.partitionBy()加上一个额外的hash函数,如下:

查看源代码

打印帮助

1importurlparse

3defiteblog_domain(url):

4  returnhash(urlparse.urlparse(url).netloc)

6iteblog.partitionBy(20, iteblog_domain)

转自:https://blog.csdn.net/xiao_jun_0820/article/details/45913745

    原文作者:达微
    原文地址: https://www.jianshu.com/p/017cc886ba8e
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞