看源码, 对spark mllib包的一些笔记
Spark core发展
- RDD
- 弹性分布式数据集
- 编译时类型安全,编译时就能检查出类型错误
- 面向对象的编程风格,直接通过类名点的方式来操作数据
- 序列化和反序列化的性能开销,无论是集群间的通信, 迓是IO操作都需要对对象的结构和数据进行序列化和反序列化
- 慢
- GC的性能开销,频繁的创建和销毁对象, 势必会增加GC。
- DataFrame
- 列方式组织的分布式数据集
- 类型不安全!!!!!!!!
- 结构化数据集
- 包含了以ROW为单位的每行数据的列的信息; Spark通过Schema就能够读懂数据
- 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了
- 特点
- off-heap
- Spark能够以二进制的形式序列化数据(丌包括结构)到off-heap中, 当要操作数据时, 就直接操作off-heap内存
- Tungsten:新的执行引擎;
- Catalyst:新的语法解析框架
- 优缺点
- off-heap就像地盘, schema就像地图, Spark有地图又有自己地盘了
- 丌再受JVM的限制, 也就不再收GC的困扰了
- 对比RDD提升计算效率、减少数据读取、底层计算优化
- DataFrame解决了RDD的缺点, 但是却丢了RDD的优点。 DataFrame不是类型安全的, API也不是面向对象风格的
- DataSet
- 特点
- 类型安全的
- 创建需要一个case Class
- case class
- java class
- DataSet由一系列强类型的对象组成, 编译时可以进行类型检查
- 已经序列化的结构数据
- 编码的二进制
- Encoder的分布式数据集
- 不需要反序列化就能sort,shuffle
- dataframe成为dataSet的一种特例
- 以Catalyst逻辑执行计划表示
功能
- ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
- Featurization: feature extraction, transformation, dimensionality reduction, and selection
- Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
- Persistence: saving and load algorithms, models, and Pipelines
- Utilities: linear algebra, statistics, data handling, etc.
两套API
- RDD-based API
- 过去的API
- spark.mllib
- DataFrame-based API
- 2.0主推的
- spark.ml
依赖
- Breeze —Scala用的数值处理库
- netlib-java
- netlib-java是低级BLAS, LAPACK和ARPACK的包装器
- BLAS(基本线性代数子程序)
- LAPACK — Linear Algebra PACKage
- 它提供了求解联立线性方程组,线性方程组最小二乘解,特征值问题和奇异值问题的例程。
- 还提供了相关的矩阵因子分解(LU,Cholesky,QR,SVD,Schur,广义Schur),以及相关计算
- ARPACK是Fortran77子程序的集合,旨在解决大规模特征值问题
MLBase
- MLlib
- 封装的常用机器学习算法
- 通过RDD和MLlib封装的breeze矩阵计算接口实现了如上算法
- 数据类型
- 本地向量
- 标记点
- 本地矩阵
- 分布式矩阵
- rowMatrix
- IndexRowMatrix
- BlockMatrix
- CoordinateMatrix
- 适合特别稀疏的
- 支持算法
- 分类
- 决策树
- LR
- SVM
- NB
- 回归
- 决策树
- 线性回归
- 岭回归
- Lasso
- 聚类
- kmeans
- 协同过滤
- ALS
- MLRuntime
- spark core提供的分布式内存计算框架
- 运行由optimizer优化过的算法
- MLI
- 进行特征提取和高级ML编程抽象算法实现的APi
- ML Optimizer
- 核心,优化器
- 可以把声明式任务转化为学习计划
- ml
- 提供了基于DF的高层API
- 提供了很多基于流水线的数据处理函数
MLlib
概述
- 不再学习这套老旧API
- 基于RDD的一系列ML操作
Vector
- MLLIb封装的breeze向量方法
- DenseVector
- SparseVector
Matrix
- 封装的breeze方法
BLAS库
- 封装了部分BLAS线代方法
分布式矩阵
- RowMatrix
- 每行对应一个RDD
- IndexedRowMatrix
- 带有一定意义的行索引
- 每行对应一个RDD
- CoordinateMatrix
- 坐标矩阵
- 每一项都是 i,j,value Tuple
- BlockMatrix
- 分块矩阵
ML
概述
- 2.0版本
- 基于DF的高层次API, 可以用来构建工作流
- DataFrame
- 最为ML的数据集
- 支持多种类型
- 向量
- 文本
- 图像
- 结构化数据
- TransFormer
- 转换器
- 用来从一个DF转换成另一个DF
- 是特征变换和机器学习模型的抽象
- 实现transform方法
- Estimator
- 模型学习器
- 从一个DF生成一个Model
- Model也是一个transformer
- 实现fit方法
- Pipelines
- 将多个transformer和Estimator组合, 形成一个工作流
- 测试集测试
- 转换
- 使用Model predict
- 训练模型
- 转换
- 训练模型
- Parameter
- 参数
- set参数
- 如: SetMaxIter(10)
- paramMap: Map(lrl.MaxIter -> 10)
总结: 为什么mllib能够实现分布式机器学习训练?
- 本质: 利用RDD性质实现的数据样本切分的同步训练模型
- 同步:
- 在一个训练迭代下
- Driver需要等待所有分区训练全部完成训练
- 才可以继续下一轮训练
- 数据样本切分
- 异步
- 不支持
- 在TensorFlow中有实现
- 根据当前参数的取值,和随机获取的一小部分训练数据,不同设备各自运行反向传播的过程,并独立地更新参数
- 模型切分
- LR算法没有模型切分
- NN的实现还没看,估计也没有
- 数据划分
- 采用RDD自带的partition机制
- 将数据集划分为n份
- 持久化到内存中, 迭代运算
- 模型训练
- 通过breeze中的优化算法, 本地优化模型
- 在计算LossFunction时,聚合各个aggregator的参数
- 实现分布式训练
- breeze训练库
- chooseDescentDirect
- determineStepSize
- takeStep
- loss, grad = caculate
api的使用
- transformer是一个宽泛的概念, 包括一下的几种类型
- Extraction
- Extracting features from “raw” data
- Transformation
- Scaling, converting, or modifying features
- Selection
- Selecting a subset from a larger set of features
- Locality Sensitive Hashing
- This class of algorithms combines aspects of feature transformation with other algorithms.
- feature Extraction
- TF-IDF
- 词频-逆向文件频率
- 文本挖掘方法
- 体现一个词语在语料库中的重要程度
- 使用词频 TF 来衡量重要性,则很容易过分强调出现频率过高并且文档包含少许信息的单词
- 如 a the of 等无意义的词
- 使用词频来消除无意义词汇的影响
- 使用
- HashingTF不CountVectorizer用于生成词频TF向量
- IDF(逆向文档频率)
- Word2Vec
- Estimator
- 将词语映射到一个固定大小的向量
- 向量可以用来计算文本相似度
- CountVectorizer
- CountVectorizer可以用作估计器来提叏词汇表,并生成CountVectorizerModel。
- CountVectorizer将选择通过诧料库按术诧频率排序的top前几vocabSize词
- FeatureHasher
- feature transformer
- hashingTF
- 特征词集的转换器(Transformer)
- 它可以将返些集合转换成固定长度的特征向量
- HashingTF利用hashing trick,原始特征通过应用哈希函数映射到索引中
- Tokenizer
- 分词器
- 将文本 (如一个句子)拆分成单词的过秳
- StopWordsRemover
- 移除停用词
- n-gram
- Binarizer
- 二值化
- PCA
- 用源数据训练PCA
- 然后用PCA降维
- PolynomialExpansion
- 多项式展开
- 通过产生n维组合将原始特征拓展到多项式空间
- Discrete Cosine Transform (DCT)
- 离散余弦扩散
- StringIndexer
- IndexToString
- OneHotEncoder (Deprecated since 2.3.0)
- OneHotEncoderEstimator
- 热独编码
- 将标签指标映射为二值向量
- VectorIndexer
- 特征转换工具
- 对离散特征进行编号
- 适用于基于树的模型
- Interaction
- Normalizer
- 正则化
- StandardScaler
- 规范化
- MinMaxScaler
- MaxAbsScaler
- Bucketizer
- 离散化重组
- 将一列连续特征转换为特征桶
- ElementwiseProduct
- 为输入向量提供权重
- SQLTransformer
- VectorAssembler
- VectorSizeHint
- QuantileDiscretizer
- Imputer
- 特征提取api.pdf
- feature Selector
- VectorSlicer
- 处理特征向量的转换器
- RFormula
- ChiSqSelector
- 卡方特征选择
- Locality Sensitive Hashing
- This class of algorithms combines aspects of feature transformation with other algorithms.
- LSH Operations
- Feature Transformation
- Approximate Similarity Join
- Approximate Nearest Neighbor Search
- LSH Algorithms
- Bucketed Random Projection for Euclidean Distance
- MinHash for Jaccard Distance
- model操作
- 也是一种transformer
- save
- tranformer
- Estimator类型
- LogisticRegression
- IDF
- 是一个适合数据集并生成IDFModel的评估器(Estimator)
- DFModel获叏特征向量(通常由HashingTF戒CountVectorizer创建)并缩放每列。
- 直观地说,它下调了在诧料库中频繁出现的列
- Word2Vec
- evaluator
- ml的评估函数
- BinaryClassificationEvaluator
- MulticlassClassificationEvaluator
- RegressionEvaluator
- ClusteringEvaluator
- 其他
- PipeLineModel
- load
- CrossValidator
- 交叉验证
- 得到最佳参数集的模型.
- paramGridBuilder
- 参数网络
- 训练验证拆分
- TrainValidationSplit
原理
概述
- 支持的损失函数
- hingeLoss
- SVM中的
- logitstic
- 逻辑损失
- 平方损失
- squared loss
- 支持的正则
- L1
- L2
- Elastic
- l1和h2
- 支持优化算法
- 随机梯度下降
- 改进的拟牛顿法
- L-BFGS
spark.ml源码
概述
- 基于dataset的一套API
- 训练好的模型是一个transformer
- 没训练的模型是一个Estimator
- fit
- train
源码组成
以下内容来自我整理的源码各个包的内容, 是2.3版本的源码结构
- ml包 一些基本的接口
- Transformer
- 转化器
- transform
- 转换方法
- Model
- 一个用做拟合的模型
- A fitted model
- 继承了Transformer接口
- 类型
- PredictionModel
- 其他各种model
- Estimator
- 可以叫做模型学习器
- 用来拟合数据产生模型, 然后转化数据的
- 是拟合后的模型
- 定义了fit方法
- 核心方法
- Predictor
- 预测器
- 继承了Model
- 利用各种模型拟合后的转化器, 预测测试集
- 一般的模型都会继承自这里
- 实现了fit
- 转换元数据
- 提取
- train信息
- 调用train方法
- 定义了train
- 实际训练的地方
- 子类有各种的
- Classifier
- regression
- 实际分类器
- 例如LogisticRegression
- 只需要实现train方法!
- Pipeline
- 管道
- 新API的核心概念
- 将一个个操作组成管道进行处理
- attribute包
- ML attributes
- 被用作提供描述dataset中列的额外信息
- classification包 提供分类器支持
- Classifier
- 分类器基类
- ProbabilisticClassificationModel
- 概率分类器??
- ProbabilisticClassifier
- 抽象类
- 基于概率的分类器
- ClassifierParams
- 分类器参数
- DecisionTreeClassifier
- GBTClassifier
- Gradient-Boosted Trees
- LinearSVC
- LogisticRegression
- MultilayerPerceptronClassifier
- NaiveBayes
- OneVsRestModel
- 用于二分类分类器,推广到多分类
- RandomForestClassifier
- 随机森林分类器
- regression 一些回归器的支持
- Regressor
- 回归器
- RandomForestRegressor
- AFTSurvivalRegression
- DecisionTreeRegressor
- GBTRegressor
- GeneralizedLinearRegression
- IsotonicRegression
- LinearRegression
- clustering
- BisectingKMeansModel
- 二分k均值
- KMeansModel
- k均值算法
- GaussianMixtureModel
- LDAModel
- 线性判别
- evaluation
- 评估器
- 用做评估模型
- BinaryClassificationEvaluator
- 二分类评估
- ClusteringEvaluator
- 聚类结果苹果
- MulticlassClassificationEvaluator
- 多分类评估
- RegressionEvaluator
- 回归评估
- feature
- 用来提取特征的工具箱
- 特征转换器的老巢
- 用来将原始数据或者特征转化为合适的形式
- 类型
- Transformer
- 转化器
- 从一个DataFrame转化另一个DataFrame
- Estimator
- 部分转化器其实本身上就是一个模型
- 需要fit出模型,才能转化
- Instance
- 代表着一个有着标签和特征的实例
- 和RDD的关系
- 一对多
- 和partition也是
- Binarizer
- 这个就是根据阈值将数值型转变为二进制型,阈值可以进行设定
- PCA
- pca降维
- 需要fit
- StandardScaler
- 标准化
- StopWordsRemover
- 停止词移除
- feature.stopwords
- Word2Vec
- word2vec转化器
- optim 优化器!! 内置多种优化器
- Minimizer
- 最小化优化器
- TruncatedNewtonMinimizer
- 拟牛顿法
- FirstOrderMinimizer
- ProjectedQuasiNewton
- SpectralProjectedGradient
- StochasticGradientDescent
- SimpleSGD
- AdaptiveGradientDescent
- StochasticAveragedGradient
- LBFGS
- LBFGSB
- A LIMITED MEMOR Y ALGORITHM F OR BOUND CONSTRAINED OPTIMIZA TION
- OWLQN
- Orthant-wise Limited Memory QuasiNewton method
- IterativelyReweightedLeastSquaresModel
- 迭代重加权最小二乘
- NormalEquationSolution
- 持有正常等式的类???
- WeightedLeastSquaresModel
- 有权重的最小二乘法
- optim.aggretator
- 聚合器
- DifferentiableLossAggregator
- 不同Loss的聚合器
- LogisticAggregator
- 用在逻辑回归中
- 计算二分类或者多分类的损失函数
- 多个LA可以聚合在一起
- HingeAggregator
- Hinge聚合器
- LeastSquaresAggregator
- 最小二乘聚合器
- HuberAggregator
- Huber Loss
- Huber Loss 是一个用于回归问题的带参损失函数, 优点是能增强平方误差损失函数(MSE, mean square error)对离群点的鲁棒性。
- optim.loss
- RDDLossFunction
- 计算Loss
- calculate方法
- 依赖与前面的聚合器来计算
- 就是聚合前面的结果
- 计算每个点的贡献到最终的loss函数
- DifferentiableRegularization
- 正则项
- 目前只提供了L2正则项
- tuning
- 调参工具
- CrossValidator
- 交叉验证
- ParamGridBuilder
- 网格搜索
- TrainValidationSplit
- tree
- 决策树的依赖
- Node
- Split
- TreeModels
- TreeParam
- ann
- 深度学习,
- 提供Loss和Layer支持
- param
- 参数
- 用做模型的参数设置
- 有多种类型
- param.shared
- SharedParamsCodeGen
- fpm
- fpGrowth
- 关联规则挖掘
- image
- spark 图像数据支持
- linalog
- 线性代数工具箱
- 包含一些json,sql等转化器
- recommednation
- 推荐系统
- ALS模型
- source.libsvm
- 导入libsvm类型数据的工具箱
- stat
- 线性代数一些工具箱
- 给出一些统计信息
- ChiSquareTest
- Correlation
- Summarizer
核心组件
以逻辑回归为例, 梳理分布式机器学习的训练过程
- LogisticRegression 逻辑回归算法
继承结构
- ProbabilisticClassifier
- Classifier
- Predictor
- Estimator
- train方法
- 封装成RDD
- 获取权重,features, label
- Instance实例
- 不再是原来的结构
- 每个element都instance
- label: Double, weight: Double, features: Vector
- 持久化
- warp模式,记录日志
- instances.treeAggregate聚合统计信息
- 返回 summarizer, labelSummarizer
- rdd.treeAggregate
- (new MultivariateOnlineSummarizer, new MultiClassSummarizer)
- 用来计算平均值, 方差, 最小值等统计信息
- 计算不同的标签和一直的计数
- (seqOp, combOp, $(aggregationDepth))
- 添加的隐式转换
- 提取分类的具体参数
- weightSum
- label数
- feature数量
- 检查参数
- val (coefficientMatrix, interceptVector, objectiveHistory)
- 获取拟合后的模型参数,系数矩阵
- LogisticAggregator
- 继承自DifferentiableLossAggregator
- bcCoefficients
- 特征的对应系数
- bcFeaturesStd
- 特征的广播标准偏差值
- 作用?
- 用于特征系数的传递
- regularization
- 正则项
- costFun
- 用于计算Loss
- 参数
- instances
- getAggregatorFunc
- 根据这个来计算
- regularization
- caculate
- lowerBounds, upperBounds
- 选择优化器
- BreezeLBFGSB
- BreezeLBFGS
- BreezeOWLQN
- 初始化参数
- 初始化模型
- states
- optimizer.iterations(new CachedDiffFunction(costFun), optimizer.iterations(new CachedDiffFunction(costFun),new BDVDouble)
- 是一个迭代器
- 传入cost和初始值
- 循环迭代
- 进入训练
- state
- 跟踪优化器的信息
- 包括
- 现在的点
- 值
- 梯度
- @param x the current point being considered
- @param value f(x)
- @param grad f.gradientAt(x)
- @param adjustedValue f(x) + r(x), where r is any regularization added to the objective. For LBFGS, this is f(x).
- @param adjustedGradient f'(x) + r'(x), where r is any regularization added to the objective. For LBFGS, this is f'(x).
- @param iter what iteration number we are on.
- @param initialAdjVal f(x_0) + r(x_0), used for checking convergence
- @param history any information needed by the optimizer to do updates.
- @param searchFailed did the line search fail?
- 最后一次的迭代结果为state
- 训练完成, 调整尺度返回
- 拿到拟合后的参数
构建LogisticRegressionModel
- 构建LogisticRegressionSummary
– 封装训练结果 - 返回模型
- aggregate
- 参数
- 聚合的初始值:zeroValue: U
- 对序列操作的函数:seqOp
- 聚合函数:combOp
- aggregate函数将每个分区进行seqOp,且从zeroValue开始遍历分区里的所有元素
- 然后用combOp。从zeroValue开始遍历所有分区的结果。
- breeze库
- spark依赖这个库来做最优化
- Minimizer
- 最小化器
- FirstOrderMinimizer
- infiniteIterations
- 创建一个迭代器,
- 每次迭代代表一次训练
- aggregator库
- 概述
- 用做计算梯度和损失
- 多个聚合器可以合并并且计算共同的loss和梯度
- 计算得到的loss和grad保存在这里面
- DifferentiableLossAggregator
- Datum
- 添加到聚合器中更新loss和梯度的类型
- agg
- 具体的聚合器类型
- add
- Add a single data point to this aggregator
- 传入一个instance
- 然后更新权重和loss
- merge
- 传入另一个merge更新
- LogisticAggregator
- bcCoefficients
- 特征的广播系数!!
- bcFeaturesStd
- 特征的广播标准偏差值
- numClasses
- fitIntercept
- multinomial
- loss库
- RDDLossFunction
- 利用aggregator来计算loss
- 实际上的分析已经在aggregator上做了
- calculate方法
- DifferentiableRegularization
关键流程
- 常规分类器流程
- 加载数据
- spark.read.format(“libsvm”).load(path)
- 加载的dataset列信息如下
- features
- label
- 默认的
- 设置
- setFeaturesCol()
- setLabelCol()
- 划分数据集
- DataSet中的方法
- 根据权重和随机种子,划分数据集
- randomSplit(Array(0.3, 0.7), seed = 12345)
- 选择模型
- new LogisticRegression().setMaxIter(110).setRegParam(0.3).setElasticNetParam(0.8)
- 选择模型
- 设定参数
- 拟合模型
- fit方法
- 返回的是一个PredictionModel
- 测试集测试