import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import breeze.linalg._
/** * @author XiaoTangBao * @date 2019/3/20 19:10 * @version 1.0 */
/**
 * Multiple linear regression solved in closed form (normal equations) with
 * Breeze, using Spark only to load, split and assemble the data.
 *
 * Each input line is expected to hold six '|'-separated numeric fields:
 * five features followed by the label.
 */
object LR4 {

  /**
   * Loads the data set, fits the regression on a random 70% split and prints
   * the root-mean-square error measured on the remaining 30%.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Silence Spark's verbose logging.
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    val sparkSession = SparkSession.builder().master("local[4]").appName("LR").getOrCreate()
    try {
      val featuresArr = Array("Frequency", "Angle", "Chord", "velocity", "thickness")
      val featureCount = featuresArr.length
      // The label is the field right after the features.
      val labelIndex = featureCount

      // Load the data source. Blank lines are skipped so that a trailing newline
      // does not crash the field parsing; the RDD is cached because it is
      // evaluated once per action below.
      val data = sparkSession.sparkContext
        .textFile("G:\\mldata\\airfoil_self_noise.txt")
        .filter(line => line.trim.nonEmpty)
        .map(line => line.split('|'))
        .cache()

      // Randomly split into a training set and a test set.
      val splitData = data.randomSplit(Array(0.7, 0.3))
      val trainData = splitData(0).map { arr =>
        Row(arr(5).toDouble, arr(0).toDouble, arr(1).toDouble, arr(2).toDouble,
          arr(3).toDouble, arr(4).toDouble)
      }
      // Collect label and features together so they stay aligned row by row.
      val testSamples = splitData(1)
        .map(arr => (arr(labelIndex).toDouble, arr.take(featureCount).map(_.toDouble)))
        .collect()
      require(testSamples.nonEmpty, "test split is empty; cannot compute RMSE")
      val testDataLabel = testSamples.map(_._1)
      val testData = testSamples.map(_._2)

      // Schema of the training DataFrame: label first, then the features.
      val schema = StructType(
        StructField("label", DoubleType, true) ::
          featuresArr.map(name => StructField(name, DoubleType, true)).toList
      )
      val traindf = sparkSession.createDataFrame(trainData, schema)

      // Transformer that packs the feature columns into a single vector column.
      val vectorAssemb = new VectorAssembler().setInputCols(featuresArr).setOutputCol("features")
      // DataFrame in the (label, features) layout expected by LR.
      val trainDF = vectorAssemb.transform(traindf).select("label", "features")

      // Train the model.
      val theaMatrix: DenseMatrix[Double] = LR(trainDF)

      // Test design matrix: intercept column followed by the features.
      val testMatrix = designMatrix(testData, featureCount)

      // Fitted values for the test set.
      val resultMatrix = testMatrix * theaMatrix

      // Root-mean-square error between predictions and true labels.
      val squaredErrors = (resultMatrix.toArray zip testDataLabel).map {
        case (predict, value) => math.pow(predict - value, 2)
      }
      val rmse = math.sqrt(squaredErrors.sum / testData.length)
      println(s"rmse = $rmse")
    } finally {
      // Always release Spark resources, even when the job fails.
      sparkSession.stop()
    }
  }

  /**
   * Multiple linear regression via the normal equations.
   *
   * @param df input DataFrame with a Double "label" column and an ML vector
   *           "features" column
   * @return column matrix of coefficients (theta); row 0 is the intercept,
   *         followed by one coefficient per feature
   */
  def LR(df: DataFrame): DenseMatrix[Double] = {
    // Read the vector column directly instead of parsing Row.toString, so both
    // dense and sparse feature vectors are handled. The name is fully qualified
    // because breeze.linalg._ also exports a `Vector`.
    val samples = df.select("label", "features").rdd
      .map(row => (row.getDouble(0), row.getAs[org.apache.spark.ml.linalg.Vector](1).toArray))
      .collect()
    require(samples.nonEmpty, "training DataFrame is empty; cannot fit a model")

    // Feature dimensionality.
    val dimensions = samples(0)._2.length
    // Design matrix: intercept column followed by the features.
    val matrix = designMatrix(samples.map(_._2), dimensions)
    // Label column matrix.
    val labelMatrix = DenseMatrix.tabulate[Double](samples.length, 1)((i, _) => samples(i)._1)

    // Solve (X^T X) theta = X^T y with a linear solve rather than forming an
    // explicit inverse; this also avoids the p x n intermediate product.
    val xt = matrix.t
    (xt * matrix) \ (xt * labelMatrix)
  }

  /** Builds a rows x (dims + 1) matrix whose column 0 is all ones (the intercept term). */
  private def designMatrix(rows: Array[Array[Double]], dims: Int): DenseMatrix[Double] =
    DenseMatrix.tabulate[Double](rows.length, dims + 1) { (i, j) =>
      if (j == 0) 1.0 else rows(i)(j - 1)
    }
}
// ------------------------- result -------------------------
// rmse = 5.003040917178264