梯度迭代树回归
算法简介:
梯度提升树是一种决策树的集成算法。它通过反复迭代训练决策树来最小化损失函数。决策树类似,梯度提升树具有可处理类别特征、易扩展到多分类问题、不需特征缩放等性质。Spark.ml通过使用现有decision tree工具来实现。
梯度提升树依次迭代训练一系列的决策树。在一次迭代中,算法使用现有的集成来对每个训练实例的类别进行预测,然后将预测结果与真实的标签值进行比较。通过重新标记,来赋予预测结果不好的实例更高的权重。所以,在下次迭代中,决策树会对先前的错误进行修正。
对实例标签进行重新标记的机制由损失函数来指定。每次迭代过程中,梯度迭代树在训练数据上进一步减少损失函数的值。spark.ml为分类问题提供一种损失函数(Log Loss),为回归问题提供两种损失函数(平方误差与绝对误差)。
Spark.ml支持二分类以及回归的随机森林算法,适用于连续特征以及类别特征。
*注意梯度提升树目前不支持多分类问题。
参数:
checkpointInterval:
类型:整数型。
含义:设置检查点间隔(>=1),或不设置检查点(-1)。
featuresCol:
类型:字符串型。
含义:特征列名。
impurity:
类型:字符串型。
含义:计算信息增益的准则(不区分大小写)。
labelCol:
类型:字符串型。
含义:标签列名。
lossType:
类型:字符串型。
含义:损失函数类型。
maxBins:
类型:整数型。
含义:连续特征离散化的最大数量,以及选择每个节点分裂特征的方式。
maxDepth:
类型:整数型。
含义:树的最大深度(>=0)。
maxIter:
类型:整数型。
含义:迭代次数(>=0)。
minInfoGain:
类型:双精度型。
含义:分裂节点时所需最小信息增益。
minInstancesPerNode:
类型:整数型。
含义:分裂后自节点最少包含的实例数量。
predictionCol:
类型:字符串型。
含义:预测结果列名。
seed:
类型:长整型。
含义:随机种子。
subsamplingRate:
类型:双精度型。
含义:学习一棵决策树使用的训练数据比例,范围[0,1]。
stepSize:
类型:双精度型。
含义:每次迭代优化步长。
示例:
下面的例子中,GBTRegressor仅迭代了一次,在实际操作中是不现实的。
Scala:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
// Load and parse the data file, converting it to a DataFrame. val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Automatically identify categorical features, and index them. // Set maxCategories so features with > 4 distinct values are treated as continuous. val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(data)
// Split the data into training and test sets (30% held out for testing). val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a GBT model. val gbt = new GBTRegressor()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures")
.setMaxIter(10)
// Chain indexer and GBT in a Pipeline. val pipeline = new Pipeline()
.setStages(Array(featureIndexer, gbt))
// Train model. This also runs the indexer. val model = pipeline.fit(trainingData)
// Make predictions. val predictions = model.transform(testData)
// Select example rows to display. predictions.select("prediction", "label", "features").show(5)
// Select (prediction, true label) and compute test error. val evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("Root Mean Squared Error (RMSE) on test data = " + rmse)
val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
println("Learned regression GBT model:\n" + gbtModel.toDebugString)
Java:
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load and parse the data file, converting it to a DataFrame. Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Automatically identify categorical features, and index them. // Set maxCategories so features with > 4 distinct values are treated as continuous. VectorIndexerModel featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(data);
// Split the data into training and test sets (30% held out for testing). Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Train a GBT model. GBTRegressor gbt = new GBTRegressor()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures")
.setMaxIter(10);
// Chain indexer and GBT in a Pipeline. Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureIndexer, gbt});
// Train model. This also runs the indexer. PipelineModel model = pipeline.fit(trainingData);
// Make predictions. Dataset<Row> predictions = model.transform(testData);
// Select example rows to display. predictions.select("prediction", "label", "features").show(5);
// Select (prediction, true label) and compute test error. RegressionEvaluator evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
GBTRegressionModel gbtModel = (GBTRegressionModel)(model.stages()[1]);
System.out.println("Learned regression GBT model:\n" + gbtModel.toDebugString());
Python:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)
# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])
# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
gbtModel = model.stages[1]
print(gbtModel) # summary only