最新的aliyun-emapreduce-sdk
将MaxCompute
数据以DataSource
的方式接入Spark 2.x,用户可以使用类似Spark 2.x中读写json/parquet/csv
的方式来访问MaxCompute.
0. DataSource
a) DataSource
提供了一种插件式的外部数据接入SparkSQL的方式,数据源只要实现相应的DataSource API
即可以整合进SparkSQL,它的特点如下:
- 通过DataSet/DataFrame/sparkSQLText等标准方式来访问数据源
- SparkSQL引擎优化
- scala语言接入后,Spark支持的其它语言也可以进行访问,如pyspark等
Spark 2.x内置支持的数据源:
- json
- csv
- parquet
- orc
- text
- jdbc
Spark 2.x 访问数据源示例:
b) 读数据
val df = spark.read.json("pathToJson")
// 提供schema信息
val schemaType = StructType(Seq(
StructField("year", IntegerType, nullable = true))
spark.read.schema(schemaType)json("pathToJson")
// 带一些参数设置,如csv的分隔符等
spark.read.option("header", "false").option("sep", ",").csv("pathToCsv"")
// load api, 等同于spark.read.json("pathToLoad")
spark.read.format("json").load("pathToLoad")
// sql方式访问
df.createOrReplaceTempView("t")
spark.sql("select * from t")
c) 写数据
val df = Seq(1, 2, 3).toDF("a")
df.write.json("jsonWritePath")
// 等同上面写法
df.write.format("json").save("jsonWritePath")
// 带参数
df.write
.option("header", "true")
.option("compression", "gZiP")
.csv("csvWritePath")
// 路径存在,则覆盖
df.write.mode("overwrite").json("jsonWritePath")
d)sparkSQLText using DataSource
spark.sql("create table t(a string) using json")
spark.sql("insert int table t select 1")
spark.sql("select * from t")
...
1. MaxCompute以DataSource接入Spark 2.x
如上介绍了DataSource的特点以及读写方式,MaxCompute作为一个数据源,通过E-MapReduce的aliyun-emapreduce-sdk
也可以通过上述方式来访问。
1.1. aliyun-emapreduce-sdk
branch: master-2.x
1.2 SparkSQL读写MaxCompute
a) option参数设置
访问MaxCompute表中的数据,需要一些参数,如下:
parameter | optional | value |
---|---|---|
odpsUrl | No | 内网地址: http://odps-ext.aliyun-inc.com/api 公网地址:http://service.odps.aliyun.com/api |
tunnelUrl | No | 内网地址: http://dt-ext.odps.aliyun-inc.com 公网地址:http://dt.odps.aliyun.com |
accessKeySecret | No | 阿里云accessKeySecret |
accessKeyId | No | 阿里云accessKeyId |
project | No | MaxCompute项目空间 |
table | No | MaxCompute表名 |
numPartitions | Yes | 表的Partition个数,默认 1 |
partitionSpec | Yes | 分区信息,如pt=xxx,多个用逗号分开pt=xxx,dt=xxx |
allowCreatNewPartition | Yes | 分区不存在是否创建,默认 false |
b) 写数据
- MaxCompute中必须已经存在表()),若没有需要去MaxCompute控制台进行创建
- 将
DataFrame
中的数据写入MaxCompute的表中
val df = Seq(("Hello", "E-MapReduce")).toDF("a","b")
df.write.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl", "http://odps-ext.aliyun-inc.com/api")
.option("tunnelUrl", "http://dt-ext.odps.aliyun-inc.com")
.option("table", "t")
.option("project", "test_odpss")
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.mode("overwrite") //覆盖写
.save()
case class MyClass(a: String, b: String)
val df1 = Seq(MyClass("Hello", "World")).toDF
df1.write.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl", "http://odps-ext.aliyun-inc.com/api")
.option("tunnelUrl", "http://dt-ext.odps.aliyun-inc.com")
.option("table", "t")
.option("project", "test_odpss")
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.mode("append") //append追加
.save()
// 写分区表 建表:create table t1(a string) partitioned by(b string)
val df2 = Seq("E-MapReduce").toDF("a") // 不包含分区列
df2.write.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl", "http://odps-ext.aliyun-inc.com/api")
.option("tunnelUrl", "http://dt-ext.odps.aliyun-inc.com")
.option("table", "t1")
.option("project", "test_odpss")
.option("partitionSpec","b='Hello'") // 分区描述
.option("allowCreatNewPartition","true") //若分区不存在,是否创建
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.mode("append") //append追加
.save()
备注:
DataFrame的列名和类型必须和MaxCompute的表中一致
不支持
spark.write.parititonBy
不支持
动态分区
- MaxCompute控制台查询表数据进行验证
序号 | a | b | |
---|---|---|---|
1 | Hello | E-MapReduce | |
2 | Hello | World |
c) 读数据
- 从上述表中读取数据到
DataFrame
val df = spark
.read
.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl", "http://odps-ext.aliyun-inc.com/api")
.option("tunnelUrl", "http://dt-ext.odps.aliyun-inc.com")
.option("table", "t")
.option("project", "test_odpss")
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.load()
df.show(false)
+-----+-----------+
|a |b |
+-----+-----------+
|Hello|E-MapReduce|
|Hello|World |
+-----+-----------+
// 读出为DataFrame后可进行DataFrame的各种操作,如join
val df1 = Seq(("Hello", "AliYun")).toDF("a", "c")
df.join(df1, "a").show(false)
+-----+-----------+-------+
|a |b |c |
+-----+-----------+-------+
|Hello|E-MapReduce|AliYun|
|Hello|World |AliYun|
+-----+-----------+-------+
// 也可注册为Spark的临时表
df.createOrReplaceTempView("test_t")
spark.sql("select * from test_t").show(false)
df1.createOrReplaceTempView("test_t_1")
spark.sql("select * from test_t join test_t_1 on test_t.a = test_t_1.a ")
// 读分区表 建表:create table t2(a string) partitioned by(b string)
spark.read.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl", "http://odps-ext.aliyun-inc.com/api")
.option("tunnelUrl", "http://dt-ext.odps.aliyun-inc.com")
.option("table", "t2") // table t2
.option("project", "test_odpss")
.option("partitionSpec","b='Hello'") // 分区描述
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.save()
+-----------+
|a |
+-----------+
|E-MapReduce|
+-----------+
d) sparkSQLText
-
不支持
在sparkSQLText直接对MaxCompute表
进行相关操作 - 可以通过上述读数据的方式使用
DataFrame
注册成临时表的方式,进行相关操作(insert不支持
)