原创作品,转载请标明:https://blog.csdn.net/Xiejingfa/article/details/79936737
最近在学习Spark ML的相关知识,今天给大家带来一篇K-means聚类算法的源码分析。
1、KMeans算法及其改进
K-means是机器学习中一个比较经典的聚类算法,所谓聚类就是将一个数据集划分为k个子集,使得每个子集中的样本尽可能相似(高相似度),而不同子集中的样本尽可能不同(低相似度)。这里的每个子集又称为“簇”。
K-means算法的主要步骤分为:
- 从数据集中随机选取k个样本点作为初始质心(聚类中心点);
- 分别计算数据集中每个样本点到k个质心的距离并将其划分到距离最近的质心所对应的类中,形成k个簇;
- 更新计算每个簇的质心;
- 重复步骤2和步骤3直到每个簇的质心不再发生变化或达到最大迭代次数为止。
K-means算法中一般使用欧式距离作为相似度的度量。
我们暂且称上面这种方法为“经典K-means”算法。
经典K-means算法虽然简单快速,但存在两个主要的缺点:
- 需要预先指定k值:现实中有很多场景下很难确定k的初始值。我们并不知道对于给定的数据集应该分为几个类别才合适,这种情况下需要通过不断的迭代运算来估计k值;
- 对初始选取的质心比较敏感:初始选取的质心不同很可能导致完全不同的聚类结果。
为了解决经典K-means的第二个缺点,David Arthur和Sergei Vassilvitskii在2007年提出了K-means++算法(维基百科K-means++)。
K-means++算法可以理解为初始选择的k个质心之间的相互距离应该尽可能地远。该算法的步骤如下:
- 从输入的数据集中随机选择一个样本点作为第一个质心;
- 对于数据集中的每一个样本点分别计算它与当前已有质心之间的最短距离,记为D(x);
- 选择一个新的数据点作为新的质心,选择的原则是:D(x)越大的点,被选取作为质心的的概率也越大;
- 重复步骤2和步骤3直到找到k个质心。
K-means++算法的关键在于步骤2,如何才能让D(x)越大的点被选取作为质心的的概率也越大?一种常用的做法是:将所有D(x)保存在一个数组Dist中,计算所有距离之和Sum(D(x)),然后再取一个落在Sum(D(x))中的随机值r,最后用这个随机值R依次减去数组Dist中的元素直至R <= 0。大家可以把这个过程想象成一个轮盘,每一个D(x)表示轮盘上的一个扇形区域,扇形区域的面积越大,则东西掉落在这个区域的概率也越大。
K-means++虽然提供了一种比较好的选取初始质心的方法,但其扩展性又不尽人意,原因在于K-means++在选择当前质心时必须依赖于前面得到的所有质心,即k-means必须顺序执行,想要得到k个质心就要遍历k次数据集。这也导致了k-means算法扩展性差、没办法并行地扩展。
针对K-means++的上述缺点,K-means||提出了另一种解决方案。
K-means||算法是在K-means++算法的基础上改进了取样策略,不同于k-means++算法每次遍历只取一个样本点,k-means||算法每次遍历时也按照一定的概率取 O( k) 个样本,并重复该取样过程大约 O(logn) 次。这样取样得到的样本点的数量将远远小于输入数据集的数量,最后在本地利用k-means++算法将这些样本点聚类为k个初始质心。
K-means||算法具体可以参看论文:Scalable K-means++
在下文中我们可以看到,Spark中提供了经典KMeans和KMeans||两种方法的实现。两者的主要区别在于初始质心的选择不同。
2、Spark中的 K-means
Spark中的K-means算法主要由以下三个文件实现:
- org.apache.spark.mllib.clustering.KMeans(暂且称为ml.KMeans)
- org.apache.spark.mllib.clustering.LocalKMeans(暂且称为mllib.KMeans)
- org.apache.spark.ml.clustering.KMeans
其中mllib.KMeans实现了经典K-means算法和K-means||算法;LocalKMeans实现了本地的K-means++算法,用于将K-means||选取的候选质心聚类为k个初始质心;而ml.KMeans则是对mllib.KMeans的封装。
关键参数:
k(K值,聚类个数)
The number of clusters to create (k). Must be > 1. Note that it is possible for fewer than k clusters to be returned, for example, if there are fewer than k distinct points to cluster.
initMode(质心初始化模式,对应mllib.KMeans中的initializationMode)
支持随机选择模式和K-means||模式。
Param for the initialization algorithm. This can be either “random” to choose random points as initial cluster centers, or “k-means||” to use a parallel variant of k-means++
initSteps(K-means||取样次数,对应mllib.KMeans中的initializationSteps)
Param for the number of steps for the k-means|| initialization mode. This is an advanced setting – the default of 2 is almost always enough.
maxIter(最大迭代次数,对应mllib.KMeans中的maxIterations)
超过最大迭代次数,算法终止。
Param for maximum number of iterations
tol (收敛阈值,对应mllib.KMeans中的epsilon)
如果两次迭代之间同一质心前后之间的距离小于tol说明算法已经收敛。
Param for the convergence tolerance for iterative algorithms
seed (随机种子)
Param for random seed
下面我们来看一个完整的例子:
object KMeansApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("KMeans App")
.master("local[*]")
.getOrCreate()
val df = spark.createDataFrame(Seq(
(0.0, 0.0, 0.0),
(0.1, 0.1, 0.1),
(0.2, 0.2, 0.2),
(9.0, 9.0, 9.0),
(9.1, 9.1, 9.1),
(9.2, 9.2, 9.2)
)).toDF("x", "y", "z")
val assembler = new VectorAssembler()
.setInputCols(Array("x", "y", "z"))
.setOutputCol("features")
val dataset = assembler.transform(df)
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)
model.clusterCenters.foreach(println)
}
}
运行上面代码,最终得到两个簇的聚类中心:
[0.1,0.1,0.1]
[9.099999999999998,9.099999999999998,9.099999999999998]
3、Spark KMeans源码
下面我们通过跟踪KMeans算法的执行流程来分析其实现过程。
3.1、DataFrame转换为RDD[Vector]
从第2小节的例子中,我们可以看到KMeans的入口是val model = kmeans.fit(dataset)
这条语句,我们先来看看fit
函数。
前面我们提到:ml.KMeans只是mllib.KMeans的封装,mllib.KMeans的接口是基于RDD的,而ml.KMeans的接口是基于DataFrame的,fit
函数所做的工作是将输入数据从DataFrame形式转换为RDD形式,并创建一个mllib.KMeans实例对输入数据进行训练从而得到一个KMeansModel对象。
这验证了我们前面说的ml.KMeans只是对mllib.KMeans的封装。
@Since("2.0.0")
override def fit(dataset: Dataset[_]): KMeansModel = {
... // 省略无关代码
// 将DataFrame转换为RDD形式
val instances: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
case Row(point: Vector) => OldVectors.fromML(point)
}
... // 省略无关代码
instr.logParams(featuresCol, predictionCol, k, initMode, initSteps, maxIter, seed, tol)
// 创建一个一个mllib.KMeans实例,在mllib.KMeans中收敛反之为Epsilon,与tol含义相同
val algo = new MLlibKMeans()
.setK($(k))
.setInitializationMode($(initMode))
.setInitializationSteps($(initSteps))
.setMaxIterations($(maxIter))
.setSeed($(seed))
.setEpsilon($(tol))
// 训练得到一个mllib.KMeansModel
val parentModel = algo.run(instances, Option(instr))
// 创建ml.KMeansModel,实际上也是对mllib.KMeansModel的封装
val model = copyValues(new KMeansModel(uid, parentModel).setParent(this))
... // 省略无关代码
model
}
接下来我们随着algo.run(instances, Option(instr))
进入mllib.KMeans类,看看KMeans算法的具体实现。
3.2、求特征向量的模,转换为VectorWithNorm
mllib.KMeans定义了VectorWithNorm类,实际上是对特征向量vector和其二次范数(向量的模)norm的封装。为什么要缓存特征向量的模呢?主要是为了方便后面快速计算两个向量之间的距离。
// 可以看到VectorWithNorm的成员变量包括vector和其模norm
class VectorWithNorm(val vector: Vector, val norm: Double) extends Serializable
mllib.KMeans中的run
方法将RDD[Vector]转换为RDD[VectorWithNorm],并将该结构作为输入数据进行模型训练。
private[spark] def run(
data: RDD[Vector],
instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {
... // 省略无关代码
// 计算每一个样本向量的模
val norms = data.map(Vectors.norm(_, 2.0))
norms.persist()
// 将每一个样本向量与其模关联起来成为新的输入数据
val zippedData = data.zip(norms).map { case (v, norm) =>
new VectorWithNorm(v, norm)
}
// 训练模型
val model = runAlgorithm(zippedData, instr)
norms.unpersist()
... // 省略无关代码
model
}
3.3、初始化k个质心(聚类中心点)
KMeans训练的第一件事就是要找出k个初始化质心,mllib.KMeans提供两个方法:
- 随机选择方法:根据给定的随机种子seed参数,随机选取k个质心;
- 并行化方法:使用K-means||算法选择,默认方法。
具体使用哪种初始化方法由参数initializationMode(ml.KMeans的initMode参数)决定,代码如下:
val centers = initialModel match {
case Some(kMeansCenters) =>
kMeansCenters.clusterCenters.map(new VectorWithNorm(_))
case None =>
if (initializationMode == KMeans.RANDOM) {
// 随机初始化
initRandom(data)
} else {
// K-means|| 初始化
initKMeansParallel(data)
}
}
接下来我们具体分析下这两种方法的实现过程。
3.3.1、随机初始化质心
随机初始化质心的方法极其简单,使用了Spark无放回的随机抽样方法takeSample
抽取k个样本点即可。代码如下:
private def initRandom(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = {
behavior of k-means|| in the same situation
data.takeSample(false, k, new XORShiftRandom(this.seed).nextInt())
.map(_.vector).distinct.map(new VectorWithNorm(_))
}
随机选取初始质心的方法一般不推荐使用。
3.3.2、使用K-means||方法初始化质心
使用K-means||方法来初始化质心的过程由initKMeansParallel函数实现,相对比较复杂。大家可以结合第1节中关于K-means++和K-means||算法的介绍对照理解。下面,我们分析下其实现过程。
第一步:随机选择一个样本点作为第一个质心
// 从输入的数据集中随机选择一个样本点作为第一个质心
val seed = new XORShiftRandom(this.seed).nextInt()
val sample = data.takeSample(false, 1, seed)
第二步:根据已有质心,循环迭代求得其余质心
根据已经选择了的质心集合,K-means||会通过多次迭代来继续选择其余的质心,具体的迭代次数由initializationSteps参数决定(这里Spark并没有像K-means||论文那样迭代log(k)次,默认只进行了两次迭代)。
在每次迭代中,需要完成以下操作:
- 将上一轮迭代选取出来的质心广播出去,以减少步骤2中的数据交换量;
- 对于数据集中的每一个样本点分别计算它与上一轮迭代选取出来的质心集合之间的最短距离;
- 通过一定的概率来抽样,并将选取的样本点作为本轮迭代新选取的质心。
在步骤2中,有一个值得注意的点是Spark在计算“数据集中的每一个样本点和已选取的质心集合之间的最短距离”时采用了一种优化方法来避免不必要的计算。这一功能由pointCost->findClosest
函数实现。具体代码为:
private[mllib] def findClosest( centers: TraversableOnce[VectorWithNorm], point: VectorWithNorm): (Int, Double) = {
var bestDistance = Double.PositiveInfinity
var bestIndex = 0
var i = 0
centers.foreach { center =>
var lowerBoundOfSqDist = center.norm - point.norm
lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist
if (lowerBoundOfSqDist < bestDistance) {
val distance: Double = fastSquaredDistance(center, point)
if (distance < bestDistance) {
bestDistance = distance
bestIndex = i
}
}
i += 1
}
(bestIndex, bestDistance)
}
前面我们已经缓存了每一个特征向量的模,所以可以方便地计算出lowerBoundOfSqDist的值,极大地减少了计算量。
这一步骤的完整代码如下:
// 保存所有选择出来的质心
val centers = ArrayBuffer[VectorWithNorm]()
// 用于保存每一轮迭代中新选择出来的质心
var newCenters = Seq(sample.head.toDense)
centers ++= newCenters
// 循环计数器
var step = 0
var bcNewCentersList = ArrayBuffer[Broadcast[_]]()
while (step < initializationSteps) {
// newCenters记录着上一轮迭代选取出来的质心,将这些质心广播出去,避免每次都要交换数据
val bcNewCenters = data.context.broadcast(newCenters)
bcNewCentersList += bcNewCenters
// 初始化时每个样本点到newCenters所有质心中的最短距离都为“无限大”
val preCosts = costs
// 计算每一个样本点到newCenters所有质心中的最短距离
costs = data.zip(preCosts).map { case (point, cost) =>
math.min(KMeans.pointCost(bcNewCenters.value, point), cost)
}.persist(StorageLevel.MEMORY_AND_DISK)
// 对上面求到的所有最短距离求和
val sumCosts = costs.sum()
bcNewCenters.unpersist(blocking = false)
preCosts.unpersist(blocking = false)
// 使用K-means||算法的概率公式对样本点进行过滤从而获得本次迭代选取的质心
val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointCosts) =>
val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
pointCosts.filter { case (_, c) => rand.nextDouble() < 2.0 * c * k / sumCosts }.map(_._1)
}.collect()
newCenters = chosen.map(_.toDense)
centers ++= newCenters
step += 1
}
第三步:使用K-means++聚类求得最终k个质心
通过上一个步骤选取的候选质心个数很可能会多于k个,如果发生这种情况,Spark会在本地对这些候选质心进行K-means++聚类来得到最终k个初始质心。
// 对候选质心集合去重
val distinctCenters = centers.map(_.vector).distinct.map(new VectorWithNorm(_))
// 如果上一个步骤中选取的候选质心个数小于或等于k就直接返回,否则
// 使用带权重的K-means++进一步聚类以获得最终k个初始质心
if (distinctCenters.size <= k) {
distinctCenters.toArray
} else {
// 计算每个候选质心的权重值,其值为数据集中属于该质心代表类别的样本个数
val bcCenters = data.context.broadcast(distinctCenters)
val countMap = data.map(KMeans.findClosest(bcCenters.value, _)._1).countByValue()
bcCenters.destroy(blocking = false)
// K-means++进行本地聚类
val myWeights = distinctCenters.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray
LocalKMeans.kMeansPlusPlus(0, distinctCenters.toArray, myWeights, k, 30)
}
上面代码的关键在于使用K-means++算法对候选质心聚类为k个初始质心的过程,LocalKMeans比较简单,这里对其实现加了注释帮助大家理解,就不展开讲解了。
LocalKMeans的实现使用到了每个候选质心的权重值,该权重值对应输入数据集中属于该质心所代表簇的样本个数。这样就会以较大的概率选出“多数类“代表质心作为第一个初始质心。
LocalKMeans代码实现如下:
private[mllib] object LocalKMeans extends Logging {
def kMeansPlusPlus(
seed: Int,
points: Array[VectorWithNorm],
weights: Array[Double],
k: Int,
maxIterations: Int
): Array[VectorWithNorm] = {
val rand = new Random(seed)
// 特征向量的维度
val dimensions = points(0).vector.size
// 用以保存k个初始化质心
val centers = new Array[VectorWithNorm](k)
//***** K-means++:初始化k个质心 *****//
// 从输入的数据集中随机选择一个样本点作为第一个质心,加上权重可以以较大的概率挑选到一个多数类
centers(0) = pickWeighted(rand, points, weights).toDense
// 记录所有样本与当前质心的距离
val costArray = points.map(KMeans.fastSquaredDistance(_, centers(0)))
// 循环k-1次选出其余质心,i表示当前选择的质心在centers数组中的下标
for (i <- 1 until k) {
// 计算每个样本点和其对应权重的乘积
val sum = costArray.zip(weights).map(p => p._1 * p._2).sum
val r = rand.nextDouble() * sum
var cumulativeScore = 0.0
var j = 0
// 使用“轮盘法”选出概率较大的新的质心
while (j < points.length && cumulativeScore < r) {
cumulativeScore += weights(j) * costArray(j)
j += 1
}
if (j == 0) {
logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." +
s" Using duplicate point for center k = $i.")
centers(i) = points(0).toDense
} else {
centers(i) = points(j - 1).toDense
}
// 更新代价数组
for (p <- points.indices) {
costArray(p) = math.min(KMeans.fastSquaredDistance(points(p), centers(i)), costArray(p))
}
}
// ***** 重新计算每个簇的质心,不断更新迭代(Lloyd算法) *****//
// 记录上一次迭代中每一个样本点所属簇(即类别)
val oldClosest = Array.fill(points.length)(-1)
// 迭代计数器
var iteration = 0
// 记录当前迭代是否有样本点所属簇发生变化
var moved = true
// 如果所有样本点所述簇不再变化或达到最大迭代次数,算法终止
while (moved && iteration < maxIterations) {
moved = false
// 数组,记录每一个簇的样本个数
val counts = Array.fill(k)(0.0)
// Vector数组,累加每个簇各个样本特征向量,sums和counts相除就是各个簇新的质心
val sums = Array.fill(k)(Vectors.zeros(dimensions))
var i = 0
// 计算每个样本到k个质心的距离并将其划分到距离最近的质心所对应的簇中
while (i < points.length) {
val p = points(i)
val index = KMeans.findClosest(centers, p)._1
axpy(weights(i), p.vector, sums(index))
counts(index) += weights(i)
// 判断当前样本点所属簇是否发生变化,如果发生变化则更新相关变量
if (index != oldClosest(i)) {
moved = true
oldClosest(i) = index
}
i += 1
}
// 重新计算每个簇的质心
var j = 0
while (j < k) {
if (counts(j) == 0.0) {
// 如果一个簇没有一个样本,随机赋予一个质心
centers(j) = points(rand.nextInt(points.length)).toDense
} else {
scal(1.0 / counts(j), sums(j))
centers(j) = new VectorWithNorm(sums(j))
}
j += 1
}
iteration += 1
}
... // 忽略无关代码
centers
}
// 基于权重从输入数组中随机获取一个元素,类似于“轮盘法”
private def pickWeighted[T](rand: Random, data: Array[T], weights: Array[Double]): T = {
// 产生一个随机值并乘上权重数组之和,得到一个阈值
val r = rand.nextDouble() * weights.sum
var i = 0
var curWeight = 0.0
// 从左到右遍历权重数组,累计每个元素的权重,当当前权重之和大于阈值r时就确定了数组中的一个元素
while (i < data.length && curWeight < r) {
curWeight += weights(i)
i += 1
}
// 返回数组中的这个元素
data(i - 1)
}
}
最后,initKMeansParallel函数的完整流程如下:
private[clustering] def initKMeansParallel(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = {
// Initialize empty centers and point costs.
var costs = data.map(_ => Double.PositiveInfinity)
//***** 步骤一:随机选择一个样本点作为第一个质心 *****//
val seed = new XORShiftRandom(this.seed).nextInt()
val sample = data.takeSample(false, 1, seed)
// Could be empty if data is empty; fail with a better message early:
require(sample.nonEmpty, s"No samples available from $data")
//***** 步骤二:根据已有质心通过循环迭代求得其余质心 *****//
// 保存所有选择出来的质心
val centers = ArrayBuffer[VectorWithNorm]()
// 用于保存每一轮迭代中新选择出来的质心
var newCenters = Seq(sample.head.toDense)
centers ++= newCenters
// 循环计数器
var step = 0
var bcNewCentersList = ArrayBuffer[Broadcast[_]]()
while (step < initializationSteps) {
// newCenters记录着上一轮迭代选取出来的质心,将这些质心广播出去,避免每次都要交换数据
val bcNewCenters = data.context.broadcast(newCenters)
bcNewCentersList += bcNewCenters
// 初始化时每个样本点到newCenters所有质心中的最短距离都为“无限大”
val preCosts = costs
// 计算每一个样本点到newCenters所有质心中的最短距离
costs = data.zip(preCosts).map { case (point, cost) =>
math.min(KMeans.pointCost(bcNewCenters.value, point), cost)
}.persist(StorageLevel.MEMORY_AND_DISK)
// 对上面求到的所有最短距离求和
val sumCosts = costs.sum()
bcNewCenters.unpersist(blocking = false)
preCosts.unpersist(blocking = false)
// 使用K-means||算法的概率公式对样本点进行过滤从而获得本次迭代选取的质心
val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointCosts) =>
val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
pointCosts.filter { case (_, c) => rand.nextDouble() < 2.0 * c * k / sumCosts }.map(_._1)
}.collect()
newCenters = chosen.map(_.toDense)
centers ++= newCenters
step += 1
}
costs.unpersist(blocking = false)
bcNewCentersList.foreach(_.destroy(false))
//***** 步骤三:获取最终的k个质心 *****//
// 对候选质心集合去重
val distinctCenters = centers.map(_.vector).distinct.map(new VectorWithNorm(_))
// 如果上一个步骤中选取的候选质心个数小于或等于k就直接返回,否则
// 使用带权重的K-means++进一步聚类以获得最终k个初始质心
if (distinctCenters.size <= k) {
distinctCenters.toArray
} else {
// 计算每个候选质心的权重值,其值为数据集中属于该质心代表类别的样本个数
val bcCenters = data.context.broadcast(distinctCenters)
val countMap = data.map(KMeans.findClosest(bcCenters.value, _)._1).countByValue()
bcCenters.destroy(blocking = false)
// K-means++进行本地聚类,返回k个初始化质心
val myWeights = distinctCenters.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray
LocalKMeans.kMeansPlusPlus(0, distinctCenters.toArray, myWeights, k, 30)
}
}
至此,我们已经得到了k个初始质心。
3.4、迭代训练
得到k个初始化质心后,接下来的工作就是不断地迭代下面两个步骤直到每个簇的质心不再发生变化或者达到最大的迭代次数为止。
- 计算数据集中每个样本到每个簇质心的距离并将其划分到距离最近的质心所对应的簇中;
- 重新计算每个簇的质心;
这两个步骤对于经典KMeans、KMeans++或者KMeans||来说都是一样的。
下面我们回到runAlgorithm
方法来分析一下迭代训练过程。
runAlgorithm
方法使用两个变量来标识迭代过程是否结束。converged
标识算法是否收敛,如果前后两次训练后每个簇的质心不再发生变化说明算法已经收敛;iteration
记录了当前迭代次数,如果迭代次数超过了maxIterations
,则终止训练过程。训练过程的整个框架如下:
var converged = false
var iteration = 0
// Execute iterations of Lloyd's algorithm until converged
while (iteration < maxIterations && !converged) {
// 更新每个样本点所属簇
// 更新每个簇的质心
iteration += 1
}
3.4.1、更新每个样本点所属簇
为了加快训练过程,Spark使用mapPartitions
方法让各个partition中的数据并行更新,最后再通过reduceByKey
方法将所有数据汇总起来。这一过程关键是以下两个数据结构
val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
val counts = Array.fill(thisCenters.length)(0L)
sums是一个Vector类型的数组,长度为簇的总个数,用以将每个簇下所有样本的特征向量累计起来;counts是一个Long类型数组,长度也为簇的总个数,记录了每个簇包含的样本个数。
这里我们只需要把每个样本点的特征向量累加到对应的簇上去就可以了,并不需要真正地把每个样本点物理地分到对应的簇上去。
实现代码如下:
// 每个partition独立并行计算,最后再将结果合并起来
val totalContribs = data.mapPartitions { points =>
val thisCenters = bcCenters.value
// 特征向量的维度
val dims = thisCenters.head.vector.size
// Vector数组,将每个簇下所有样本的特征向量累计起来
val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
// Long数组,记录每个簇下的样本点个数
val counts = Array.fill(thisCenters.length)(0L)
// 计算每个样本到k个质心的距离并将其划分到距离最近的质心所对应的簇中
points.foreach { point =>
val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
costAccum.add(cost)
val sum = sums(bestCenter)
axpy(1.0, point.vector, sum)
counts(bestCenter) += 1
}
// 转换为(质心下标,(对应sums值,对应counts值))的形式,方便下一步合并
counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
}.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
// 通过reduceByKey将各个partition的数据合并在一起
axpy(1.0, sum2, sum1)
(sum1, count1 + count2)
}.collectAsMap()
3.4.2、更新每个簇的质心
了解了“更新每个样本点所属簇”的过程,很容易联想到,sums
和counts
相除就是每个簇新的质心。
考虑到前后两次训练每个簇的质心基本不可能完全相等,mllib.KMeans提供了参数epsilon
供用户设置阈值,只要质心前后之间的距离之差小于epsilon
,mllib.KMeans就认为他们是“相等”的。
实现代码如下:
// 重新计算每个簇的中心
converged = true
totalContribs.foreach { case (j, (sum, count)) =>
scal(1.0 / count, sum)
val newCenter = new VectorWithNorm(sum)
// 判断当前质心是否收敛
if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
converged = false
}
centers(j) = newCenter
}
经过上面的训练过程,我们最后得到了k个(也有可能小于k个)簇最终的质心,预测的时候只要计算输入特征向量与各个簇质心之间的距离,将其划分到距离最近的簇中去就可以了。
好了,Spark KMeans算法的源码分析就讲到这里了。如果有不正确的地方,望大家批评指正。