文章地址:http://www.haha174.top/article/details/253510
项目源码:https://github.com/haha174/spark.git
1.简介
为什么要把RDD转换成DataSet呢因为那样的话,我们就可以直接针对于HDFS等任何可以构建为RDD的数据,使用sparkSql进行查询操作了。这个功能是无比强大的。想象一下针对与HDFS的数据直接就可以使用SQL进行查询。
spark支持两种方式将RDD转换成Dataset。
第一种方式,使用反射来推断包含了特定数据类型的RDD的元数据。这种基于反射的方式,代码比较简洁,当你已经知道了你的RDD的元数据时,是一种非常不错的方式。
第二种方式,是通过编程接口来创建DataFrame,你可以在程序运行时动态的构建一份元数据,然后将其应用到已经存在的RDD上这种方式代码比较冗长,但是在编写代码时还不知道元数据,只有在程序运行时才能动态的得知元数据,那么只有通过这种方式动态的构建元数据。
2.使用反射的方式推断元数据
JAVA版本:SparkSQL 支持将包含了javaBean的RDD转换为DataSet 的。
JavaBean 的信息就定义了元数据。SparkSql 现在不支持将包含了嵌套的JavaBean 或者复杂的List 等复杂数据的JavaBean。作为元数据的,只支持包含简单的JavaBean.
Scala版本:
而scala由于其具有隐士转换的特性,所以SparkSql 的Scala 接口,是支持自动将包含了 case class 的RDD转换为DataSet 的case class 就定义了元数据, Spark Sql 会通过反射读取传递给case class的参数的名称,然后将其作为列名。与java 不同的是,Spark SQL 是支持将包含了嵌套数据结构的case class 作为元数据的比如包含了ARRAY 等等。
下面给出java 示例:
下面给出scala 示例:
3.动态创建元数据
通过编程接口来创建DataFrame,你可以在程序运行时动态的构建一份元数据,然后将其应用到已经存在的RDD上这种方式代码比较冗长,但是在编写代码时还不知道元数据,只有在程序运行时才能动态的得知元数据,那么只有通过这种方式动态的构建元数据。
下面给出java 示例
public static void main(String[] args) {
SparkConf conf=new SparkConf().setMaster("local").setAppName("RDDToDataSetReflection");
JavaSparkContext sc=new JavaSparkContext(conf);
SQLContext sqlContext=new SQLContext(sc);
JavaRDD<String> listRDD=sc.textFile("C:\\Users\\wchen129\\Desktop\\data\\sparkdata\\students.txt");
//第一步创建RDD 但是需要转换成RDD<Row>
JavaRDD<Row> rowRDD=listRDD.map(new Function<String, Row>() {
@Override
public Row call(String s) throws Exception {
String[] string=s.split(",");
return RowFactory.create(Integer.parseInt(string[0]),string[1],Integer.parseInt(string[2]));
}
});
//动态构建元数据
List<StructField> fields=new ArrayList<StructField>();
fields.add(DataTypes.createStructField("id", DataTypes.IntegerType,true));
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType,true));
StructType structType=DataTypes.createStructType(fields);
// 第三部将使用动态的元数据,将RDD 转换为DataSet
Dataset studentFD=sqlContext.createDataFrame(rowRDD,structType);
//后面就可以使用DataSet
studentFD.registerTempTable("students");
Dataset teenagerFD=sqlContext.sql("select * from students where age<19 ");
List<Row> rows=teenagerFD.javaRDD().collect();
for(Row row:rows){
System.out.println(row);
}
}
下面给出scala 示例
object RDDToDataSetProgrammatically {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("RDDToDataSetReflection")
val sc = new JavaSparkContext(conf)
val sqlContext = new SQLContext(sc)
val listRDD = sc.textFile("C:\\Users\\wchen129\\Desktop\\data\\sparkdata\\students.txt")
//第一步创建RDD 但是需要转换成RDD<Row>
val rowRDD = listRDD.map(new Function[String, Row]() {
@throws[Exception]
override def call(s: String): Row = {
val string = s.split(",")
RowFactory.create(string(0).toInt, string(1), string(2).toInt)
}
})
//动态构建元数据
val fields = new util.ArrayList[StructField]
fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true))
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true))
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true))
val structType = DataTypes.createStructType(fields)
// 第三部将使用动态的元数据,将RDD 转换为DataSet
val studentFD = sqlContext.createDataFrame(rowRDD, structType)
//后面就可以使用DataSet
studentFD.registerTempTable("students")
val teenagerFD = sqlContext.sql("select * from students where age<19 ")
val rows = teenagerFD.javaRDD.collect
import scala.collection.JavaConversions._
for (row <- rows) {
System.out.println(row)
}
}
}
欢迎关注,更多福利
这里写图片描述