Spark上如何做分布式AUC计算

by 王犇 20160115

AUC是分类模型常用的评价指标。目前Spark MLlib的evaluation包中提供的AUC方法,是先拿到ROC曲线上的各个点,再据此计算AUC;但在实际应用场景中(以逻辑回归为例),我们通常是先对每个样本打分,再结合样本的label直接计算AUC,输入形如(predict_score, label)的二元组,此时MLlib提供的方案就不太适用了。因此这里提供另一种计算方法:针对0/1二分类问题的近似计算方案,称为BinaryAUC:

计算思路:首先按predict_score对样本排序,然后根据样本的正负例情况,分别计算每个小梯形的面积,最后累加得到最终的AUC值。由于Spark中的数据是分布在多个分区(partition)上的RDD,计算某个分区内的梯形面积时,需要知道它之前所有分区累计的正负样本数(即offset),因此需要先遍历一遍数据,统计每个分区的正负样本数;但这样避免了把全部数据汇总到单机上进行计算:

package org.apache.spark.mllib.wml

/**
 * @author wangben 2015
 */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.rdd.RDDFunctions._
import scala.collection.Iterator
import Array._

/**
 * Distributed AUC (area under the ROC curve) for binary classification.
 *
 * Samples are grouped by prediction score and sorted by score in descending order. The ROC
 * curve, with x = cumulative negatives and y = cumulative positives, is then integrated with
 * the trapezoid rule. All samples sharing one score form a single step, so ties contribute
 * half credit.
 *
 * Per-partition count offsets are computed first, so every partition can integrate its own
 * slice of the curve without collecting the data to the driver.
 */
class BinaryAUC extends Serializable {

  /**
   * Computes the AUC of scored samples.
   *
   * @param data pairs of (prediction score, label); a label > 0.0 counts as positive,
   *             anything else as negative.
   * @return the AUC, or NaN when the input has no positive or no negative samples
   *         (the normalisation by negatives * positives is undefined then).
   */
  def auc(data: RDD[(Double, Double)]): Double = {
    // One entry per distinct score: (score, (negatives, positives)).
    // reduceByKey combines map-side instead of shuffling every label like groupByKey.
    val countsByScore = data
      .map { case (score, label) =>
        if (label > 0.0) (score, (0.0, 1.0)) else (score, (1.0, 0.0))
      }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

    // Highest score first. Cached because it is traversed twice below.
    val ranked = countsByScore.sortByKey(ascending = false).cache()

    // Pass 1: (negatives, positives) contained in each partition, in partition order.
    val partitionCounts: Array[(Double, Double)] = ranked.mapPartitions { iter =>
      var neg = 0.0
      var pos = 0.0
      iter.foreach { case (_, (n, p)) => neg += n; pos += p }
      Iterator((neg, pos))
    }.collect()

    // offsets(i) = cumulative (negatives, positives) of all partitions before partition i.
    // The extra last element holds the totals. It is (0, 0) when there are no partitions.
    val offsets = partitionCounts.scanLeft((0.0, 0.0)) { (acc, c) =>
      (acc._1 + c._1, acc._2 + c._2)
    }
    val (totalNeg, totalPos) = offsets.last

    val bcPosOffsets = data.context.broadcast(offsets.map(_._2))

    // Pass 2: each partition integrates its own slice of the curve. A score group with
    // n negatives and p positives adds the trapezoid n * (posAbove + p / 2), where posAbove
    // counts the positives with a strictly higher score. Partition 0 starts at
    // posAbove = 0, so the segment from the origin to the first ROC point is included.
    // A pairwise sliding window over the points alone would miss that segment.
    val area = ranked.mapPartitionsWithIndex { (index, iter) =>
      var posAbove = bcPosOffsets.value(index)
      var partial = 0.0
      iter.foreach { case (_, (n, p)) =>
        partial += n * (posAbove + p / 2.0)
        posAbove += p
      }
      Iterator(partial)
    }.fold(0.0)(_ + _)

    ranked.unpersist(blocking = false)
    bcPosOffsets.unpersist()

    System.out.println("debug auc_mid: " + area
        + "\t" + (totalNeg * totalPos).toString()
        + "\t" + totalNeg.toString()
        + "\t" + totalPos.toString())

    // Explicit version of the 0/0 case: the AUC is undefined without both classes.
    if (totalNeg == 0.0 || totalPos == 0.0) Double.NaN
    else area / (totalNeg * totalPos)
  }
}

/**
 * Command-line driver for [[BinaryAUC]].
 *
 * Reads text lines of the form "(prediction,label)" from the path given in args(0),
 * computes their AUC, and prints it.
 */
object AUCTest {

  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: AUCTest <input path>")
    val conf = new SparkConf().setAppName("TestEvaluation")
    val sc = new SparkContext(conf)
    try {
      val predictLabel = sc.textFile(args(0))
        .filter(_.trim.nonEmpty) // skip blank lines, e.g. a trailing newline
        .map { line =>
          // Expected form "(prediction,label)". Presumably this is the toString of a
          // Scala Tuple2 written by saveAsTextFile; confirm against the producer.
          val fields = line.trim.stripPrefix("(").stripSuffix(")").split(",")
          (fields(0).toDouble, fields(1).toDouble)
        }
      val aucScore = new BinaryAUC().auc(predictLabel)
      System.out.println("debug auc_score: " + aucScore.toString())
    } finally {
      // Release cluster resources even if parsing or the computation fails.
      sc.stop()
    }
  }

}

附:RDD API 示例文档:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

确实比官方的好太多

点赞