【Spark Java API】Transformation (2): sample, randomSplit

sample

Official documentation:

 Return a sampled subset of this RDD.

Returns a sampled subset of this RDD.

Function signatures:

  • withReplacement can elements be sampled multiple times (replaced when sampled out)
  • fraction expected size of the sample as a fraction of this RDD’s size
  • without replacement: probability that each element is chosen; fraction must be [0, 1]
  • with replacement: expected number of times each element is chosen; fraction must be >= 0
def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T]
  • withReplacement can elements be sampled multiple times (replaced when sampled out)
  • fraction expected size of the sample as a fraction of this RDD’s size
  • without replacement: probability that each element is chosen; fraction must be [0, 1]
  • with replacement: expected number of times each element is chosen; fraction must be >= 0
  • seed seed for the random number generator
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T]

**The first overload is implemented in terms of the second, with seed defaulting to Utils.random.nextLong. withReplacement selects which sampler is built, fraction is the sampling fraction, and seed seeds the random number generator.**

Source code analysis:

def sample(withReplacement: Boolean, fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = withScope {
  require(fraction >= 0.0, "Negative fraction value: " + fraction)
  if (withReplacement) {
    new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
  } else {
    new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
  }
}

**In sample, the fraction is first validated; then a PartitionwiseSampledRDD is created, using a PoissonSampler when withReplacement is true and a BernoulliSampler when it is false.**
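As a rough illustration (plain Java, not Spark's actual sampler classes), the two strategies can be sketched as follows: a Bernoulli sampler keeps each element independently with probability `fraction`, so each element appears at most once, while a Poisson sampler draws a per-element count with mean `fraction`, so elements can appear multiple times.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SamplerSketch {
    // Bernoulli (without replacement): keep each element with probability `fraction`.
    static <T> List<T> bernoulliSample(List<T> data, double fraction, long seed) {
        Random rng = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T item : data) {
            if (rng.nextDouble() < fraction) {
                out.add(item);
            }
        }
        return out;
    }

    // Poisson (with replacement): draw a count with mean `fraction` for each
    // element (via Knuth's algorithm), so an element may repeat in the sample.
    static <T> List<T> poissonSample(List<T> data, double fraction, long seed) {
        Random rng = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T item : data) {
            double limit = Math.exp(-fraction);
            double p = 1.0;
            int k = 0;
            do {
                k++;
                p *= rng.nextDouble();
            } while (p > limit);
            for (int i = 0; i < k - 1; i++) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 4, 3, 5, 6, 7);
        // fraction 0.0 keeps nothing; fraction 1.0 keeps everything (Bernoulli case).
        System.out.println(bernoulliSample(data, 0.0, 100).size());
        System.out.println(bernoulliSample(data, 1.0, 100).size());
        System.out.println(poissonSample(data, 0.5, 100));
    }
}
```

Note that in both cases the sample size is only `fraction` in expectation, which is why the documentation above calls it the "expected size" rather than an exact count.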

Example:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
// false: without replacement (Bernoulli sampler, each element kept at most once); 0.2: sampling fraction; 100: RNG seed
JavaRDD<Integer> sampleRDD = javaRDD.sample(false, 0.2, 100);
System.out.println("sampleRDD~~~~~~~~~~~~~~~~~~~~~~~~~~" + sampleRDD.collect());
// true: with replacement (Poisson sampler, elements may appear multiple times); 0.2: sampling fraction; 100: RNG seed
JavaRDD<Integer> sampleRDD1 = javaRDD.sample(true, 0.2, 100);
System.out.println("sampleRDD1~~~~~~~~~~~~~~~~~~~~~~~~~~" + sampleRDD1.collect());

randomSplit

Official documentation:

 Randomly splits this RDD with the provided weights.

Randomly splits this RDD according to the provided weights.

Function signatures:

  • weights for splits, will be normalized if they don’t sum to 1
  • random seed
  • return split RDDs in an array
def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]]
  • weights for splits, will be normalized if they don’t sum to 1
  • return split RDDs in an array
def randomSplit(weights: Array[Double]): Array[JavaRDD[T]]

Source code analysis:

def randomSplit(weights: Array[Double],
    seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope {
  val sum = weights.sum
  val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
  normalizedCumWeights.sliding(2).map { x =>
    randomSampleWithRange(x(0), x(1), seed)
  }.toArray
}

def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = {
  this.mapPartitionsWithIndex({ (index, partition) =>
    val sampler = new BernoulliCellSampler[T](lb, ub)
    sampler.setSeed(seed + index)
    sampler.sample(partition)
  }, preservesPartitioning = true)
}

**The source shows that randomSplit first normalizes the weight array so the weights sum to 1 and computes their cumulative sums; each adjacent pair of cumulative weights is then passed to randomSampleWithRange, which calls mapPartitionsWithIndex (covered in the previous section) and uses a BernoulliCellSampler to select the elements of each split.**
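The normalization and scanLeft step can be reproduced in plain Java (a sketch of the arithmetic, not Spark code): for weights {0.1, 0.2, 0.7} it yields the cumulative boundaries 0.0, 0.1, 0.3, 1.0, and each adjacent pair (lb, ub) defines the range handed to randomSampleWithRange for one split.

```java
import java.util.Arrays;

public class CumWeights {
    // Mirror of `weights.map(_ / sum).scanLeft(0.0)(_ + _)`:
    // normalize the weights, then accumulate them into range boundaries.
    static double[] cumulativeBoundaries(double[] weights) {
        double sum = Arrays.stream(weights).sum();
        double[] bounds = new double[weights.length + 1];
        bounds[0] = 0.0;
        for (int i = 0; i < weights.length; i++) {
            bounds[i + 1] = bounds[i] + weights[i] / sum;
        }
        return bounds;
    }

    public static void main(String[] args) {
        double[] weights = {0.1, 0.2, 0.7};
        // Each pair (bounds[i], bounds[i+1]) is the (lb, ub) of one split.
        System.out.println(Arrays.toString(cumulativeBoundaries(weights)));
    }
}
```

Because the weights are divided by their sum first, inputs such as {1, 1, 2} produce the same kind of [0, 1] boundaries, which is what "will be normalized if they don't sum to 1" means in the documentation above.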

Example:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
double[] weights = {0.1, 0.2, 0.7};
// randomly split the RDD according to the provided weights
JavaRDD<Integer>[] randomSplitRDDs = javaRDD.randomSplit(weights);
System.out.println("randomSplitRDDs of size~~~~~~~~~~~~~~" + randomSplitRDDs.length);
int i = 0;
for (JavaRDD<Integer> item : randomSplitRDDs)
    System.out.println(i++ + " randomSplitRDDs of item~~~~~~~~~~~~~~~~" + item.collect());
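It is worth noting why the splits are disjoint: every range is sampled with the same seed, so each element maps to the same uniform draw in every pass and is kept only by the split whose [lb, ub) range contains that draw. A plain-Java sketch of this idea (a simplification of BernoulliCellSampler, not Spark's implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class RandomSplitSketch {
    // Assign each element one uniform draw and keep it in the split whose
    // [lb, ub) range contains the draw. Reusing the same seed for every
    // range is what makes the splits disjoint and exhaustive.
    static <T> List<List<T>> randomSplit(List<T> data, double[] bounds, long seed) {
        List<List<T>> splits = new ArrayList<>();
        for (int i = 0; i + 1 < bounds.length; i++) {
            Random rng = new Random(seed); // same seed for every range
            List<T> split = new ArrayList<>();
            for (T item : data) {
                double x = rng.nextDouble(); // same draw per element in every pass
                if (x >= bounds[i] && x < bounds[i + 1]) {
                    split.add(item);
                }
            }
            splits.add(split);
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 4, 3, 5, 6, 7);
        double[] bounds = {0.0, 0.1, 0.3, 1.0}; // from weights {0.1, 0.2, 0.7}
        List<List<Integer>> splits = randomSplit(data, bounds, 100L);
        int total = splits.stream().mapToInt(List::size).sum();
        // every element lands in exactly one split, so total == data.size()
        System.out.println(splits + " total=" + total);
    }
}
```

This also explains why the sizes of the splits only approximate the weights: each element's membership is decided by a single random draw, not by an exact partition of counts.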
    Original author: 小飞_侠_kobe
    Original article: https://www.jianshu.com/p/abe1755220b2
    This article was reposted from the web for knowledge sharing only; in case of infringement, please contact the blogger for removal.