两种模型选择和超参数调整方法及Spark MLlib使用示例(Scala/Java/Python)

机器学习调试:模型选择和超参数调整

模型选择(又名超参数调整)

在机器学习中非常重要的任务就是模型选择,或者使用数据来找到具体问题的最佳的模型和参数,这个过程也叫做调试。调试可以在独立的如逻辑回归等估计器中完成,也可以在包含多样算法、特征工程和其他步骤的管线中完成。用户应该一次性调试整个管线,而不是独立的调整管线中的每个组成部分。

MLlib支持交叉验证和训练验证分裂两个模型选择工具。使用这两个工具要求包含如下对象:

1.估计器:待调试的算法或管线。

2.一系列参数表:可选参数,也叫做“参数网格”。

3.评估器:评估模型拟合程度的准则或方法。

模型选择工具工作原理如下:

1.将输入数据划分为训练数据和测试数据。

2.对每组训练数据与测试数据对,对参数表集合,用相应参数来拟合估计器,得到训练后的模型,再使用评估器来评估模型表现。

3.选择性能表现最优模型对应参数表。

其中,对于回归问题评估器可选择RegressionEvaluator,二值数据可选择BinaryClassificationEvaluator,多分类问题可选择MulticlassClassificationEvaluator。评估器里默认的评估准则可通过setMetricName方法重写。

用户可通过ParamGridBuilder构建参数网格。

交叉验证

交叉验证将数据集划分为若干子集分别地进行训练和测试。如当k=3时,交叉验证产生3个训练数据与测试数据对,每个数据对使用2/3的数据来训练,1/3的数据来测试。对于一组特定的参数表,交叉验证计算基于三组不同训练数据与测试数据对训练得到的模型的评估准则的平均值。确定最佳参数表后,交叉验证最后使用最佳参数表基于全部数据来重新拟合估计器。

示例:

注意对参数网格进行交叉验证的成本是很高的。如下面例子中,参数网格hashingTF.numFeatures有3个值,lr.regParam有2个值,CrossValidator使用2折交叉验证。这样就会产生(3*2)*2=12中不同的模型需要进行训练。在实际的设置中,通常有更多的参数需要设置,且我们可能会使用更多的交叉验证折数(3折或者10折都是经使用的)。所以交叉验证的成本是很高的,尽管如此,比起启发式的手工验证,交叉验证仍然是目前存在的参数选择方法中非常有用的一种。

Scala:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.Row

// Prepare training data from a list of (id, text, label) tuples. val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0),
  (4L, "b spark who", 1.0),
  (5L, "g d a y", 0.0),
  (6L, "spark fly", 1.0),
  (7L, "was mapreduce", 0.0),
  (8L, "e spark program", 1.0),
  (9L, "a e c l", 0.0),
  (10L, "spark compile", 1.0),
  (11L, "hadoop software", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// We use a ParamGridBuilder to construct a grid of parameters to search over. // With 3 values for hashingTF.numFeatures and 2 values for lr.regParam, // this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from. val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance. // This will allow us to jointly choose parameters for all Pipeline stages. // A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. // Note that the evaluator here is a BinaryClassificationEvaluator and its default metric // is areaUnderROC. val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)  // Use 3+ in practice 
// Run cross-validation, and choose the best set of parameters. val cvModel = cv.fit(training)

// Prepare test documents, which are unlabeled (id, text) tuples. val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents. cvModel uses the best model found (lrModel). cvModel.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }

Java:

import java.util.Arrays;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Prepare training documents, which are labeled. Dataset<Row> training = spark.createDataFrame(Arrays.asList(
  new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
  new JavaLabeledDocument(1L, "b d", 0.0),
  new JavaLabeledDocument(2L,"spark f g h", 1.0),
  new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0),
  new JavaLabeledDocument(4L, "b spark who", 1.0),
  new JavaLabeledDocument(5L, "g d a y", 0.0),
  new JavaLabeledDocument(6L, "spark fly", 1.0),
  new JavaLabeledDocument(7L, "was mapreduce", 0.0),
  new JavaLabeledDocument(8L, "e spark program", 1.0),
  new JavaLabeledDocument(9L, "a e c l", 0.0),
  new JavaLabeledDocument(10L, "spark compile", 1.0),
  new JavaLabeledDocument(11L, "hadoop software", 0.0)
), JavaLabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. Tokenizer tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words");
HashingTF hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol())
  .setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01);
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});

// We use a ParamGridBuilder to construct a grid of parameters to search over. // With 3 values for hashingTF.numFeatures and 2 values for lr.regParam, // this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from. ParamMap[] paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures(), new int[] {10, 100, 1000})
  .addGrid(lr.regParam(), new double[] {0.1, 0.01})
  .build();

// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance. // This will allow us to jointly choose parameters for all Pipeline stages. // A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. // Note that the evaluator here is a BinaryClassificationEvaluator and its default metric // is areaUnderROC. CrossValidator cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid).setNumFolds(2);  // Use 3+ in practice 
// Run cross-validation, and choose the best set of parameters. CrossValidatorModel cvModel = cv.fit(training);

// Prepare test documents, which are unlabeled. Dataset<Row> test = spark.createDataFrame(Arrays.asList(
  new JavaDocument(4L, "spark i j k"),
  new JavaDocument(5L, "l m n"),
  new JavaDocument(6L, "mapreduce spark"),
  new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);

// Make predictions on test documents. cvModel uses the best model found (lrModel). Dataset<Row> predictions = cvModel.transform(test);
for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}

Python:

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Prepare training documents, which are labeled.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Prepare test documents, which are unlabeled.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)

训练验证分裂

除了交叉验证以外,Spark还提供训练验证分裂用以超参数调整。和交叉验证评估K次不同,训练验证分裂只对每组参数评估一次。因此它计算代价更低,但当训练数据集不是足够大时,其结果可靠性不高。

与交叉验证不同,训练验证分裂仅需要一个训练数据与验证数据对。使用训练比率参数将原始数据划分为两个部分。如当训练比率为0.75时,训练验证分裂使用75%数据以训练,25%数据以验证。

与交叉验证相同,确定最佳参数表后,训练验证分裂最后使用最佳参数表基于全部数据来重新拟合估计器。

示例:

Scala:

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

// Prepare training and test data. val data = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)

val lr = new LinearRegression()

// We use a ParamGridBuilder to construct a grid of parameters to search over. // TrainValidationSplit will try all combinations of values and determine best model using // the evaluator. val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.fitIntercept)
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

// In this case the estimator is simply the linear regression. // A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)
  // 80% of the data will be used for training and the remaining 20% for validation.   .setTrainRatio(0.8)

// Run train validation split, and choose the best set of parameters. val model = trainValidationSplit.fit(training)

// Make predictions on test data. model is the model with combination of parameters // that performed best. model.transform(test)
  .select("features", "label", "prediction")
  .show()

Java:

import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.ml.tuning.TrainValidationSplit;
import org.apache.spark.ml.tuning.TrainValidationSplitModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> data = spark.read().format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt");

// Prepare training and test data. Dataset<Row>[] splits = data.randomSplit(new double[] {0.9, 0.1}, 12345);
Dataset<Row> training = splits[0];
Dataset<Row> test = splits[1];

LinearRegression lr = new LinearRegression();

// We use a ParamGridBuilder to construct a grid of parameters to search over. // TrainValidationSplit will try all combinations of values and determine best model using // the evaluator. ParamMap[] paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam(), new double[] {0.1, 0.01})
  .addGrid(lr.fitIntercept())
  .addGrid(lr.elasticNetParam(), new double[] {0.0, 0.5, 1.0})
  .build();

// In this case the estimator is simply the linear regression. // A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8);  // 80% for training and the remaining 20% for validation 
// Run train validation split, and choose the best set of parameters. TrainValidationSplitModel model = trainValidationSplit.fit(training);

// Make predictions on test data. model is the model with combination of parameters // that performed best. model.transform(test)
  .select("features", "label", "prediction")
  .show();

Python:

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Prepare training and test data.
data = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")
train, test = data.randomSplit([0.7, 0.3])
lr = LinearRegression(maxIter=10, regParam=0.1)

# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train)
# Make predictions on test data. model is the model with combination of parameters
# that performed best.
prediction = model.transform(test)
for row in prediction.take(5):
    print(row)
    原文作者:刘玲源
    原文地址: https://zhuanlan.zhihu.com/p/24277658
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞