本文讲解Spark中的RDD和DataFrame之间的互相转换,主要内容包括以下几点:
1、RDD转DataFrame原因及方式
2、DataFrame转RDD原因及方式
3、DataFrame转RDD的案例
1、RDD转DataFrame原因及方式
可以将RDD转成DataFrame之后,借用sparksql和sql以及HQL语句快速方便的使用sql语句统计和查询,针对HDFS中的数据,直接就可以使用SQL进行查询。
将RDD转化为DataFrame有两种方式:
方式一:通过反射推断schema
方式二、编程指定schema
2、DataFrame转RDD原因及方式
- 解决一些使用sql难以处理的统计分析
- 将数据写入Mysql
- a.DataFrame的write.jdbc,仅支持四种模式:append、overwrite、ignore、default
- b.使用rdd的话,除了上述以外还支持insert 和 update操作,还支持数据库连接池 (自定义,第三方:c3p0 hibernate mybatis)方式,批量高效将大量数据写入 Mysql
3、DataFrame转RDD的案例
Java版本
package com.spark.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import scala.Function1;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class RddToDFjava {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("RddToDF");
SparkSession spark = SparkSession.builder()
.config(conf)
.getOrCreate();
JavaRDD<String> dept = spark.sparkContext().textFile("data/dept.txt", 1)
JavaRDD<DeptBean> deptRDD = dept.map(line -> {
String[] arr = line.split("\t");
DeptBean deptBean = new DeptBean();
int deptid;
if (!arr[0].isEmpty()) {
deptid = Integer.valueOf(arr[0]);
} else {
deptid = 0;
}
deptBean.setDeptid(deptid);
deptBean.setDetname(arr[1]);
deptBean.setAdd(arr[2]);
return deptBean;
});
Dataset<Row> deptDF = spark.createDataFrame(deptRDD, DeptBean.class);
deptDF.printSchema();
deptDF.show();
}
}
Scala版本
package com.spark.sql
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{FloatType, IntegerType, StructField, StructType,StringType}
object Rdd2DataFrame {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName(getClass.getSimpleName)
// 创建SparkSession val spark = SparkSession
.builder()
.config(conf)
// .enableHiveSupport() .getOrCreate()
// 导入隐式转换包 import spark.implicits._
val deptLine: RDD[String] = spark.sparkContext.textFile("data/dept.txt")
case class Dept(deptid:Int, deptname:String, addr:String)
val dept: RDD[Dept] = deptLine.map(line => {
val arr: Array[String] = line.split("\t")
val deptid = arr(0).toInt
val deptname = arr(1)
val add = arr(2)
Dept(deptid,deptname,add)
})
val deptDF: DataFrame = dept.toDF()
deptDF.show()
}
}