by 王犇, 2016-01-15
AUC is a common evaluation metric for classification models. The auc method currently provided in the evaluation package of Spark MLlib first obtains the points of the ROC curve and then computes the AUC from those points. In real applications (take logistic regression as an example), however, we usually score every sample and then compute the AUC directly from the scores together with the sample labels, so the input looks like (label, predict_score); the method provided in MLlib is not a good fit for that case. This post therefore gives an alternative implementation, an approximate computation scheme for 0/1 classification problems, called BinaryAuc:
The idea is to first sort by predict_score, then, according to whether the samples are positive or negative, compute the area of each small trapezoid, and finally sum these areas up into the final AUC value. Because the data in Spark is a distributed RDD, computing a trapezoid's area requires knowing the offset accumulated by the preceding partitions, so the data has to be traversed once up front; this avoids collecting everything onto a single machine for the computation (a plain single-machine sketch of the same scheme is given after the listing for comparison):
package org.apache.spark.mllib.wml

/**
 * @author wangben 2015
 */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.rdd.RDDFunctions._
import scala.collection.Iterator
import Array._

class BinaryAUC extends Serializable {
  // input format: (prediction, label)
  def auc(data: RDD[(Double, Double)]): Double = {
    // group samples that share the same score and count negatives/positives per score
    val group_result = data.groupByKey().map(x => {
      val r = new Array[Double](2)
      for (item <- x._2) {
        if (item > 0.0) r(1) += 1.0
        else r(0) += 1.0
      }
      (x._1, r) // score -> [negativeCount, positiveCount]
    })
    // the curve points have to be accumulated in score order, biggest score first
    val group_rank = group_result.sortByKey(false)

    // per-partition sums of negatives/positives; collected to the driver and
    // turned into the cumulative offset of every partition below
    val step_sizes = group_rank.mapPartitions(x => {
      var fn_sum = 0.0
      var pn_sum = 0.0
      while (x.hasNext) {
        val cur = x.next
        fn_sum += cur._2(0)
        pn_sum += cur._2(1)
      }
      Iterator((fn_sum, pn_sum))
    }, true).collect
    var debug_string = ""
    // two-dimensional array: cumulative [negative, positive] offset of each partition
    val step_sizes_sum = ofDim[Double](step_sizes.size, 2)
    for (i <- 0 to (step_sizes.size - 1)) {
      if (i == 0) {
        step_sizes_sum(i)(0) = 0.0
        step_sizes_sum(i)(1) = 0.0
      } else {
        step_sizes_sum(i)(0) = step_sizes_sum(i - 1)(0) + step_sizes(i - 1)._1
        step_sizes_sum(i)(1) = step_sizes_sum(i - 1)(1) + step_sizes(i - 1)._2
      }
      debug_string += "\t" + step_sizes_sum(i)(0).toString + "\t" + step_sizes_sum(i)(1).toString
    }
    val sss_len = step_sizes_sum.size
    val total_fn = step_sizes_sum(sss_len - 1)(0) + step_sizes(sss_len - 1)._1
    val total_pn = step_sizes_sum(sss_len - 1)(1) + step_sizes(sss_len - 1)._2
    //System.out.println("debug auc_step_size: " + debug_string)
    val bc_step_sizes_sum = data.context.broadcast(step_sizes_sum)
    // turn every score group into a cumulative curve point, starting from the
    // offset contributed by the preceding partitions
    val modified_group_rank = group_rank.mapPartitionsWithIndex((index, x) => {
      val sss = bc_step_sizes_sum.value
      var r = List[(Double, Array[Double])]()
      var fn = sss(index)(0) // start point of this partition
      var pn = sss(index)(1)
      // the first partition also emits the origin (0, 0), so that the first trapezoid
      // (from the origin to the first curve point) is counted by sliding(2) below
      if (index == 0) r = (Double.MaxValue, Array(0.0, 0.0)) :: r
      while (x.hasNext) {
        val p = new Array[Double](2)
        val cur = x.next
        p(0) = fn + cur._2(0)
        p(1) = pn + cur._2(1)
        fn += cur._2(0)
        pn += cur._2(1)
        r = (cur._1, p) :: r
      }
      r.reverse.toIterator
    }, true)
    //output debug info
    //modified_group_rank.map(l => l._1.toString + "\t" + l._2(0).toString + "\t" + l._2(1)).saveAsTextFile("/home/hdp_teu_dia/resultdata/wangben/debug_info")

    // sum the areas of the trapezoids between every pair of neighboring curve points
    val score = modified_group_rank.sliding(2).aggregate(0.0)(
      seqOp = (auc: Double, points: Array[(Double, Array[Double])]) => auc + TrapezoidArea(points),
      combOp = _ + _
    )
    System.out.println("debug auc_mid: " + score
      + "\t" + (total_fn * total_pn).toString()
      + "\t" + total_fn.toString()
      + "\t" + total_pn.toString())

    // normalize by the total number of (negative, positive) pairs
    score / (total_fn * total_pn)
  }

  // area of the trapezoid between two neighboring curve points
  // (x: cumulative negative count, y: cumulative positive count)
  private def TrapezoidArea(points: Array[(Double, Array[Double])]): Double = {
    val x1 = points(0)._2(0)
    val y1 = points(0)._2(1)
    val x2 = points(1)._2(0)
    val y2 = points(1)._2(1)
    val base = x2 - x1
    val height = (y1 + y2) / 2.0
    base * height
  }
}
object AUCTest {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("TestEvaluation")
    val sc = new SparkContext(conf)
    val input_file = sc.textFile(args(0))
    // each input line looks like "(prediction,label)"
    val predict_label = input_file.map(l => {
      val x = l.stripPrefix("(").stripSuffix(")").split(",")
      (x(0).toDouble, x(1).toDouble)
    })
    val auc = new BinaryAUC()
    val auc_score = auc.auc(predict_label)
    System.out.println("debug auc_score: " + auc_score.toString())
    sc.stop()
  }
}
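For reference, below is a minimal single-machine sketch of the same trapezoid scheme (group by score, sort by descending score, accumulate negative/positive counts into curve points, sum the trapezoid areas). It is not part of the original code, and the names LocalAucCheck and localBinaryAuc are made up here purely for illustration, but it is handy for sanity-checking the distributed result on a small sample:

object LocalAucCheck {
  // Same trapezoid scheme as BinaryAUC, but on a local Seq of (prediction, label).
  def localBinaryAuc(data: Seq[(Double, Double)]): Double = {
    // group by score, sort by descending score, count negatives/positives per group
    val grouped = data.groupBy(_._1).toSeq.sortBy(-_._1).map { case (_, items) =>
      val pos = items.count(_._2 > 0.0).toDouble
      val neg = items.size - pos
      (neg, pos)
    }
    var x = 0.0; var y = 0.0; var area = 0.0
    for ((neg, pos) <- grouped) {
      // trapezoid between the curve points (x, y) and (x + neg, y + pos)
      area += neg * (y + (y + pos)) / 2.0
      x += neg
      y += pos
    }
    if (x == 0.0 || y == 0.0) 0.0 else area / (x * y)
  }

  def main(args: Array[String]) {
    val sample = Seq((0.9, 1.0), (0.8, 1.0), (0.7, 0.0), (0.6, 1.0), (0.4, 0.0), (0.3, 0.0))
    println("local auc: " + localBinaryAuc(sample)) // expect 8/9 = 0.888...
  }
}

Judging from the parsing in AUCTest.main, each line of the input file is expected to look like (0.91,1), i.e. a (prediction,label) pair; the exact values here are only an example.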
Appendix, a reference with RDD API examples: http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
It really is much better than the official documentation.