spark 机器学习 - 校招准备

看源码, 对spark mllib包的一些笔记

Spark core发展

  • RDD
    • 弹性分布式数据集
      • 编译时类型安全,编译时就能检查出类型错误
    • 面向对象的编程风格,直接通过类名点的方式来操作数据
    • 序列化和反序列化的性能开销,无论是集群间的通信, 迓是IO操作都需要对对象的结构和数据进行序列化和反序列化
    • GC的性能开销,频繁的创建和销毁对象, 势必会增加GC。
  • DataFrame
    • 列方式组织的分布式数据集
      • 类型不安全!!!!!!!!
    • 结构化数据集
      • 包含了以ROW为单位的每行数据的列的信息; Spark通过Schema就能够读懂数据
      • 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了
    • 特点
      • off-heap
        • Spark能够以二进制的形式序列化数据(丌包括结构)到off-heap中, 当要操作数据时, 就直接操作off-heap内存
      • Tungsten:新的执行引擎;
      • Catalyst:新的语法解析框架
    • 优缺点
      • off-heap就像地盘, schema就像地图, Spark有地图又有自己地盘了
      • 丌再受JVM的限制, 也就不再收GC的困扰了
      • 对比RDD提升计算效率、减少数据读取、底层计算优化
      • DataFrame解决了RDD的缺点, 但是却丢了RDD的优点。 DataFrame不是类型安全的, API也不是面向对象风格的
  • DataSet
    • 特点
      • 类型安全的
        • 创建需要一个case Class
          • case class
          • java class
        • DataSet由一系列强类型的对象组成, 编译时可以进行类型检查
      • 已经序列化的结构数据
        • 编码的二进制
        • Encoder的分布式数据集
        • 不需要反序列化就能sort,shuffle
      • dataframe成为dataSet的一种特例
      • 以Catalyst逻辑执行计划表示

功能

  • ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
  • Featurization: feature extraction, transformation, dimensionality reduction, and selection
  • Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
  • Persistence: saving and load algorithms, models, and Pipelines
  • Utilities: linear algebra, statistics, data handling, etc.

两套API

  • RDD-based API
    • 过去的API
    • spark.mllib
  • DataFrame-based API
    • 2.0主推的
    • spark.ml

依赖

  • Breeze —Scala用的数值处理库
  • netlib-java
    • netlib-java是低级BLAS, LAPACK和ARPACK的包装器
    • BLAS(基本线性代数子程序)
    • LAPACK — Linear Algebra PACKage
      • 它提供了求解联立线性方程组,线性方程组最小二乘解,特征值问题和奇异值问题的例程。
      • 还提供了相关的矩阵因子分解(LU,Cholesky,QR,SVD,Schur,广义Schur),以及相关计算
    • ARPACK是Fortran77子程序的集合,旨在解决大规模特征值问题

MLBase

  • MLlib
    • 封装的常用机器学习算法
    • 通过RDD和MLlib封装的breeze矩阵计算接口实现了如上算法
    • 数据类型
      • 本地向量
      • 标记点
      • 本地矩阵
      • 分布式矩阵
        • rowMatrix
        • IndexRowMatrix
        • BlockMatrix
        • CoordinateMatrix
          • 适合特别稀疏的
    • 支持算法
      • 分类
        • 决策树
        • LR
        • SVM
        • NB
      • 回归
        • 决策树
        • 线性回归
        • 岭回归
        • Lasso
      • 聚类
        • kmeans
      • 协同过滤
        • ALS
  • MLRuntime
    • spark core提供的分布式内存计算框架
    • 运行由optimizer优化过的算法
  • MLI
    • 进行特征提取和高级ML编程抽象算法实现的APi
  • ML Optimizer
    • 核心,优化器
    • 可以把声明式任务转化为学习计划
  • ml
    • 提供了基于DF的高层API
    • 提供了很多基于流水线的数据处理函数

MLlib

概述

  • 不再学习这套老旧API
  • 基于RDD的一系列ML操作

Vector

  • MLLIb封装的breeze向量方法
  • DenseVector
  • SparseVector

Matrix

  • 封装的breeze方法

BLAS库

  • 封装了部分BLAS线代方法

分布式矩阵

  • RowMatrix
    • 每行对应一个RDD
  • IndexedRowMatrix
    • 带有一定意义的行索引
    • 每行对应一个RDD
  • CoordinateMatrix
    • 坐标矩阵
    • 每一项都是 i,j,value Tuple
  • BlockMatrix
    • 分块矩阵

ML

概述

  • 2.0版本
  • 基于DF的高层次API, 可以用来构建工作流
  • DataFrame
    • 最为ML的数据集
    • 支持多种类型
      • 向量
      • 文本
      • 图像
      • 结构化数据
  • TransFormer
    • 转换器
      • 用来从一个DF转换成另一个DF
    • 是特征变换和机器学习模型的抽象
    • 实现transform方法
  • Estimator
    • 模型学习器
      • 从一个DF生成一个Model
      • Model也是一个transformer
    • 实现fit方法
  • Pipelines
    • 将多个transformer和Estimator组合, 形成一个工作流
    • 测试集测试
      • 转换
      • 使用Model predict
    • 训练模型
      • 转换
      • 训练模型
  • Parameter
    • 参数
    • set参数
      • 如: SetMaxIter(10)
      • paramMap: Map(lrl.MaxIter -> 10)

总结: 为什么mllib能够实现分布式机器学习训练?

  • 本质: 利用RDD性质实现的数据样本切分的同步训练模型
    • 同步:
      • 在一个训练迭代下
      • Driver需要等待所有分区训练全部完成训练
      • 才可以继续下一轮训练
    • 数据样本切分
    • 异步
      • 不支持
      • 在TensorFlow中有实现
      • 根据当前参数的取值,和随机获取的一小部分训练数据,不同设备各自运行反向传播的过程,并独立地更新参数
    • 模型切分
      • LR算法没有模型切分
      • NN的实现还没看,估计也没有
  • 数据划分
    • 采用RDD自带的partition机制
    • 将数据集划分为n份
    • 持久化到内存中, 迭代运算
  • 模型训练
    • 通过breeze中的优化算法, 本地优化模型
    • 在计算LossFunction时,聚合各个aggregator的参数
    • 实现分布式训练
  • breeze训练库
    • chooseDescentDirect
    • determineStepSize
    • takeStep
    • loss, grad = caculate

api的使用

  • transformer是一个宽泛的概念, 包括一下的几种类型
    • Extraction
      • Extracting features from “raw” data
    • Transformation
      • Scaling, converting, or modifying features
    • Selection
      • Selecting a subset from a larger set of features
    • Locality Sensitive Hashing
      • This class of algorithms combines aspects of feature transformation with other algorithms.
  • feature Extraction
    • TF-IDF
      • 词频-逆向文件频率
      • 文本挖掘方法
      • 体现一个词语在语料库中的重要程度
      • 使用词频 TF 来衡量重要性,则很容易过分强调出现频率过高并且文档包含少许信息的单词
        • 如 a the of 等无意义的词
      • 使用词频来消除无意义词汇的影响
      • 使用
        • HashingTF不CountVectorizer用于生成词频TF向量
        • IDF(逆向文档频率)
    • Word2Vec
      • Estimator
      • 将词语映射到一个固定大小的向量
      • 向量可以用来计算文本相似度
    • CountVectorizer
      • CountVectorizer可以用作估计器来提叏词汇表,并生成CountVectorizerModel。
      • CountVectorizer将选择通过诧料库按术诧频率排序的top前几vocabSize词
    • FeatureHasher
  • feature transformer
    • hashingTF
      • 特征词集的转换器(Transformer)
      • 它可以将返些集合转换成固定长度的特征向量
      • HashingTF利用hashing trick,原始特征通过应用哈希函数映射到索引中
    • Tokenizer
      • 分词器
      • 将文本 (如一个句子)拆分成单词的过秳
    • StopWordsRemover
      • 移除停用词
    • n-gram
    • Binarizer
      • 二值化
    • PCA
      • 用源数据训练PCA
      • 然后用PCA降维
    • PolynomialExpansion
      • 多项式展开
      • 通过产生n维组合将原始特征拓展到多项式空间
    • Discrete Cosine Transform (DCT)
      • 离散余弦扩散
    • StringIndexer
    • IndexToString
    • OneHotEncoder (Deprecated since 2.3.0)
    • OneHotEncoderEstimator
      • 热独编码
      • 将标签指标映射为二值向量
    • VectorIndexer
      • 特征转换工具
      • 对离散特征进行编号
      • 适用于基于树的模型
    • Interaction
    • Normalizer
      • 正则化
    • StandardScaler
      • 规范化
    • MinMaxScaler
    • MaxAbsScaler
    • Bucketizer
      • 离散化重组
      • 将一列连续特征转换为特征桶
    • ElementwiseProduct
      • 为输入向量提供权重
    • SQLTransformer
    • VectorAssembler
    • VectorSizeHint
    • QuantileDiscretizer
    • Imputer
    • 特征提取api.pdf
  • feature Selector
    • VectorSlicer
      • 处理特征向量的转换器
    • RFormula
    • ChiSqSelector
      • 卡方特征选择
  • Locality Sensitive Hashing
    • This class of algorithms combines aspects of feature transformation with other algorithms.
    • LSH Operations
      • Feature Transformation
      • Approximate Similarity Join
      • Approximate Nearest Neighbor Search
    • LSH Algorithms
      • Bucketed Random Projection for Euclidean Distance
      • MinHash for Jaccard Distance
  • model操作
    • 也是一种transformer
    • save
    • tranformer
  • Estimator类型
    • LogisticRegression
    • IDF
      • 是一个适合数据集并生成IDFModel的评估器(Estimator)
      • DFModel获叏特征向量(通常由HashingTF戒CountVectorizer创建)并缩放每列。
      • 直观地说,它下调了在诧料库中频繁出现的列
    • Word2Vec
  • evaluator
    • ml的评估函数
    • BinaryClassificationEvaluator
    • MulticlassClassificationEvaluator
    • RegressionEvaluator
    • ClusteringEvaluator
  • 其他
    • PipeLineModel
      • load
    • CrossValidator
      • 交叉验证
      • 得到最佳参数集的模型.
    • paramGridBuilder
      • 参数网络
    • 训练验证拆分
      • TrainValidationSplit

原理

概述

  • 支持的损失函数
    • hingeLoss
      • SVM中的
    • logitstic
      • 逻辑损失
    • 平方损失
      • squared loss
  • 支持的正则
    • L1
    • L2
    • Elastic
      • l1和h2
  • 支持优化算法
    • 随机梯度下降
    • 改进的拟牛顿法
      • L-BFGS

spark.ml源码

概述

  • 基于dataset的一套API
  • 训练好的模型是一个transformer
  • 没训练的模型是一个Estimator
    • fit
    • train

源码组成

以下内容来自我整理的源码各个包的内容, 是2.3版本的源码结构

  • ml包 一些基本的接口
    • Transformer
      • 转化器
      • transform
        • 转换方法
    • Model
      • 一个用做拟合的模型
        • A fitted model
      • 继承了Transformer接口
      • 类型
        • PredictionModel
        • 其他各种model
    • Estimator
      • 可以叫做模型学习器
      • 用来拟合数据产生模型, 然后转化数据的
      • 是拟合后的模型
      • 定义了fit方法
        • 核心方法
    • Predictor
      • 预测器
      • 继承了Model
      • 利用各种模型拟合后的转化器, 预测测试集
      • 一般的模型都会继承自这里
        • 实现了fit
          • 转换元数据
          • 提取
          • train信息
          • 调用train方法
        • 定义了train
          • 实际训练的地方
      • 子类有各种的
        • Classifier
        • regression
    • 实际分类器
      • 例如LogisticRegression
      • 只需要实现train方法!
    • Pipeline
      • 管道
      • 新API的核心概念
      • 将一个个操作组成管道进行处理
  • attribute包
    • ML attributes
    • 被用作提供描述dataset中列的额外信息
  • classification包 提供分类器支持
    • Classifier
      • 分类器基类
    • ProbabilisticClassificationModel
      • 概率分类器??
      • ProbabilisticClassifier
        • 抽象类
        • 基于概率的分类器
    • ClassifierParams
      • 分类器参数
    • DecisionTreeClassifier
    • GBTClassifier
      • Gradient-Boosted Trees
    • LinearSVC
    • LogisticRegression
    • MultilayerPerceptronClassifier
    • NaiveBayes
    • OneVsRestModel
      • 用于二分类分类器,推广到多分类
    • RandomForestClassifier
      • 随机森林分类器
  • regression 一些回归器的支持
    • Regressor
      • 回归器
    • RandomForestRegressor
    • AFTSurvivalRegression
    • DecisionTreeRegressor
    • GBTRegressor
    • GeneralizedLinearRegression
    • IsotonicRegression
    • LinearRegression
  • clustering
    • BisectingKMeansModel
      • 二分k均值
    • KMeansModel
      • k均值算法
    • GaussianMixtureModel
    • LDAModel
      • 线性判别
  • evaluation
    • 评估器
      • 用做评估模型
    • BinaryClassificationEvaluator
      • 二分类评估
    • ClusteringEvaluator
      • 聚类结果苹果
    • MulticlassClassificationEvaluator
      • 多分类评估
    • RegressionEvaluator
      • 回归评估
  • feature
    • 用来提取特征的工具箱
      • 特征转换器的老巢
      • 用来将原始数据或者特征转化为合适的形式
    • 类型
      • Transformer
        • 转化器
        • 从一个DataFrame转化另一个DataFrame
      • Estimator
        • 部分转化器其实本身上就是一个模型
        • 需要fit出模型,才能转化
    • Instance
      • 代表着一个有着标签和特征的实例
      • 和RDD的关系
        • 一对多
        • 和partition也是
    • Binarizer
      • 这个就是根据阈值将数值型转变为二进制型,阈值可以进行设定
    • PCA
      • pca降维
      • 需要fit
    • StandardScaler
      • 标准化
    • StopWordsRemover
      • 停止词移除
      • feature.stopwords
    • Word2Vec
      • word2vec转化器
  • optim 优化器!! 内置多种优化器
    • Minimizer
      • 最小化优化器
      • TruncatedNewtonMinimizer
        • 拟牛顿法
      • FirstOrderMinimizer
        • ProjectedQuasiNewton
        • SpectralProjectedGradient
        • StochasticGradientDescent
          • SimpleSGD
          • AdaptiveGradientDescent
        • StochasticAveragedGradient
        • LBFGS
        • LBFGSB
          • A LIMITED MEMOR Y ALGORITHM F OR BOUND CONSTRAINED OPTIMIZA TION
        • OWLQN
          • Orthant-wise Limited Memory QuasiNewton method
    • IterativelyReweightedLeastSquaresModel
      • 迭代重加权最小二乘
    • NormalEquationSolution
      • 持有正常等式的类???
    • WeightedLeastSquaresModel
      • 有权重的最小二乘法
    • optim.aggretator
      • 聚合器
      • DifferentiableLossAggregator
        • 不同Loss的聚合器
      • LogisticAggregator
        • 用在逻辑回归中
        • 计算二分类或者多分类的损失函数
        • 多个LA可以聚合在一起
      • HingeAggregator
        • Hinge聚合器
      • LeastSquaresAggregator
        • 最小二乘聚合器
      • HuberAggregator
        • Huber Loss
        • Huber Loss 是一个用于回归问题的带参损失函数, 优点是能增强平方误差损失函数(MSE, mean square error)对离群点的鲁棒性。
    • optim.loss
      • RDDLossFunction
        • 计算Loss
          • calculate方法
        • 依赖与前面的聚合器来计算
          • 就是聚合前面的结果
          • 计算每个点的贡献到最终的loss函数
      • DifferentiableRegularization
        • 正则项
        • 目前只提供了L2正则项
  • tuning
    • 调参工具
    • CrossValidator
      • 交叉验证
    • ParamGridBuilder
      • 网格搜索
    • TrainValidationSplit
  • tree
    • 决策树的依赖
    • Node
    • Split
    • TreeModels
    • TreeParam
  • ann
    • 深度学习,
    • 提供Loss和Layer支持
  • param
    • 参数
      • 用做模型的参数设置
      • 有多种类型
    • param.shared
      • SharedParamsCodeGen
  • fpm
    • fpGrowth
      • 关联规则挖掘
  • image
    • spark 图像数据支持
  • linalog
    • 线性代数工具箱
    • 包含一些json,sql等转化器
  • recommednation
    • 推荐系统
    • ALS模型
  • source.libsvm
    • 导入libsvm类型数据的工具箱
  • stat
    • 线性代数一些工具箱
      • 给出一些统计信息
    • ChiSquareTest
    • Correlation
    • Summarizer

核心组件

以逻辑回归为例, 梳理分布式机器学习的训练过程

  • LogisticRegression 逻辑回归算法

继承结构

  • ProbabilisticClassifier
    • Classifier
      • Predictor
        • Estimator
  • train方法
    • 封装成RDD
      • 获取权重,features, label
      • Instance实例
        • 不再是原来的结构
        • 每个element都instance
      • label: Double, weight: Double, features: Vector
    • 持久化
    • warp模式,记录日志
    • instances.treeAggregate聚合统计信息
      • 返回 summarizer, labelSummarizer
      • rdd.treeAggregate
      • (new MultivariateOnlineSummarizer, new MultiClassSummarizer)
        • 用来计算平均值, 方差, 最小值等统计信息
        • 计算不同的标签和一直的计数
      • (seqOp, combOp, $(aggregationDepth))
        • 添加的隐式转换
    • 提取分类的具体参数
      • weightSum
      • label数
      • feature数量
      • 检查参数
    • val (coefficientMatrix, interceptVector, objectiveHistory)
      • 获取拟合后的模型参数,系数矩阵
      • LogisticAggregator
        • 继承自DifferentiableLossAggregator
        • bcCoefficients
          • 特征的对应系数
        • bcFeaturesStd
          • 特征的广播标准偏差值
        • 作用?
          • 用于特征系数的传递
      • regularization
        • 正则项
      • costFun
        • 用于计算Loss
        • 参数
          • instances
          • getAggregatorFunc
          • 根据这个来计算
          • regularization
        • caculate
      • lowerBounds, upperBounds
      • 选择优化器
        • BreezeLBFGSB
        • BreezeLBFGS
        • BreezeOWLQN
      • 初始化参数
      • 初始化模型
      • states
        • optimizer.iterations(new CachedDiffFunction(costFun), optimizer.iterations(new CachedDiffFunction(costFun),new BDVDouble)
        • 是一个迭代器
        • 传入cost和初始值
      • 循环迭代
        • 进入训练
        • state
          • 跟踪优化器的信息
          • 包括
          • 现在的点
          • 梯度
          • @param x the current point being considered
          • @param value f(x)
          • @param grad f.gradientAt(x)
          • @param adjustedValue f(x) + r(x), where r is any regularization added to the objective. For LBFGS, this is f(x).
          • @param adjustedGradient f'(x) + r'(x), where r is any regularization added to the objective. For LBFGS, this is f'(x).
          • @param iter what iteration number we are on.
          • @param initialAdjVal f(x_0) + r(x_0), used for checking convergence
          • @param history any information needed by the optimizer to do updates.
          • @param searchFailed did the line search fail?
        • 最后一次的迭代结果为state
      • 训练完成, 调整尺度返回
    • 拿到拟合后的参数

构建LogisticRegressionModel

  • 构建LogisticRegressionSummary
    – 封装训练结果
    • 返回模型
  • aggregate
    • 参数
      • 聚合的初始值:zeroValue: U
      • 对序列操作的函数:seqOp
      • 聚合函数:combOp
    • aggregate函数将每个分区进行seqOp,且从zeroValue开始遍历分区里的所有元素
    • 然后用combOp。从zeroValue开始遍历所有分区的结果。
  • breeze库
    • spark依赖这个库来做最优化
    • Minimizer
      • 最小化器
    • FirstOrderMinimizer
      • infiniteIterations
        • 创建一个迭代器,
        • 每次迭代代表一次训练
  • aggregator库
    • 概述
      • 用做计算梯度和损失
      • 多个聚合器可以合并并且计算共同的loss和梯度
      • 计算得到的loss和grad保存在这里面
    • DifferentiableLossAggregator
      • Datum
        • 添加到聚合器中更新loss和梯度的类型
      • agg
        • 具体的聚合器类型
      • add
        • Add a single data point to this aggregator
        • 传入一个instance
        • 然后更新权重和loss
      • merge
        • 传入另一个merge更新
    • LogisticAggregator
      • bcCoefficients
        • 特征的广播系数!!
      • bcFeaturesStd
        • 特征的广播标准偏差值
      • numClasses
      • fitIntercept
      • multinomial
  • loss库
    • RDDLossFunction
      • 利用aggregator来计算loss
      • 实际上的分析已经在aggregator上做了
      • calculate方法
    • DifferentiableRegularization

关键流程

  • 常规分类器流程
    • 加载数据
      • spark.read.format(“libsvm”).load(path)
      • 加载的dataset列信息如下
        • features
        • label
        • 默认的
      • 设置
      • setFeaturesCol()
      • setLabelCol()
    • 划分数据集
      • DataSet中的方法
        • 根据权重和随机种子,划分数据集
      • randomSplit(Array(0.3, 0.7), seed = 12345)
    • 选择模型
      • new LogisticRegression().setMaxIter(110).setRegParam(0.3).setElasticNetParam(0.8)
      • 选择模型
      • 设定参数
    • 拟合模型
      • fit方法
      • 返回的是一个PredictionModel
    • 测试集测试
    原文作者:大菜菜
    原文地址: https://zhuanlan.zhihu.com/p/75044615
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞