KMeans (Spark MLlib 2.2.0 source code analysis)

Preface: classical machine learning algorithms may seem drowned out by the wave of neural networks and deep learning, but this article focuses on engineering practice: understanding a distributed implementation, and how to build valuable systems and algorithms on top of a general-purpose stack (the Spark/Hadoop ecosystem).

The K-means algorithm clusters the data into K groups. How many groups, exactly? At the moment, K is supplied by the user.

How do we find these K cluster centers? Randomly, or through some selection strategy.

How do we compute the assignments and update the cluster centers iteratively, in a distributed fashion?
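
Before diving into the source, here is how the algorithm is typically invoked from user code. This is a minimal sketch against the RDD-based spark.mllib API; the input path and the values of k and maxIterations are made-up example values, and sc is assumed to be an existing SparkContext:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Parse each input line "x y z" into a dense vector (hypothetical file path).
val points = sc.textFile("data/mllib/kmeans_data.txt")
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  .cache()

// Cluster into k = 3 groups with at most 20 iterations of Lloyd's algorithm.
val model = KMeans.train(points, k = 3, maxIterations = 20)

model.clusterCenters.foreach(println)            // the learned centers
println(s"cost = ${model.computeCost(points)}")  // sum of squared distances to the nearest center

KMeans.train builds a KMeans instance whose run method eventually calls the runAlgorithm method analyzed below.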

# Entry point of the algorithm:

private def runAlgorithm(
      data: RDD[VectorWithNorm],
      instr: Option[Instrumentation]): KMeansModel = {

    val sc = data.sparkContext

    val initStartTime = System.nanoTime()

    val distanceMeasureInstance = DistanceMeasure.decodeFromString(this.distanceMeasure)

    val centers = initialModel match {
      case Some(kMeansCenters) =>
        kMeansCenters.clusterCenters.map(new VectorWithNorm(_))
      case None =>
        if (initializationMode == KMeans.RANDOM) {
          initRandom(data)
        } else {
          initKMeansParallel(data, distanceMeasureInstance)
        }
    }
    val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
    logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")

    var converged = false
    var cost = 0.0
    var iteration = 0

    val iterationStartTime = System.nanoTime()

    instr.foreach(_.logNumFeatures(centers.head.vector.size))

    // Execute iterations of Lloyd's algorithm until converged
    while (iteration < maxIterations && !converged) {
      // Double accumulator used to sum the cost across all partitions
      val costAccum = sc.doubleAccumulator
      val bcCenters = sc.broadcast(centers)

      // Find the new centers
      val collected = data.mapPartitions { points =>
        val thisCenters = bcCenters.value
        val dims = thisCenters.head.vector.size

        val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
        val counts = Array.fill(thisCenters.length)(0L)

        points.foreach { point =>
          val (bestCenter, cost) = distanceMeasureInstance.findClosest(thisCenters, point)
          costAccum.add(cost)
          distanceMeasureInstance.updateClusterSum(point, sums(bestCenter))
          counts(bestCenter) += 1
        }

        // The function passed to mapPartitions must return an Iterator[T]
        // The function passed to reduceByKey operates on the values only; keys are unchanged
        counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
      }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
        axpy(1.0, sum2, sum1)
        (sum1, count1 + count2)
      }.collectAsMap()

      // collected is a Map of (j, (sum, count)): per-center vector sum and point count

      if (iteration == 0) {
        instr.foreach(_.logNumExamples(collected.values.map(_._2).sum))
      }

      // The new center is the mean of all points assigned to it: (Vector1 + Vector2 + ... + VectorN) / N
      // mapValues applies a function to every value of the map; the keys stay the same.
      val newCenters = collected.mapValues { case (sum, count) =>
        distanceMeasureInstance.centroid(sum, count)
      }

      bcCenters.destroy(blocking = false)

      // Update the cluster centers and costs
      converged = true
      newCenters.foreach { case (j, newCenter) =>
        if (converged &&
          !distanceMeasureInstance.isCenterConverged(centers(j), newCenter, epsilon)) {
          converged = false // we only converge once every center satisfies the convergence criterion
        }
        centers(j) = newCenter
      }

      cost = costAccum.value
      iteration += 1
    }

    val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
    logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.")

    if (iteration == maxIterations) {
      logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
    } else {
      logInfo(s"KMeans converged in $iteration iterations.")
    }

    logInfo(s"The cost is $cost.")

    new KMeansModel(centers.map(_.vector), distanceMeasure, cost, iteration)
  }
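
To make the distributed loop above easier to follow, here is a minimal single-machine sketch of one Lloyd's iteration, assuming plain squared Euclidean distance (the default distance measure); the function and variable names are illustrative and do not appear in the Spark source:

// One Lloyd's iteration on plain Scala collections (illustrative only).
def lloydStep(points: Seq[Array[Double]],
              centers: Array[Array[Double]],
              epsilon: Double): (Array[Array[Double]], Double, Boolean) = {
  def sqDist(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Assign every point to its closest center and accumulate the cost.
  val assigned = points.map { p =>
    val (d, j) = centers.map(sqDist(p, _)).zipWithIndex.min
    (j, p, d)
  }
  val cost = assigned.map(_._3).sum

  // New center = mean of the points assigned to it.
  val newCenters = centers.indices.map { j =>
    val mine = assigned.filter(_._1 == j).map(_._2)
    if (mine.isEmpty) centers(j)
    else mine.transpose.map(_.sum / mine.size).toArray
  }.toArray

  // Converged once every center moved by less than epsilon.
  val converged = centers.zip(newCenters)
    .forall { case (c, n) => math.sqrt(sqDist(c, n)) <= epsilon }
  (newCenters, cost, converged)
}

In the Spark version, the assignment and the per-center (sum, count) pairs are computed inside mapPartitions, merged with reduceByKey, and only the small combined map is collected to the driver, where the mean and the convergence check are performed.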

Choosing how to initialize the cluster centers:

# Method 1: random selection
private def initRandom(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = {
    // Select without replacement; may still produce duplicates if the data has < k distinct
    // points, so deduplicate the centroids to match the behavior of k-means|| in the same situation
    data.takeSample(false, k, new XORShiftRandom(this.seed).nextInt())
      .map(_.vector).distinct.map(new VectorWithNorm(_))
  }

# Method 2: k-means|| (the parallel variant of k-means++)
private[clustering] def initKMeansParallel(data: RDD[VectorWithNorm],
      distanceMeasureInstance: DistanceMeasure): Array[VectorWithNorm] = {
    // Initialize empty centers and point costs.
    // Every point's cost starts at positive infinity
    var costs = data.map(_ => Double.PositiveInfinity)

    // Initialize the first center to a random point.
    val seed = new XORShiftRandom(this.seed).nextInt()
    val sample = data.takeSample(false, 1, seed) // returns an Array[VectorWithNorm]

    // Could be empty if data is empty; fail with a better message early:
    require(sample.nonEmpty, s"No samples available from $data")

    val centers = ArrayBuffer[VectorWithNorm]()
    var newCenters = Seq(sample.head.toDense)
    // += appends a single element, ++= appends all elements of another collection.
    // The line below is equivalent to centers += sample.head.toDense
    centers ++= newCenters


    // On each step, sample 2 * k points on average with probability proportional
    // to their squared distance from the centers. Note that only distances between points
    // and new centers are computed in each iteration.
    var step = 0
    val bcNewCentersList = ArrayBuffer[Broadcast[_]]()
    while (step < initializationSteps) {
      val bcNewCenters = data.context.broadcast(newCenters)
      bcNewCentersList += bcNewCenters
      val preCosts = costs
      costs = data.zip(preCosts).map { case (point, cost) =>
        math.min(distanceMeasureInstance.pointCost(bcNewCenters.value, point), cost)
      }.persist(StorageLevel.MEMORY_AND_DISK)
      val sumCosts = costs.sum()

      // Unpersist: drop the cached copies from memory and disk
      bcNewCenters.unpersist(blocking = false)
      preCosts.unpersist(blocking = false)

      // index is the partition index
      val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointCosts) =>
        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
        pointCosts.filter { case (_, c) => rand.nextDouble() < 2.0 * c * k / sumCosts }.map(_._1)
      }.collect()
      newCenters = chosen.map(_.toDense)
      centers ++= newCenters
      step += 1
    }

    // Clean up: unpersist the cached costs and destroy the broadcast variables that are no longer needed
    costs.unpersist(blocking = false)
    bcNewCentersList.foreach(_.destroy(false))

    // Deduplicate the candidate centers by their vectors (ignoring the norm), then rebuild (vector, norm) pairs
    val distinctCenters = centers.map(_.vector).distinct.map(new VectorWithNorm(_))

    if (distinctCenters.size <= k) {
      distinctCenters.toArray
    } else {
      // Finally, we might have a set of more than k distinct candidate centers; weight each
      // candidate by the number of points in the dataset mapping to it and run a local k-means++
      // on the weighted centers to pick k of them
      val bcCenters = data.context.broadcast(distinctCenters)
      val countMap = data
        .map(distanceMeasureInstance.findClosest(bcCenters.value, _)._1)
        .countByValue()

      bcCenters.destroy(blocking = false)

      val myWeights = distinctCenters.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray
      // 30 is the number of local k-means++ iterations
      LocalKMeans.kMeansPlusPlus(0, distinctCenters.toArray, myWeights, k, 30)
    }
  }
}
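
The final LocalKMeans.kMeansPlusPlus call runs a weighted k-means++ on the driver to reduce the candidate set to exactly k centers. Below is a minimal local sketch of weighted k-means++ seeding (each next center is sampled with probability proportional to weight × squared distance to the nearest already-chosen center); it illustrates the idea only and is not the actual LocalKMeans implementation:

import scala.util.Random

// Weighted k-means++ seeding on plain arrays (illustrative sketch).
// A faithful version would also sample the first center by weight.
def kMeansPlusPlusSeed(points: Array[Array[Double]],
                       weights: Array[Double],
                       k: Int,
                       rand: Random): Array[Array[Double]] = {
  def sqDist(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  val centers = scala.collection.mutable.ArrayBuffer(points(rand.nextInt(points.length)))
  while (centers.length < k) {
    // Score every point by weight * squared distance to its nearest chosen center.
    val scores = points.map(p => centers.map(sqDist(p, _)).min)
      .zip(weights).map { case (d, w) => d * w }
    // Sample the next center with probability proportional to its score.
    val r = rand.nextDouble() * scores.sum
    var (acc, idx) = (0.0, 0)
    while (acc + scores(idx) < r && idx < points.length - 1) { acc += scores(idx); idx += 1 }
    centers += points(idx)
  }
  centers.toArray
}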

Summary: this article is a personal study note, so it does not explain the code line by line, but the key operators and the underlying ideas have been annotated to make it easier for readers to study together. Back to the original question, then: what are the keys to the distributed implementation?

Key point 1: broadcast the continually updated cluster centers to every executor, so that each worker node always works with the latest centers.

val bcCenters = sc.broadcast(centers)
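
A minimal sketch of the broadcast mechanism on a toy RDD (the variable names and data are illustrative):

// Broadcast a read-only structure once per executor instead of shipping it with every task closure.
val toyCenters = Array(Array(0.0, 0.0), Array(5.0, 5.0))   // toy "cluster centers"
val bcToyCenters = sc.broadcast(toyCenters)

val assignments = sc.parallelize(Seq(Array(0.1, 0.2), Array(4.9, 5.1)))
  .map { p =>
    val cs = bcToyCenters.value   // read the broadcast copy on the executor
    cs.indices.minBy(j => cs(j).zip(p).map { case (c, x) => (c - x) * (c - x) }.sum)
  }
  .collect()                      // e.g. Array(0, 1): index of the closest center per point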

Key point 2: use a double accumulator (the accumulator API introduced in Spark 2.0) to sum the cost contributed by each partition into a global total.

val costAccum = sc.doubleAccumulator
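
A minimal sketch of summing a per-record quantity with a double accumulator (toy data):

// Executors add their local contributions; the driver reads the global sum after an action.
val costAccum = sc.doubleAccumulator("cost")
sc.parallelize(1 to 100).foreach(x => costAccum.add(x * 0.5))
println(costAccum.value)   // 2525.0, read on the driver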

Key point 3: compute partial results per partition, merge them with reduceByKey, and pull the merged results to the driver with collectAsMap to update the cluster centers; then go back to key point 1 and broadcast the new centers.

val collected = data.mapPartitions … reduceByKey … collectAsMap()
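
A minimal sketch of that per-partition aggregation pattern on toy data, computing a (sum, count) pair per key just like the KMeans loop (the integer key stands in for the center index):

// Each partition emits partial (key, (sum, count)) pairs, reduceByKey merges them
// across partitions, and collectAsMap brings the small combined map to the driver.
val labeled = sc.parallelize(Seq((0, 1.0), (1, 2.0), (0, 3.0), (1, 4.0)), numSlices = 2)

val combined = labeled.mapPartitions { iter =>
  val sums = scala.collection.mutable.Map.empty[Int, (Double, Long)]
  iter.foreach { case (key, v) =>
    val (s, c) = sums.getOrElse(key, (0.0, 0L))
    sums(key) = (s + v, c + 1)
  }
  sums.iterator                        // mapPartitions must return an Iterator
}.reduceByKey { case ((s1, c1), (s2, c2)) =>
  (s1 + s2, c1 + c2)                   // merge partial sums and counts
}.collectAsMap()                       // e.g. Map(0 -> (4.0, 2), 1 -> (6.0, 2))

val means = combined.mapValues { case (s, c) => s / c }   // per-key mean on the driver

The combined map holds at most one entry per center (at most k entries), which is why collecting it to the driver is cheap.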

Most distributed algorithms currently implemented in Spark are built on these three ideas and mechanisms. If you are interested, look at engineering implementations such as the FTRL online learning algorithm, which uses the same approach and techniques. If you have better methods, feel free to leave a comment so we can all learn from each other.

Finally, on how to use the algorithms in spark-mllib, I have published articles before; you can also check my Gitee repository:

xuboyao/sparkmllib (gitee.com)

It contains examples and the related data, ready to run.

    Original author: XuLy
    Original article: https://zhuanlan.zhihu.com/p/44903923
    This article is reposted from the web purely to share knowledge; if it infringes any rights, please contact the blog owner to have it removed.