spark1.6迁移到spark2.2实践和新特性初探

1. 前言

如果你没用过1.6版本的Spark,看这篇文章会很轻松,直接关心2.2版本的特性就好。

先简单介绍下Spark的几个重要模块

  1. Spark SQL用于处理结构化数据,可以用SQL查询,可以读写HIVE表和HDFS文件,和Hadoop生态无缝连接,这个也是Spark如此流行的原因之一。有评测表示Spark SQL的速度是HiveQL的10倍多,因为Spark SQL在整个数据处理过程中都是在内存中进行,不需要受限于磁盘的IO。快也是有代价的,Spark任务本身是非常耗内存的,也经常会出现内存不足的状况,好多的任务优化都是围绕优化内存。
  2. MLlib是Spark的机器学习库。包括常用的分类、回归、聚类和协同过滤算法。特征工程上支持特征提取、转换、降维、特征选择功能。支持Pipeline,提供模型构建、评估和调优的工具。支持模型保存、下载。另外,该模块还支持线性代数、统计和数据处理等操作。
  3. Spark Streaming用于处理实时数据,输入可以是Kafka, Flume, Kinesis, 或者TCP sockets,输出可以是分布式文件系统HDFS、数据库、面板。中间可以用map、reduce、join和window函数处理,甚至可以用MLlib和GraphX模块做处理,非常强大了。
  4. Spark GraphX用于图计算,是较新的模块。谷歌最早的算法PageRank用于衡量图中节点的权重,求解过程就是图计算。

除了Spark Graphx,其他几个模块在工业界都是常用的。Spark Streaming笔者用的不多,没有迁移经验,主要讲Spark SQL、MLlib的变化。

2. Spark SQL模块

如果你只用Spark处理离线数据,那么看这个小节就足够了。

将python版本的Spark1.6的数据流程迁移到Spark2.2版本,一个简单数据流程的1.6和2.2版本代码如下:

# spark1.6版本
#!/bin/python
#coding:utf-8

from pyspark import SparkContext
from pyspark.sql import HiveContext
import sys
import time

reload(sys)
sys.setdefaultencoding('utf8')

sc = SparkContext(appName="yourSpk")
hiveContext = HiveContext(sc)
df = hiveContext("SELECT * FROM srcTable")
sc.stop()

#####################版本分隔符########################
# spark2.2版本
#!/bin/python
#coding:utf-8
from pyspark.sql import SparkSession
import sys
import time

reload(sys)
sys.setdefaultencoding('utf8')

spark = SparkSession \
    .builder \
    .appName("yourSpk") \
    .enableHiveSupport() \
    .getOrCreate()
df = spark.sql("SELECT * FROM srcTable")
spark.stop()

如果你的数据流程非常简单,且要从spark-1.6迁移到2.2版本,可以参考上面的例子。

2.1 SparkSession作为入口

在Spark1.6中SQLContext 和 HiveContext是处理结构化数据(行列数据)的程序入口,在spark-2.2中用SparkSession代替了上述两种方式作为程序入口。

其中enableHiveSupport()表示允许操作Hive数据。getOrCreate()表示如果已经存在这个SparkSession则直接返回,否则就按照配置创建一个。

如果需要在创建SparkSession的时候增加配置,用config(“spark.some.config.option”, “some-value”)函数,比如要配置并行度为200,就是config(“spark.default.parallelism”,”200″)。当然,这个配置也可以在spark-submit的时候去指定,那就是

spark-submit --conf spark.default.parallelism=200  ${your_sparkfile}.py

效果是一样的,都是让spark任务的并行度是200。

2.2 其他变化

DataFrame

  • registerTempTable表示根据RDD创建一个临时表,在2.x中建议用createOrReplaceTempView取代。
  • unionAll和SQL中的UNION ALL含义相同,在2.x中建议用union取代。

计算精度

这是个坑。Spark2.2中SELECT ‘0.1’=0 返回的是true! 因为String类型的小数(在这里是0.1)会被转换为int,所以被转换成了0。如果你的数据类型是String类型,做数值计算时会有问题,解决方法是先进行类型转换。

3. MLlib

Machine Learning Library(MLlib)是Spark的机器学习库,发生了如下的主要变化。

DataFrame-based APIs的线性代数类

Spark线性代数依赖移到了一个新的工程mllib-local中,线性代数的类移到了spark.ml.linalg,所以DataFrame-based APIs依赖于spark.ml.linalg类。RDD-based API的依赖并没有改变,还是依赖spark.mllib.linalg。

若想将spark.mllib.linalg的向量和矩阵转换成spark.ml.linalg形式的向量和矩阵,可以用spark.mllib.util.MLUtils。

Python版本的转换如下:

from pyspark.mllib.util import MLUtils

# convert DataFrame columns
convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF)
convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF)
# convert a single vector or matrix
mlVec = mllibVec.asML()
mlMat = mllibMat.asML()

Scala版本:

import org.apache.spark.mllib.util.MLUtils

// convert DataFrame columns val convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF)
val convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF)
// convert a single vector or matrix val mlVec: org.apache.spark.ml.linalg.Vector = mllibVec.asML
val mlMat: org.apache.spark.ml.linalg.Matrix = mllibMat.asML

Java版本:

import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.sql.Dataset;

// convert DataFrame columns Dataset<Row> convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF);
Dataset<Row> convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF);
// convert a single vector or matrix org.apache.spark.ml.linalg.Vector mlVec = mllibVec.asML();
org.apache.spark.ml.linalg.Matrix mlMat = mllibMat.asML();

API接口从RDD变成DataFrame

MLLib的API从基于RDD变成基于DataFrame,RDD方式的API以后处于维护阶段,会修复bug,但不再增加新功能,RDD API预计在2.3中不推荐使用,在3.0中删除。官方给出变化的原因是DataFrame更加友好,且在各种编程语言中API形式一致。

新增特性

其他增加的新特性见Spark2.2 MLlib新特性

比如增加了ALS算法,可以用于推荐系统的top-k推荐(SPARK-19535)。

被删除的方法

从1.6到2.0版本中spark.mllib and spark.ml中有不少被删除或者不推荐使用的方法。在此不再赘述,详情见1.6到2.0中被删除或不推荐方法

(完)

    原文作者:XbtLin
    原文地址: https://zhuanlan.zhihu.com/p/34426875
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞