Preface: Classic machine learning algorithms may be drowned out by the wave of neural networks and deep learning, but this article focuses on engineering practice: understanding the distributed implementation, and how to build valuable systems and algorithms on top of general-purpose frameworks (the Spark/Hadoop ecosystem).
The K-means algorithm clusters the data into K groups. How many groups exactly? Currently, K is supplied by the user.
How are the K initial cluster centers found? Randomly, or via a dedicated strategy (k-means||).
How are the cluster centers iteratively updated in a distributed fashion?
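Before reading the source, a minimal usage sketch may help place these questions. It assumes the RDD-based spark.mllib API, an existing SparkContext named sc (e.g. in the spark-shell), and the sample file data/mllib/kmeans_data.txt shipped with the Spark distribution; k, the initialization mode, and the iteration count are exactly the user-facing knobs discussed above:
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Parse whitespace-separated features into dense vectors
val parsedData = sc.textFile("data/mllib/kmeans_data.txt")
  .map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
  .cache()

// k and the initialization mode are chosen by the user; k-means|| is the default
val model = new KMeans()
  .setK(2)
  .setMaxIterations(20)
  .setInitializationMode(KMeans.K_MEANS_PARALLEL) // or KMeans.RANDOM
  .setSeed(1L)
  .run(parsedData)

println(s"WSSSE = ${model.computeCost(parsedData)}") // within-set sum of squared errors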
# Entry point of the algorithm:
private def runAlgorithm(
data: RDD[VectorWithNorm],
instr: Option[Instrumentation]): KMeansModel = {
val sc = data.sparkContext
val initStartTime = System.nanoTime()
val distanceMeasureInstance = DistanceMeasure.decodeFromString(this.distanceMeasure)
val centers = initialModel match {
case Some(kMeansCenters) =>
kMeansCenters.clusterCenters.map(new VectorWithNorm(_))
case None =>
if (initializationMode == KMeans.RANDOM) {
initRandom(data)
} else {
initKMeansParallel(data, distanceMeasureInstance)
}
}
val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
var converged = false
var cost = 0.0
var iteration = 0
val iterationStartTime = System.nanoTime()
instr.foreach(_.logNumFeatures(centers.head.vector.size))
// Execute iterations of Lloyd's algorithm until converged
while (iteration < maxIterations && !converged) {
// Double accumulator: sums the cost contributed by every partition
val costAccum = sc.doubleAccumulator
val bcCenters = sc.broadcast(centers)
// Find the new centers
val collected = data.mapPartitions { points =>
val thisCenters = bcCenters.value
val dims = thisCenters.head.vector.size
val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
val counts = Array.fill(thisCenters.length)(0L)
points.foreach { point =>
val (bestCenter, cost) = distanceMeasureInstance.findClosest(thisCenters, point)
costAccum.add(cost)
distanceMeasureInstance.updateClusterSum(point, sums(bestCenter))
counts(bestCenter) += 1
}
// The function passed to mapPartitions must return an Iterator[T]
// The function passed to reduceByKey(f) operates only on the values
counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
}.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
axpy(1.0, sum2, sum1)
(sum1, count1 + count2)
}.collectAsMap()
// collected has the structure (j, (sum, count))
if (iteration == 0) {
instr.foreach(_.logNumExamples(collected.values.map(_._2).sum))
}
// The new center of each cluster is the mean of all points assigned to it: (Vector1 + Vector2 + ... + VectorN) / N
// mapValues applies a function to every value of the key-value pairs; the keys stay unchanged
val newCenters = collected.mapValues { case (sum, count) =>
distanceMeasureInstance.centroid(sum, count)
}
bcCenters.destroy(blocking = false)
// Update the cluster centers and costs
converged = true
newCenters.foreach { case (j, newCenter) =>
if (converged &&
!distanceMeasureInstance.isCenterConverged(centers(j), newCenter, epsilon)) {
converged = false // converged stays true only if every cluster center meets the convergence criterion
}
centers(j) = newCenter
}
cost = costAccum.value
iteration += 1
}
val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.")
if (iteration == maxIterations) {
logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
} else {
logInfo(s"KMeans converged in $iteration iterations.")
}
logInfo(s"The cost is $cost.")
new KMeansModel(centers.map(_.vector), distanceMeasure, cost, iteration)
}
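The mapPartitions / reduceByKey / collectAsMap chain above is the heart of the distributed center update. Below is a minimal, self-contained sketch of the same pattern, not Spark's code: toy one-dimensional points already tagged with a cluster id, per-partition (sum, count) accumulation returned as an iterator, a cross-partition merge, and the small result map pulled back to the driver. The object name and data are made up for illustration:
import org.apache.spark.sql.SparkSession
import scala.collection.mutable

object CenterUpdateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("center-update-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Toy assignments: (clusterId, 1-D point) instead of (index, vector)
    val assigned = sc.parallelize(Seq((0, 1.0), (0, 3.0), (1, 10.0), (1, 14.0)), numSlices = 2)

    val newCenters = assigned
      .mapPartitions { points =>
        // Partial per-cluster (sum, count) inside this partition; mapPartitions must return an Iterator
        val sums = mutable.Map.empty[Int, (Double, Long)]
        points.foreach { case (j, v) =>
          val (s, c) = sums.getOrElse(j, (0.0, 0L))
          sums(j) = (s + v, c + 1L)
        }
        sums.iterator
      }
      .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) } // merge partitions
      .collectAsMap()                                                  // small map back to the driver
      .map { case (j, (s, c)) => j -> s / c }                          // centroid = mean of assigned points

    println(newCenters) // e.g. Map(0 -> 2.0, 1 -> 12.0)
    spark.stop()
  }
}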
How the initial cluster centers are chosen:
# Method 1: random selection
private def initRandom(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = {
// Select without replacement; may still produce duplicates if the data has < k distinct
// points, so deduplicate the centroids to match the behavior of k-means|| in the same situation
data.takeSample(false, k, new XORShiftRandom(this.seed).nextInt())
.map(_.vector).distinct.map(new VectorWithNorm(_))
}
# Method 2: k-means|| (a parallel variant of k-means++)
private[clustering] def initKMeansParallel(data: RDD[VectorWithNorm],
distanceMeasureInstance: DistanceMeasure): Array[VectorWithNorm] = {
// Initialize empty centers and point costs.
// Initialize every point's cost to positive infinity
var costs = data.map(_ => Double.PositiveInfinity)
// Initialize the first center to a random point.
// Pick one point at random as the first cluster center
val seed = new XORShiftRandom(this.seed).nextInt()
val sample = data.takeSample(false, 1, seed) // returns an Array[VectorWithNorm]
// Could be empty if data is empty; fail with a better message early:
require(sample.nonEmpty, s"No samples available from $data")
val centers = ArrayBuffer[VectorWithNorm]()
var newCenters = Seq(sample.head.toDense)
// += appends a single element, ++= appends all elements of another collection
// The line below is equivalent to centers += sample.head.toDense
centers ++= newCenters
// On each step, sample 2 * k points on average with probability proportional
// to their squared distance from the centers. Note that only distances between points
// and new centers are computed in each iteration.
var step = 0
val bcNewCentersList = ArrayBuffer[Broadcast[_]]()
while (step < initializationSteps) {
val bcNewCenters = data.context.broadcast(newCenters)
bcNewCentersList += bcNewCenters
val preCosts = costs
costs = data.zip(preCosts).map { case (point, cost) =>
math.min(distanceMeasureInstance.pointCost(bcNewCenters.value, point), cost)
}.persist(StorageLevel.MEMORY_AND_DISK)
val sumCosts = costs.sum()
// Unpersist: release the storage held in memory and on disk
bcNewCenters.unpersist(blocking = false)
preCosts.unpersist(blocking = false)
// index is the partition index
val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointCosts) =>
val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
pointCosts.filter { case (_, c) => rand.nextDouble() < 2.0 * c * k / sumCosts }.map(_._1)
}.collect()
newCenters = chosen.map(_.toDense)
centers ++= newCenters
step += 1
}
// Release the cached RDD and destroy the broadcast variables that are no longer needed
costs.unpersist(blocking = false)
bcNewCentersList.foreach(_.destroy(false))
// Deduplicate centers by their vectors (ignoring the norm) and rebuild them as (Vector, norm)
val distinctCenters = centers.map(_.vector).distinct.map(new VectorWithNorm(_))
if (distinctCenters.size <= k) {
distinctCenters.toArray
} else {
// Finally, we might have a set of more than k distinct candidate centers; weight each
// candidate by the number of points in the dataset mapping to it and run a local k-means++
// on the weighted centers to pick k of them
val bcCenters = data.context.broadcast(distinctCenters)
val countMap = data
.map(distanceMeasureInstance.findClosest(bcCenters.value, _)._1)
.countByValue()
bcCenters.destroy(blocking = false)
val myWeights = distinctCenters.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray
// 30 is the number of local k-means++ iterations
LocalKMeans.kMeansPlusPlus(0, distinctCenters.toArray, myWeights, k, 30)
}
}
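The core of k-means|| is the sampling rule inside the while loop: a point survives a step with probability 2 * k * cost / sumCosts, i.e. proportional to its squared distance to the nearest center already chosen, so that roughly 2 * k new candidates are drawn per step and far-away points are favored. A tiny standalone illustration of just that rule, with made-up costs and nothing from Spark, might look like this:
import scala.util.Random

object OversampleSketch {
  def main(args: Array[String]): Unit = {
    val k = 3
    val rand = new Random(42)

    // Toy costs: pretend these are the squared distances of 1000 points to their nearest chosen center
    val costs = Array.fill(1000)(rand.nextDouble())
    val sumCosts = costs.sum

    // Keep point i with probability 2 * k * cost(i) / sumCosts, the same rule used in initKMeansParallel
    val chosen = costs.indices.filter(i => rand.nextDouble() < 2.0 * costs(i) * k / sumCosts)

    // About 2 * k candidates are expected per step, biased toward high-cost points
    println(s"picked ${chosen.length} candidates (expected about ${2 * k})")
  }
}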
Summary: This article is a personal learning note, so it does not explain the code line by line, but the key operators and the underlying ideas are annotated to make it easier for fellow readers to study. So, to answer the original question: what are the keys to the distributed implementation?
Key 1: broadcast the continuously updated cluster centers to every executor, so that each worker node always has the latest centers.
val bcCenters = sc.broadcast(centers)
Key 2: use a double accumulator (the new accumulator API introduced in Spark 2.0) to add up the cost of every partition, producing a global sum.
val costAccum = sc.doubleAccumulator
Key 3: once every partition has finished its computation, pull the results to the driver with collect to update the cluster centers, then go back to Key 1 and broadcast the latest centers again.
val collected = data.mapPartitions…reduceByKey….collectAsMap()
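Put together, one driver-side loop looks like the following compact sketch (toy one-dimensional data, a hypothetical object name, and a local SparkSession are assumed; this illustrates the three keys, not Spark's implementation):
import org.apache.spark.sql.SparkSession

object KMeansLoopSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("kmeans-loop-sketch").getOrCreate()
    val sc = spark.sparkContext

    val points = sc.parallelize(Seq(1.0, 2.0, 3.0, 10.0, 11.0, 12.0)).cache()
    var centers = Array(0.0, 5.0) // initial 1-D centers

    for (iter <- 1 to 5) {
      val bcCenters = sc.broadcast(centers)        // key 1: ship the current centers to the executors
      val costAccum = sc.doubleAccumulator("cost") // key 2: global cost as a double accumulator

      // key 3: per-cluster (sum, count) computed on the executors, collected to the driver
      val collected = points.map { p =>
        val j = bcCenters.value.indices.minBy(i => math.abs(p - bcCenters.value(i)))
        costAccum.add(math.pow(p - bcCenters.value(j), 2))
        (j, (p, 1L))
      }.reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
        .collectAsMap()

      // new center = mean of the points assigned to it; then loop back and rebroadcast
      centers = centers.indices.map { j =>
        collected.get(j).map { case (s, c) => s / c }.getOrElse(centers(j))
      }.toArray

      println(s"iteration $iter: cost = ${costAccum.value}, centers = ${centers.mkString(", ")}")
      bcCenters.destroy()
    }
    spark.stop()
  }
}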
Most of the distributed algorithms currently implemented in Spark are built on the three ideas and techniques above. You can also look at engineering implementations such as the FTRL online learning algorithm, which rely on the same ideas and tricks. If you have better approaches, feel free to leave a comment so that we can learn from each other.
Finally, as for how to use the algorithms in Spark MLlib, I have published articles about this before; you can also follow my Gitee (码云) repository:
It contains examples and the related data sets, all of which can be run directly.